This is an automated email from the ASF dual-hosted git repository.
wankai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new b81ef1940d BanyanDB: Move data write logic from BanyanDB Java Client to OAP and support observe metrics for write operations. (#13541)
b81ef1940d is described below
commit b81ef1940dcf83c72994af7a24e22014d7f0927b
Author: Wan Kai <[email protected]>
AuthorDate: Tue Oct 14 13:45:38 2025 +0800
BanyanDB: Move data write logic from BanyanDB Java Client to OAP and support observe metrics for write operations. (#13541)
---
docs/en/changes/changes.md | 1 +
oap-server-bom/pom.xml | 2 +-
.../storage/plugin/banyandb/BanyanDBBatchDAO.java | 8 +-
.../plugin/banyandb/BanyanDBStorageClient.java | 311 +++++++++++++++------
.../plugin/banyandb/BanyanDBStorageProvider.java | 2 +-
.../banyandb/bulk/AbstractBulkWriteProcessor.java | 208 ++++++++++++++
.../banyandb/bulk/MeasureBulkWriteProcessor.java | 119 ++++++++
.../banyandb/bulk/StreamBulkWriteProcessor.java | 122 ++++++++
.../banyandb/bulk/TraceBulkWriteProcessor.java | 121 ++++++++
.../server/storage/plugin/banyandb/BanyanDBIT.java | 74 +++--
.../src/test/resources/bydb-topn.yml | 40 +++
.../test/resources/bydb.dependencies.properties | 18 +-
.../src/test/resources/bydb.yml | 246 ++++++++++++++++
.../profiling/trace/profiling-cases-trace-v2.yaml | 2 +-
14 files changed, 1143 insertions(+), 131 deletions(-)
diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index a80fe30332..7c88abdbf7 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -105,6 +105,7 @@
* BanyanDB: support add group prefix (namespace) for BanyanDB groups.
* BanyanDB: fix when setting `@BanyanDB.TimestampColumn`, the column should not be indexed.
* OAP Self Observability: make Trace analysis metrics separate by label `protocol`, add Zipkin span dropped metrics.
+* BanyanDB: Move data write logic from BanyanDB Java Client to OAP and support observe metrics for write operations.
#### UI
diff --git a/oap-server-bom/pom.xml b/oap-server-bom/pom.xml
index f0a806718b..f990e755c6 100644
--- a/oap-server-bom/pom.xml
+++ b/oap-server-bom/pom.xml
@@ -72,7 +72,7 @@
<httpcore.version>4.4.16</httpcore.version>
<httpasyncclient.version>4.1.5</httpasyncclient.version>
<commons-compress.version>1.21</commons-compress.version>
- <banyandb-java-client.version>0.9.1</banyandb-java-client.version>
+ <banyandb-java-client.version>0.9.2</banyandb-java-client.version>
<kafka-clients.version>3.4.0</kafka-clients.version>
<spring-kafka-test.version>2.4.6.RELEASE</spring-kafka-test.version>
<consul.client.version>1.5.3</consul.client.version>
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBBatchDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBBatchDAO.java
index 6b95bfcb3d..eef7a78f66 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBBatchDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBBatchDAO.java
@@ -21,14 +21,14 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
-import org.apache.skywalking.banyandb.v1.client.MeasureBulkWriteProcessor;
-import org.apache.skywalking.banyandb.v1.client.StreamBulkWriteProcessor;
-import org.apache.skywalking.banyandb.v1.client.TraceBulkWriteProcessor;
import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.bulk.MeasureBulkWriteProcessor;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.bulk.StreamBulkWriteProcessor;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.bulk.TraceBulkWriteProcessor;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBMeasureInsertRequest;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBMeasureUpdateRequest;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBStreamInsertRequest;
@@ -42,7 +42,7 @@ public class BanyanDBBatchDAO extends AbstractDAO<BanyanDBStorageClient> impleme
private MeasureBulkWriteProcessor measureBulkWriteProcessor;
- private TraceBulkWriteProcessor traceBulkWriteProcessor;
+ private TraceBulkWriteProcessor traceBulkWriteProcessor;
private final int maxBulkSize;
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
index 3bb949e3cf..bb94b5906c 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
@@ -20,46 +20,58 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
+import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
-import org.apache.skywalking.banyandb.common.v1.BanyandbCommon.Group;
import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase;
-import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.Measure;
-import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.Stream;
-import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.TopNAggregation;
+import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
import org.apache.skywalking.banyandb.property.v1.BanyandbProperty;
import org.apache.skywalking.banyandb.property.v1.BanyandbProperty.ApplyRequest.Strategy;
import org.apache.skywalking.banyandb.property.v1.BanyandbProperty.DeleteResponse;
import org.apache.skywalking.banyandb.property.v1.BanyandbProperty.Property;
+import org.apache.skywalking.banyandb.stream.v1.BanyandbStream;
import org.apache.skywalking.banyandb.v1.client.BanyanDBClient;
-import org.apache.skywalking.banyandb.v1.client.MeasureBulkWriteProcessor;
import org.apache.skywalking.banyandb.v1.client.MeasureQuery;
import org.apache.skywalking.banyandb.v1.client.MeasureQueryResponse;
import org.apache.skywalking.banyandb.v1.client.MeasureWrite;
import org.apache.skywalking.banyandb.v1.client.Options;
-import org.apache.skywalking.banyandb.v1.client.StreamBulkWriteProcessor;
+import org.apache.skywalking.banyandb.v1.client.PropertyStore;
import org.apache.skywalking.banyandb.v1.client.StreamQuery;
import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
import org.apache.skywalking.banyandb.v1.client.StreamWrite;
import org.apache.skywalking.banyandb.v1.client.TopNQuery;
import org.apache.skywalking.banyandb.v1.client.TopNQueryResponse;
-import org.apache.skywalking.banyandb.v1.client.TraceBulkWriteProcessor;
import org.apache.skywalking.banyandb.v1.client.TraceQuery;
import org.apache.skywalking.banyandb.v1.client.TraceQueryResponse;
import org.apache.skywalking.banyandb.v1.client.TraceWrite;
-import org.apache.skywalking.banyandb.v1.client.grpc.exception.AlreadyExistsException;
import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
+import org.apache.skywalking.banyandb.v1.client.grpc.exception.InternalException;
+import org.apache.skywalking.banyandb.v1.client.grpc.exception.InvalidArgumentException;
+import org.apache.skywalking.banyandb.v1.client.util.StatusUtil;
import org.apache.skywalking.oap.server.library.client.Client;
import org.apache.skywalking.oap.server.library.client.healthcheck.DelegatedHealthChecker;
import org.apache.skywalking.oap.server.library.client.healthcheck.HealthCheckable;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.util.HealthChecker;
import org.apache.skywalking.oap.server.library.util.StringUtil;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.bulk.MeasureBulkWriteProcessor;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.bulk.StreamBulkWriteProcessor;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.bulk.TraceBulkWriteProcessor;
+import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
+import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
/**
* BanyanDBStorageClient is a simple wrapper for the underlying {@link
BanyanDBClient},
@@ -71,8 +83,17 @@ public class BanyanDBStorageClient implements Client,
HealthCheckable {
final BanyanDBClient client;
private final DelegatedHealthChecker healthChecker = new
DelegatedHealthChecker();
private final int flushTimeout;
-
- public BanyanDBStorageClient(BanyanDBStorageConfig config) {
+ private final ModuleManager moduleManager;
+ private final Options options;
+ private BanyandbDatabase database;
+ private HistogramMetrics propertySingleWriteHistogram;
+ private HistogramMetrics propertyDeleteHistogram;
+ private HistogramMetrics streamSingleWriteHistogram;
+ private HistogramMetrics measureWriteHistogram;
+ private HistogramMetrics streamWriteHistogram;
+ private HistogramMetrics traceWriteHistogram;
+
+ public BanyanDBStorageClient(ModuleManager moduleManager,
BanyanDBStorageConfig config) {
Options options = new Options();
options.setSslTrustCAPath(config.getGlobal().getSslTrustCAPath());
String username = config.getGlobal().getUser();
@@ -88,10 +109,13 @@ public class BanyanDBStorageClient implements Client,
HealthCheckable {
}
this.client = new BanyanDBClient(config.getTargetArray(), options);
this.flushTimeout = config.getGlobal().getFlushTimeout();
+ this.options = options;
+ this.moduleManager = moduleManager;
}
@Override
public void connect() throws Exception {
+ initTelemetry();
this.client.connect();
final Properties properties = new Properties();
try (final InputStream resourceAsStream
@@ -186,9 +210,10 @@ public class BanyanDBStorageClient implements Client,
HealthCheckable {
}
public DeleteResponse deleteProperty(String name, String id) throws
IOException {
- try {
+ try (HistogramMetrics.Timer timer = propertyDeleteHistogram.createTimer()) {
MetadataRegistry.Schema schema =
MetadataRegistry.INSTANCE.findManagementMetadata(name);
- DeleteResponse result =
this.client.deleteProperty(schema.getMetadata().getGroup(), name, id);
+ PropertyStore store = new
PropertyStore(checkNotNull(client.getChannel()));
+ DeleteResponse result =
store.delete(schema.getMetadata().getGroup(), name, id);
this.healthChecker.health();
return result;
} catch (BanyanDBException ex) {
@@ -253,88 +278,37 @@ public class BanyanDBStorageClient implements Client,
HealthCheckable {
}
/**
- * PropertyStore.Strategy is default to {@link Strategy#STRATEGY_MERGE}
+ * Apply (create or update) the property with {@link BanyandbProperty.ApplyRequest.Strategy#STRATEGY_MERGE}
+ *
+ * @param property the property to be stored in BanyanDB
*/
- public void apply(Property property) throws IOException {
- try {
- this.client.apply(property);
- this.healthChecker.health();
- } catch (BanyanDBException ex) {
- healthChecker.unHealth(ex);
- throw new IOException("fail to define property", ex);
- }
- }
-
- public void apply(Property property, Strategy strategy) throws IOException
{
- try {
- this.client.apply(property, strategy);
- this.healthChecker.health();
- } catch (BanyanDBException ex) {
- healthChecker.unHealth(ex);
- throw new IOException("fail to define property", ex);
- }
- }
-
- public void define(Stream stream) throws BanyanDBException {
- try {
- this.client.define(stream);
- this.healthChecker.health();
- } catch (BanyanDBException ex) {
- healthChecker.unHealth(ex);
- throw ex;
- }
- }
-
- public void define(Stream stream, List<BanyandbDatabase.IndexRule>
indexRules) throws BanyanDBException {
- try {
- this.client.define(stream, indexRules);
- this.healthChecker.health();
- } catch (BanyanDBException ex) {
- healthChecker.unHealth(ex);
- throw ex;
- }
- }
-
- public void define(Measure measure) throws BanyanDBException {
- try {
- this.client.define(measure);
- this.healthChecker.health();
- } catch (BanyanDBException ex) {
- healthChecker.unHealth(ex);
- throw ex;
- }
- }
-
- public void define(Measure measure, List<BanyandbDatabase.IndexRule>
indexRules) throws BanyanDBException {
- try {
- this.client.define(measure, indexRules);
- this.healthChecker.health();
- } catch (BanyanDBException ex) {
- healthChecker.unHealth(ex);
- throw ex;
- }
- }
-
- public void defineIfEmpty(Group group) throws IOException {
- try {
- try {
- this.client.define(group);
- } catch (AlreadyExistsException ignored) {
- }
+ public BanyandbProperty.ApplyResponse apply(Property property) throws
IOException {
+ try (HistogramMetrics.Timer timer =
propertySingleWriteHistogram.createTimer()) {
+ PropertyStore store = new
PropertyStore(checkNotNull(client.getChannel()));
+ BanyandbProperty.ApplyResponse response = store.apply(property);
this.healthChecker.health();
+ return response;
} catch (BanyanDBException ex) {
healthChecker.unHealth(ex);
- throw new IOException("fail to define group", ex);
+ throw new IOException("fail to create property", ex);
}
}
- public void define(TopNAggregation topNAggregation) throws IOException {
- try {
- this.client.define(topNAggregation);
+ /**
+ * Apply (create or update) the property
+ *
+ * @param property the property to be stored in BanyanDB
+ * @param strategy dictates how to apply the property
+ */
+ public BanyandbProperty.ApplyResponse apply(Property property, Strategy
strategy) throws IOException {
+ try (HistogramMetrics.Timer timer =
propertySingleWriteHistogram.createTimer()) {
+ PropertyStore store = new
PropertyStore(checkNotNull(client.getChannel()));
+ BanyandbProperty.ApplyResponse response = store.apply(property,
strategy);
this.healthChecker.health();
+ return response;
} catch (BanyanDBException ex) {
healthChecker.unHealth(ex);
- throw new IOException("fail to define TopNAggregation", ex);
+ throw new IOException("fail to create property", ex);
}
}
@@ -362,24 +336,183 @@ public class BanyanDBStorageClient implements Client,
HealthCheckable {
}
}
- public void write(StreamWrite streamWrite) {
- this.client.write(streamWrite);
+ /**
+ * Perform a single write with the given entity.
+ *
+ * @param streamWrite the entity to be written
+ * @return a future of write result
+ */
+ public CompletableFuture<Void> write(StreamWrite streamWrite) {
+ checkState(client.getStreamServiceStub() != null, "stream service is
null");
+ HistogramMetrics.Timer timer =
streamSingleWriteHistogram.createTimer();
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ final StreamObserver<BanyandbStream.WriteRequest>
writeRequestStreamObserver
+ = client.getStreamServiceStub()
+ .withDeadlineAfter(options.getDeadline(), TimeUnit.SECONDS)
+ .write(
+ new StreamObserver<BanyandbStream.WriteResponse>() {
+ private BanyanDBException responseException;
+
+ @Override
+ public void onNext(BanyandbStream.WriteResponse
writeResponse) {
+ BanyandbModel.Status status =
StatusUtil.convertStringToStatus(
+ writeResponse.getStatus());
+ switch (status) {
+ case STATUS_SUCCEED:
+ break;
+ case STATUS_INVALID_TIMESTAMP:
+ responseException = new
InvalidArgumentException(
+ "Invalid timestamp: " +
streamWrite.getTimestamp(), null,
+ Status.Code.INVALID_ARGUMENT, false
+ );
+ break;
+ case STATUS_NOT_FOUND:
+ responseException = new
InvalidArgumentException(
+ "Invalid metadata: " +
streamWrite.getEntityMetadata(), null,
+ Status.Code.INVALID_ARGUMENT, false
+ );
+ break;
+ case STATUS_EXPIRED_SCHEMA:
+ BanyandbCommon.Metadata metadata =
writeResponse.getMetadata();
+ log.warn(
+ "The schema {}.{} is expired, trying to update the schema...",
+ metadata.getGroup(),
metadata.getName()
+ );
+ try {
+
client.updateStreamMetadataCacheFromSever(
+ metadata.getGroup(),
metadata.getName());
+ } catch (BanyanDBException e) {
+ String warnMessage = String.format(
+ "Failed to refresh the stream
schema %s.%s",
+ metadata.getGroup(),
metadata.getName()
+ );
+ log.warn(warnMessage, e);
+ }
+ responseException = new
InvalidArgumentException(
+ "Expired revision: " +
metadata.getModRevision(), null,
+ Status.Code.INVALID_ARGUMENT, true
+ );
+ break;
+ default:
+ responseException = new
InternalException(
+ String.format(
+ "Internal error (%s) occurs in
server", writeResponse.getStatus()),
+ null, Status.Code.INTERNAL, true
+ );
+ break;
+ }
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ timer.close();
+ log.error("Error occurs in flushing streams.",
throwable);
+ future.completeExceptionally(throwable);
+ }
+
+ @Override
+ public void onCompleted() {
+ timer.close();
+ if (responseException == null) {
+ future.complete(null);
+ } else {
+
future.completeExceptionally(responseException);
+ }
+ }
+ });
+ try {
+ writeRequestStreamObserver.onNext(streamWrite.build());
+ } finally {
+ writeRequestStreamObserver.onCompleted();
+ }
+ return future;
}
+ /**
+ * Create a bulk write processor for stream writes.
+ *
+ * @param maxBulkSize the max bulk size for the flush operation
+ * @param flushInterval if the given maxBulkSize is not reached in this period, the flush is triggered automatically. Unit is second
+ * @param concurrency the max number of concurrent flush operations
+ * @return stream bulk write processor
+ */
public StreamBulkWriteProcessor createStreamBulkProcessor(int maxBulkSize,
int flushInterval, int concurrency) {
- return this.client.buildStreamWriteProcessor(maxBulkSize,
flushInterval, concurrency, flushTimeout);
+ checkState(client.getStreamServiceStub() != null, "stream service is
null");
+ return new StreamBulkWriteProcessor(client, maxBulkSize,
flushInterval, concurrency, flushTimeout, streamWriteHistogram, options);
}
+ /**
+ * Create a bulk write processor for measure writes.
+ *
+ * @param maxBulkSize the max bulk size for the flush operation
+ * @param flushInterval if the given maxBulkSize is not reached in this period, the flush is triggered automatically. Unit is second
+ * @param concurrency the max number of concurrent flush operations
+ * @return measure bulk write processor
+ */
public MeasureBulkWriteProcessor createMeasureBulkProcessor(int
maxBulkSize, int flushInterval, int concurrency) {
- return this.client.buildMeasureWriteProcessor(maxBulkSize,
flushInterval, concurrency, flushTimeout);
+ checkState(client.getMeasureServiceStub() != null, "measure service is
null");
+ return new MeasureBulkWriteProcessor(client, maxBulkSize,
flushInterval, concurrency, flushTimeout, measureWriteHistogram, options);
}
+ /**
+ * Build a trace bulk write processor.
+ *
+ * @param maxBulkSize the max size of each flush. The actual size is determined by the length of the byte array.
+ * @param flushInterval if the given maxBulkSize is not reached in this period, the flush is triggered automatically. Unit is second.
+ * @param concurrency the max number of concurrent flush operations.
+ * @return trace bulk write processor
+ */
public TraceBulkWriteProcessor createTraceBulkProcessor(int maxBulkSize,
int flushInterval, int concurrency) {
- return this.client.buildTraceWriteProcessor(maxBulkSize,
flushInterval, concurrency, flushTimeout);
+ return new TraceBulkWriteProcessor(client, maxBulkSize, flushInterval,
concurrency, flushTimeout, traceWriteHistogram, options);
}
@Override
public void registerChecker(HealthChecker healthChecker) {
this.healthChecker.register(healthChecker);
}
+
+ private void initTelemetry() {
+ MetricsCreator metricsCreator =
moduleManager.find(TelemetryModule.NAME)
+ .provider()
+
.getService(MetricsCreator.class);
+ propertySingleWriteHistogram = metricsCreator.createHistogramMetric(
+ "banyandb_write_latency_seconds",
+ "BanyanDB Write latency in seconds",
+ new MetricsTag.Keys("catalog", "operation"),
+ new MetricsTag.Values("property", "single_write")
+ );
+ propertyDeleteHistogram = metricsCreator.createHistogramMetric(
+ "banyandb_write_latency_seconds",
+ "BanyanDB Write latency in seconds",
+ new MetricsTag.Keys("catalog", "operation"),
+ new MetricsTag.Values("property", "delete")
+ );
+ streamSingleWriteHistogram = metricsCreator.createHistogramMetric(
+ "banyandb_write_latency_seconds",
+ "BanyanDB Write latency in seconds",
+ new MetricsTag.Keys("catalog", "operation"),
+ new MetricsTag.Values("stream", "single_write")
+ );
+ measureWriteHistogram = metricsCreator.createHistogramMetric(
+ "banyandb_write_latency_seconds",
+ "BanyanDB Write latency in seconds",
+ new MetricsTag.Keys("catalog", "operation"),
+ new MetricsTag.Values("measure", "bulk_write")
+ );
+ streamWriteHistogram = metricsCreator.createHistogramMetric(
+ "banyandb_write_latency_seconds",
+ "BanyanDB Write latency in seconds",
+ new MetricsTag.Keys("catalog", "operation"),
+ new MetricsTag.Values("stream", "bulk_write")
+ );
+ traceWriteHistogram = metricsCreator.createHistogramMetric(
+ "banyandb_write_latency_seconds",
+ "BanyanDB Write latency in seconds",
+ new MetricsTag.Keys("catalog", "operation"),
+ new MetricsTag.Values("trace", "bulk_write")
+ );
+ }
}
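
For context, a minimal usage sketch of the reworked client wiring and the new asynchronous single-write API; `moduleManager`, `config`, and `streamWrite` are assumed to be prepared as elsewhere in this patch, and the metric name comes from initTelemetry() above:

    // A sketch: construction now takes the ModuleManager so telemetry can be resolved.
    BanyanDBStorageClient client = new BanyanDBStorageClient(moduleManager, config);
    client.connect(); // registers the banyandb_write_latency_seconds histograms; throws Exception

    // Single stream writes now return a future and are observed under
    // catalog="stream", operation="single_write".
    CompletableFuture<Void> future = client.write(streamWrite);
    future.whenComplete((ignored, t) -> {
        if (t != null) {
            log.error("Single stream write failed", t);
        }
    });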
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java
index db054a31b9..5322d9305a 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java
@@ -148,7 +148,7 @@ public class BanyanDBStorageProvider extends ModuleProvider {
}
this.registerServiceImplementation(StorageBuilderFactory.class, new
StorageBuilderFactory.Default());
- this.client = new BanyanDBStorageClient(config);
+ this.client = new BanyanDBStorageClient(getManager(), config);
this.modelInstaller = new BanyanDBIndexInstaller(client, getManager(),
this.config);
// Stream
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/bulk/AbstractBulkWriteProcessor.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/bulk/AbstractBulkWriteProcessor.java
new file mode 100644
index 0000000000..7c32fb3fa4
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/bulk/AbstractBulkWriteProcessor.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.bulk;
+
+import io.grpc.stub.AbstractAsyncStub;
+import io.grpc.stub.StreamObserver;
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import lombok.Getter;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.banyandb.v1.client.AbstractWrite;
+import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics;
+
+@Slf4j
+public abstract class AbstractBulkWriteProcessor<REQ extends
com.google.protobuf.GeneratedMessageV3,
+ STUB extends AbstractAsyncStub<STUB>>
+ implements Runnable, Closeable {
+ private final STUB stub;
+ private final int maxBulkSize;
+ private final int flushInterval;
+ private final ArrayBlockingQueue<Holder> requests;
+ private final Semaphore semaphore;
+ private final long flushInternalInMillis;
+ private final ScheduledThreadPoolExecutor scheduler;
+ private final int timeout;
+ private volatile long lastFlushTS = 0;
+
+ /**
+ * Create the processor.
+ *
+ * @param stub an implementation of {@link AbstractAsyncStub}
+ * @param processorName name of the processor for logging
+ * @param maxBulkSize the max bulk size for the flush operation
+ * @param flushInterval if the given maxBulkSize is not reached in this period, the flush is triggered automatically. Unit is second.
+ * @param concurrency the max number of concurrent flush operations.
+ * @param timeout network timeout threshold in seconds.
+ */
+ protected AbstractBulkWriteProcessor(STUB stub,
+ String processorName,
+ int maxBulkSize,
+ int flushInterval,
+ int concurrency,
+ int timeout) {
+ this.stub = stub;
+ this.maxBulkSize = maxBulkSize;
+ this.flushInterval = flushInterval;
+ this.timeout = timeout;
+ requests = new ArrayBlockingQueue<>(maxBulkSize + 1);
+ this.semaphore = new Semaphore(concurrency > 0 ? concurrency : 1);
+
+ scheduler = new ScheduledThreadPoolExecutor(1, r -> {
+ final Thread thread = new Thread(r);
+ thread.setName("BanyanDB BulkProcessor");
+ return thread;
+ });
+ scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+ scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+ scheduler.setRemoveOnCancelPolicy(true);
+ flushInternalInMillis = flushInterval * 1000;
+ scheduler.scheduleWithFixedDelay(
+ this, 0, flushInterval, TimeUnit.SECONDS);
+ }
+
+ /**
+ * Add a write entity to the bulk processor.
+ *
+ * @param writeEntity the entity to add.
+ */
+ @SneakyThrows
+ public CompletableFuture<Void> add(AbstractWrite<REQ> writeEntity) {
+ final CompletableFuture<Void> f = new CompletableFuture<>();
+ requests.put(Holder.create(writeEntity, f));
+ flushIfNeeded();
+ return f;
+ }
+
+ public void run() {
+ try {
+ doPeriodicalFlush();
+ } catch (Throwable t) {
+ log.error("Failed to flush data to BanyanDB", t);
+ }
+ }
+
+ @SneakyThrows
+ protected void flushIfNeeded() {
+ if (requests.size() >= maxBulkSize) {
+ flush();
+ }
+ }
+
+ private void doPeriodicalFlush() {
+ if (System.currentTimeMillis() - lastFlushTS > flushInternalInMillis / 2) {
+ // Run the periodical flush only if no `flushIfNeeded` was executed in the second half of the flush period.
+ // Otherwise, wait for the next round. By default, this is the last 2 seconds of the 5s period.
+ // This avoids the periodical flush racing with size-triggered flushes (controlled by maxBulkSize).
+ flush();
+ }
+ }
+
+ public void flush() {
+ if (requests.isEmpty()) {
+ return;
+ }
+
+ try {
+ semaphore.acquire();
+ } catch (InterruptedException e) {
+ log.error("Interrupted when trying to get semaphore to execute
bulk requests", e);
+ return;
+ }
+
+ final List<Holder> batch = new ArrayList<>(requests.size());
+ requests.drainTo(batch);
+ final CompletableFuture<Void> future = doObservedFlush(batch);
+ future.whenComplete((v, t) -> semaphore.release());
+ future.join();
+ lastFlushTS = System.currentTimeMillis();
+
+ }
+
+ protected abstract CompletableFuture<Void> doObservedFlush(final
List<Holder> data);
+
+ protected CompletableFuture<Void> doFlush(final List<Holder> data,
HistogramMetrics.Timer timer) {
+ // The batch is used to control the completion of the flush operation.
+ // There is at most one error per batch,
+ // because the database server would terminate the batch process when
the first error occurs.
+ final CompletableFuture<Void> batch = new CompletableFuture<>();
+ final StreamObserver<REQ> writeRequestStreamObserver
+ = this.buildStreamObserver(stub.withDeadlineAfter(timeout,
TimeUnit.SECONDS), batch);
+
+ try {
+ data.forEach(h -> {
+ AbstractWrite<REQ> entity = (AbstractWrite<REQ>)
h.getWriteEntity();
+ REQ request;
+ try {
+ request = entity.build();
+ } catch (Throwable bt) {
+ log.error("Failed to build the entity: {}", entity.toString(), bt);
+ h.getFuture().completeExceptionally(bt);
+ return;
+ }
+ writeRequestStreamObserver.onNext(request);
+ h.getFuture().complete(null);
+ });
+ } finally {
+ writeRequestStreamObserver.onCompleted();
+ }
+ batch.whenComplete((ignored, exp) -> {
+ timer.close();
+ if (exp != null) {
+ log.error("Failed to execute requests in bulk", exp);
+ }
+ });
+ return batch;
+ }
+
+ public void close() {
+ scheduler.shutdownNow();
+ }
+
+ protected abstract StreamObserver<REQ> buildStreamObserver(STUB stub,
CompletableFuture<Void> batch);
+
+ @Getter
+ static class Holder {
+ private final AbstractWrite<?> writeEntity;
+ private final CompletableFuture<Void> future;
+
+ private Holder(AbstractWrite<?> writeEntity, CompletableFuture<Void>
future) {
+ this.writeEntity = writeEntity;
+ this.future = future;
+ }
+
+ public static <REQ extends com.google.protobuf.GeneratedMessageV3>
Holder create(AbstractWrite<REQ> writeEntity,
+
CompletableFuture<Void> future) {
+ future.whenComplete((v, t) -> {
+ if (t != null) {
+ log.error("Failed to execute the request: {}",
writeEntity.toString(), t);
+ }
+ });
+ return new Holder(writeEntity, future);
+ }
+ }
+}
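
For reference, a minimal sketch of the bulk-write life cycle this processor implements; the parameter values are illustrative, and the write construction follows the integration test below:

    // A sketch: flush when 1000 writes queue up, or every 15 seconds,
    // with at most 2 concurrent flush operations.
    MeasureBulkWriteProcessor processor = client.createMeasureBulkProcessor(1000, 15, 2);

    MeasureWrite measureWrite = client.createMeasureWrite(group, "testMetric_minute", System.currentTimeMillis());
    measureWrite.tag("service_id", TagAndValue.stringTagValue("service1"))
                .field("value", TagAndValue.longFieldValue(100));

    // add() enqueues the entity; the future completes once the entity is handed
    // to the gRPC stream during a size-triggered or periodical flush.
    CompletableFuture<Void> f = processor.add(measureWrite);
    f.get(10, TimeUnit.SECONDS); // throws on write failure or timeout

    processor.close(); // stops the periodic-flush scheduler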
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/bulk/MeasureBulkWriteProcessor.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/bulk/MeasureBulkWriteProcessor.java
new file mode 100644
index 0000000000..81303fac8a
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/bulk/MeasureBulkWriteProcessor.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.bulk;
+
+import io.grpc.stub.StreamObserver;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import javax.annotation.concurrent.ThreadSafe;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
+import org.apache.skywalking.banyandb.measure.v1.BanyandbMeasure;
+import org.apache.skywalking.banyandb.measure.v1.MeasureServiceGrpc;
+import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
+import org.apache.skywalking.banyandb.v1.client.BanyanDBClient;
+import org.apache.skywalking.banyandb.v1.client.Options;
+import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
+import org.apache.skywalking.banyandb.v1.client.util.StatusUtil;
+import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics;
+
+/**
+ * MeasureBulkWriteProcessor works for measure flush.
+ */
+@Slf4j
+@ThreadSafe
+public class MeasureBulkWriteProcessor extends
AbstractBulkWriteProcessor<BanyandbMeasure.WriteRequest,
+ MeasureServiceGrpc.MeasureServiceStub> {
+ private final BanyanDBClient client;
+ private final HistogramMetrics writeHistogram;
+ private final Options options;
+
+ /**
+ * Create the processor.
+ *
+ * @param client the client
+ * @param maxBulkSize the max bulk size for the flush operation
+ * @param flushInterval if the given maxBulkSize is not reached in this period, the flush is triggered automatically. Unit is second.
+ * @param concurrency the max number of concurrent flush operations.
+ * @param timeout network timeout threshold in seconds.
+ */
+ public MeasureBulkWriteProcessor(
+ final BanyanDBClient client,
+ final int maxBulkSize,
+ final int flushInterval,
+ final int concurrency,
+ final int timeout, final HistogramMetrics writeHistogram, final
Options options) {
+ super(client.getMeasureServiceStub(), "MeasureBulkWriteProcessor",
maxBulkSize, flushInterval, concurrency, timeout);
+ this.client = client;
+ this.writeHistogram = writeHistogram;
+ this.options = options;
+ }
+
+ @Override
+ protected StreamObserver<BanyandbMeasure.WriteRequest>
buildStreamObserver(MeasureServiceGrpc.MeasureServiceStub stub,
+
CompletableFuture<Void> batch) {
+ return stub.write(new StreamObserver<BanyandbMeasure.WriteResponse>() {
+ private final Set<String> schemaExpired = new HashSet<>();
+
+ @Override
+ public void onNext(BanyandbMeasure.WriteResponse writeResponse) {
+ BanyandbModel.Status status =
StatusUtil.convertStringToStatus(writeResponse.getStatus());
+ switch (status) {
+ case STATUS_SUCCEED:
+ break;
+ case STATUS_EXPIRED_SCHEMA:
+ BanyandbCommon.Metadata metadata =
writeResponse.getMetadata();
+ String schemaKey = metadata.getGroup() + "." +
metadata.getName();
+ if (!schemaExpired.contains(schemaKey)) {
+ log.warn("The schema {} is expired, trying to update the schema...", schemaKey);
+ try {
+
client.updateMeasureMetadataCacheFromSever(metadata.getGroup(),
metadata.getName());
+ schemaExpired.add(schemaKey);
+ } catch (BanyanDBException e) {
+ log.error(e.getMessage(), e);
+ }
+ }
+ break;
+ default:
+ log.warn("Write measure failed with status: {}",
status);
+ }
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ batch.completeExceptionally(t);
+ log.error("Error occurs in flushing measures", t);
+ }
+
+ @Override
+ public void onCompleted() {
+ batch.complete(null);
+ }
+ });
+ }
+
+ @Override
+ protected CompletableFuture<Void> doObservedFlush(final List<Holder> data)
{
+ HistogramMetrics.Timer timer = writeHistogram.createTimer();
+ return super.doFlush(data, timer);
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/bulk/StreamBulkWriteProcessor.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/bulk/StreamBulkWriteProcessor.java
new file mode 100644
index 0000000000..4fb42030bb
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/bulk/StreamBulkWriteProcessor.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.bulk;
+
+import io.grpc.stub.StreamObserver;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import javax.annotation.concurrent.ThreadSafe;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
+import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
+import org.apache.skywalking.banyandb.stream.v1.BanyandbStream;
+import org.apache.skywalking.banyandb.stream.v1.StreamServiceGrpc;
+import org.apache.skywalking.banyandb.v1.client.BanyanDBClient;
+import org.apache.skywalking.banyandb.v1.client.Options;
+import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
+import org.apache.skywalking.banyandb.v1.client.util.StatusUtil;
+import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics;
+
+/**
+ * StreamBulkWriteProcessor works for stream flush.
+ */
+@Slf4j
+@ThreadSafe
+public class StreamBulkWriteProcessor extends
AbstractBulkWriteProcessor<BanyandbStream.WriteRequest,
+ StreamServiceGrpc.StreamServiceStub> {
+ private final BanyanDBClient client;
+ private final HistogramMetrics writeHistogram;
+ private final Options options;
+
+ /**
+ * Create the processor.
+ *
+ * @param client the client
+ * @param maxBulkSize the max bulk size for the flush operation
+ * @param flushInterval if the given maxBulkSize is not reached in this period, the flush is triggered automatically. Unit is second.
+ * @param timeout network timeout threshold in seconds.
+ * @param concurrency the max number of concurrent flush operations.
+ */
+ public StreamBulkWriteProcessor(
+ final BanyanDBClient client,
+ final int maxBulkSize,
+ final int flushInterval,
+ final int concurrency,
+ final int timeout,
+ final HistogramMetrics writeHistogram,
+ final Options options) {
+ super(client.getStreamServiceStub(), "StreamBulkWriteProcessor",
maxBulkSize, flushInterval, concurrency, timeout);
+ this.client = client;
+ this.writeHistogram = writeHistogram;
+ this.options = options;
+ }
+
+ @Override
+ protected StreamObserver<BanyandbStream.WriteRequest>
buildStreamObserver(StreamServiceGrpc.StreamServiceStub stub,
CompletableFuture<Void> batch) {
+ return stub.write(
+ new StreamObserver<BanyandbStream.WriteResponse>() {
+ private final Set<String> schemaExpired = new HashSet<>();
+
+ @Override
+ public void onNext(BanyandbStream.WriteResponse
writeResponse) {
+ BanyandbModel.Status status =
StatusUtil.convertStringToStatus(writeResponse.getStatus());
+ switch (status) {
+ case STATUS_SUCCEED:
+ break;
+ case STATUS_EXPIRED_SCHEMA:
+ BanyandbCommon.Metadata metadata =
writeResponse.getMetadata();
+ String schemaKey = metadata.getGroup() + "." +
metadata.getName();
+ if (!schemaExpired.contains(schemaKey)) {
+ log.warn("The schema {} is expired, trying to update the schema...", schemaKey);
+ try {
+
client.updateStreamMetadataCacheFromSever(metadata.getGroup(),
metadata.getName());
+ schemaExpired.add(schemaKey);
+ } catch (BanyanDBException e) {
+ log.error(e.getMessage(), e);
+ }
+ }
+ break;
+ default:
+ log.warn("Write stream failed with status:
{}", status);
+ }
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ batch.completeExceptionally(t);
+ log.error("Error occurs in flushing streams", t);
+ }
+
+ @Override
+ public void onCompleted() {
+ batch.complete(null);
+ }
+ });
+ }
+
+ @Override
+ protected CompletableFuture<Void> doObservedFlush(final List<Holder> data)
{
+ HistogramMetrics.Timer timer = writeHistogram.createTimer();
+ return super.doFlush(data, timer);
+ }
+}
+
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/bulk/TraceBulkWriteProcessor.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/bulk/TraceBulkWriteProcessor.java
new file mode 100644
index 0000000000..3e8914b9d2
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/bulk/TraceBulkWriteProcessor.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.bulk;
+
+import io.grpc.stub.StreamObserver;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import javax.annotation.concurrent.ThreadSafe;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
+import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
+import org.apache.skywalking.banyandb.trace.v1.BanyandbTrace;
+import org.apache.skywalking.banyandb.trace.v1.TraceServiceGrpc;
+import org.apache.skywalking.banyandb.v1.client.BanyanDBClient;
+import org.apache.skywalking.banyandb.v1.client.Options;
+import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
+import org.apache.skywalking.banyandb.v1.client.util.StatusUtil;
+import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics;
+
+/**
+ * TraceBulkWriteProcessor works for trace flush.
+ */
+@Slf4j
+@ThreadSafe
+public class TraceBulkWriteProcessor extends
AbstractBulkWriteProcessor<BanyandbTrace.WriteRequest,
+ TraceServiceGrpc.TraceServiceStub> {
+ private final BanyanDBClient client;
+ private final HistogramMetrics writeHistogram;
+ private final Options options;
+
+ /**
+ * Create the processor.
+ *
+ * @param client the client
+ * @param maxBulkSize the max bulk size for the flush operation
+ * @param flushInterval if the given maxBulkSize is not reached in this period, the flush is triggered automatically. Unit is second.
+ * @param timeout network timeout threshold in seconds.
+ * @param concurrency the max number of concurrent flush operations.
+ */
+ public TraceBulkWriteProcessor(
+ final BanyanDBClient client,
+ final int maxBulkSize,
+ final int flushInterval,
+ final int concurrency,
+ final int timeout,
+ final HistogramMetrics writeHistogram,
+ final Options options) {
+ super(client.getTraceServiceStub(), "TraceBulkWriteProcessor",
maxBulkSize, flushInterval, concurrency, timeout);
+ this.client = client;
+ this.writeHistogram = writeHistogram;
+ this.options = options;
+ }
+
+ @Override
+ protected StreamObserver<BanyandbTrace.WriteRequest>
buildStreamObserver(TraceServiceGrpc.TraceServiceStub stub,
CompletableFuture<Void> batch) {
+ return stub.write(
+ new StreamObserver<BanyandbTrace.WriteResponse>() {
+ private final Set<String> schemaExpired = new HashSet<>();
+
+ @Override
+ public void onNext(BanyandbTrace.WriteResponse
writeResponse) {
+ BanyandbModel.Status status =
StatusUtil.convertStringToStatus(writeResponse.getStatus());
+ switch (status) {
+ case STATUS_SUCCEED:
+ break;
+ case STATUS_EXPIRED_SCHEMA:
+ BanyandbCommon.Metadata metadata =
writeResponse.getMetadata();
+ String schemaKey = metadata.getGroup() + "." +
metadata.getName();
+ if (!schemaExpired.contains(schemaKey)) {
+ log.warn("The trace schema {} is expired, trying to update the schema...", schemaKey);
+ try {
+
client.updateTraceMetadataCacheFromServer(metadata.getGroup(),
metadata.getName());
+ schemaExpired.add(schemaKey);
+ } catch (BanyanDBException e) {
+ log.error(e.getMessage(), e);
+ }
+ }
+ break;
+ default:
+ log.warn("Write trace failed with status: {}",
status);
+ }
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ batch.completeExceptionally(t);
+ log.error("Error occurs in flushing traces", t);
+ }
+
+ @Override
+ public void onCompleted() {
+ batch.complete(null);
+ }
+ });
+ }
+
+ @Override
+ protected CompletableFuture<Void> doObservedFlush(final List<Holder> data)
{
+ HistogramMetrics.Timer timer = writeHistogram.createTimer();
+ return super.doFlush(data, timer);
+ }
+}
\ No newline at end of file
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIT.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIT.java
index d47e265105..8eedfd693e 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIT.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIT.java
@@ -29,7 +29,6 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase;
import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
-import org.apache.skywalking.banyandb.v1.client.MeasureBulkWriteProcessor;
import org.apache.skywalking.banyandb.v1.client.MeasureQuery;
import org.apache.skywalking.banyandb.v1.client.MeasureQueryResponse;
import org.apache.skywalking.banyandb.v1.client.MeasureWrite;
@@ -51,13 +50,21 @@ import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity;
import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
import org.apache.skywalking.oap.server.library.it.ITVersions;
+import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.ModuleProviderHolder;
import org.apache.skywalking.oap.server.library.module.ModuleServiceHolder;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.bulk.MeasureBulkWriteProcessor;
+import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
+import org.apache.skywalking.oap.server.telemetry.none.MetricsCreatorNoop;
+import org.apache.skywalking.oap.server.telemetry.none.NoneTelemetryProvider;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.powermock.reflect.Whitebox;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.junit.jupiter.Container;
@@ -98,10 +105,21 @@ public class BanyanDBIT {
private BanyanDBStorageConfig config;
protected void setUpConnection() throws Exception {
+ ModuleManager moduleManager = mock(ModuleManager.class);
+ ModuleDefine storageModule = mock(ModuleDefine.class);
+ BanyanDBStorageProvider provider = mock(BanyanDBStorageProvider.class);
+ Mockito.when(provider.getModule()).thenReturn(storageModule);
+
+ NoneTelemetryProvider telemetryProvider =
mock(NoneTelemetryProvider.class);
+ Mockito.when(telemetryProvider.getService(MetricsCreator.class))
+ .thenReturn(new MetricsCreatorNoop());
+ TelemetryModule telemetryModule = Mockito.spy(TelemetryModule.class);
+ Whitebox.setInternalState(telemetryModule, "loadedProvider",
telemetryProvider);
+
Mockito.when(moduleManager.find(TelemetryModule.NAME)).thenReturn(telemetryModule);
log.info("create BanyanDB client and try to connect");
- config = new BanyanDBStorageConfig();
+ config = new BanyanDBConfigLoader(provider).loadConfig();
config.getGlobal().setTargets(banyanDB.getHost() + ":" +
banyanDB.getMappedPort(GRPC_PORT));
- client = new BanyanDBStorageClient(config);
+ client = new BanyanDBStorageClient(moduleManager, config);
client.connect();
}
@@ -112,7 +130,7 @@ public class BanyanDBIT {
DEFAULT_SCOPE_DEFINE_MOCKED_STATIC =
mockStatic(DefaultScopeDefine.class);
DEFAULT_SCOPE_DEFINE_MOCKED_STATIC.when(() ->
DefaultScopeDefine.nameOf(1)).thenReturn("any");
setUpConnection();
- processor = client.client.buildMeasureWriteProcessor(1000, 1, 1, 10);
+ processor = client.createMeasureBulkProcessor(1000, 1, 1);
}
@Test
@@ -132,7 +150,11 @@ public class BanyanDBIT {
BanyanDBIndexInstaller installer = new BanyanDBIndexInstaller(client,
moduleManager, config);
installer.isExists(model);
//test Group install
- BanyandbCommon.Group group =
client.client.findGroup(DownSampling.Minute.getName());
+ String groupName = MetadataRegistry.convertGroupName(
+ config.getGlobal().getNamespace(),
+ BanyanDB.MeasureGroup.METRICS_MINUTE.getName()
+ );
+ BanyandbCommon.Group group = client.client.findGroup(groupName);
assertEquals(BanyandbCommon.Catalog.CATALOG_MEASURE,
group.getCatalog());
assertEquals(config.getMetricsMin().getSegmentInterval(),
group.getResourceOpts().getSegmentInterval().getNum());
assertEquals(config.getMetricsMin().getShardNum(),
group.getResourceOpts().getShardNum());
@@ -142,7 +164,7 @@ public class BanyanDBIT {
installer.createTable(model);
//test Measure install
- BanyandbDatabase.Measure measure = client.client.findMeasure("minute",
"testMetric_minute");
+ BanyandbDatabase.Measure measure =
client.client.findMeasure(groupName, "testMetric_minute");
assertEquals("default", measure.getTagFamilies(0).getName());
assertEquals("tag", measure.getTagFamilies(0).getTags(0).getName());
assertEquals(BanyandbDatabase.TagType.TAG_TYPE_STRING,
measure.getTagFamilies(0).getTags(0).getType());
@@ -154,25 +176,25 @@ public class BanyanDBIT {
assertEquals(BanyandbDatabase.FieldType.FIELD_TYPE_INT,
measure.getFields(0).getFieldType());
//test TopNAggregation install
BanyandbDatabase.TopNAggregation topNAggregation =
client.client.findTopNAggregation(
- "minute", "testMetric_minute_topn");
+ groupName, "testMetric-service");
assertEquals("value", topNAggregation.getFieldName());
assertEquals("service_id", topNAggregation.getGroupByTagNames(0));
- assertEquals(BanyandbModel.Sort.SORT_UNSPECIFIED,
topNAggregation.getFieldValueSort());
- assertEquals(2, topNAggregation.getLruSize());
+ assertEquals(BanyandbModel.Sort.SORT_DESC,
topNAggregation.getFieldValueSort());
+ assertEquals(10, topNAggregation.getLruSize());
assertEquals(1000, topNAggregation.getCountersNumber());
//test IndexRule install
- BanyandbDatabase.IndexRule indexRuleTag =
client.client.findIndexRule("minute", "tag");
+ BanyandbDatabase.IndexRule indexRuleTag =
client.client.findIndexRule(groupName, "tag");
assertEquals("url", indexRuleTag.getAnalyzer());
assertTrue(indexRuleTag.getNoSort());
//test IndexRuleBinding install
BanyandbDatabase.IndexRuleBinding indexRuleBinding =
client.client.findIndexRuleBinding(
- "minute", "testMetric_minute");
+ groupName, "testMetric_minute");
assertEquals("tag", indexRuleBinding.getRules(0));
assertEquals("testMetric_minute",
indexRuleBinding.getSubject().getName());
//test data query
Instant now = Instant.now();
Instant begin = now.minus(15, ChronoUnit.MINUTES);
- MeasureWrite measureWrite = client.createMeasureWrite("minute",
"testMetric_minute", now.toEpochMilli());
+ MeasureWrite measureWrite = client.createMeasureWrite(groupName,
"testMetric_minute", now.toEpochMilli());
measureWrite.tag("service_id", TagAndValue.stringTagValue("service1"))
.tag("tag", TagAndValue.stringTagValue("tag1"))
.field("value", TagAndValue.longFieldValue(100));
@@ -183,7 +205,7 @@ public class BanyanDBIT {
});
f.get(10, TimeUnit.SECONDS);
- MeasureQuery query = new MeasureQuery(Lists.newArrayList("minute"),
"testMetric_minute",
+ MeasureQuery query = new MeasureQuery(Lists.newArrayList(groupName),
"testMetric_minute",
new TimestampRange(
begin.toEpochMilli(),
now.plus(1,
ChronoUnit.MINUTES).toEpochMilli()
@@ -202,18 +224,18 @@ public class BanyanDBIT {
Model updatedModel = models.add(UpdateTestMetric.class,
DefaultScopeDefine.SERVICE,
new Storage("testMetric", true,
DownSampling.Minute)
);
-
config.getMetricsMin().setShardNum(config.getMetricsDay().getShardNum() + 1);
-
config.getMetricsMin().setSegmentInterval(config.getMetricsDay().getSegmentInterval()
+ 2);
- config.getMetricsMin().setTtl(config.getMetricsDay().getTtl() + 3);
+
config.getMetricsMin().setShardNum(config.getMetricsMin().getShardNum() + 1);
+
config.getMetricsMin().setSegmentInterval(config.getMetricsMin().getSegmentInterval()
+ 2);
+ config.getMetricsMin().setTtl(config.getMetricsMin().getTtl() + 3);
BanyanDBIndexInstaller newInstaller = new
BanyanDBIndexInstaller(client, moduleManager, config);
newInstaller.isExists(updatedModel);
//test Group update
- BanyandbCommon.Group updatedGroup =
client.client.findGroup(DownSampling.Minute.getName());
- assertEquals(updatedGroup.getResourceOpts().getShardNum(), 2);
+ BanyandbCommon.Group updatedGroup = client.client.findGroup(groupName);
+ assertEquals(updatedGroup.getResourceOpts().getShardNum(), 3);
assertEquals(updatedGroup.getResourceOpts().getSegmentInterval().getNum(), 3);
- assertEquals(updatedGroup.getResourceOpts().getTtl().getNum(), 33);
+ assertEquals(updatedGroup.getResourceOpts().getTtl().getNum(), 10);
//test Measure update
- BanyandbDatabase.Measure updatedMeasure =
client.client.findMeasure("minute", "testMetric_minute");
+ BanyandbDatabase.Measure updatedMeasure =
client.client.findMeasure(groupName, "testMetric_minute");
assertEquals("default", updatedMeasure.getTagFamilies(0).getName());
assertEquals("tag",
updatedMeasure.getTagFamilies(0).getTags(0).getName());
assertEquals("new_tag",
updatedMeasure.getTagFamilies(0).getTags(1).getName());
@@ -228,20 +250,20 @@ public class BanyanDBIT {
assertEquals("new_value", updatedMeasure.getFields(1).getName());
assertEquals(BanyandbDatabase.FieldType.FIELD_TYPE_INT,
updatedMeasure.getFields(1).getFieldType());
//test IndexRule update
- BanyandbDatabase.IndexRule updatedIndexRuleTag =
client.client.findIndexRule("minute", "tag");
+ BanyandbDatabase.IndexRule updatedIndexRuleTag =
client.client.findIndexRule(groupName, "tag");
assertEquals("", updatedIndexRuleTag.getAnalyzer());
assertFalse(updatedIndexRuleTag.getNoSort());
- BanyandbDatabase.IndexRule updatedIndexRuleNewTag =
client.client.findIndexRule("minute", "new_tag");
+ BanyandbDatabase.IndexRule updatedIndexRuleNewTag =
client.client.findIndexRule(groupName, "new_tag");
assertTrue(updatedIndexRuleNewTag.getNoSort());
//test IndexRuleBinding update
BanyandbDatabase.IndexRuleBinding updatedIndexRuleBinding =
client.client.findIndexRuleBinding(
- "minute", "testMetric_minute");
+ groupName, "testMetric_minute");
assertEquals("tag", updatedIndexRuleBinding.getRules(0));
assertEquals("new_tag", updatedIndexRuleBinding.getRules(1));
assertEquals("testMetric_minute",
updatedIndexRuleBinding.getSubject().getName());
//test data
- client.client.updateMeasureMetadataCacheFromSever("minute",
"testMetric_minute");
- MeasureWrite updatedMeasureWrite = client.createMeasureWrite("minute",
"testMetric_minute", now.plus(10, ChronoUnit.MINUTES).toEpochMilli());
+ client.client.updateMeasureMetadataCacheFromSever(groupName,
"testMetric_minute");
+ MeasureWrite updatedMeasureWrite =
client.createMeasureWrite(groupName, "testMetric_minute", now.plus(10,
ChronoUnit.MINUTES).toEpochMilli());
updatedMeasureWrite.tag("service_id",
TagAndValue.stringTagValue("service2"))
.tag("tag", TagAndValue.stringTagValue("tag1"))
.tag("new_tag",
TagAndValue.stringTagValue("new_tag1"))
@@ -253,7 +275,7 @@ public class BanyanDBIT {
return null;
});
cf.get(10, TimeUnit.SECONDS);
- MeasureQuery updatedQuery = new MeasureQuery(Lists.newArrayList("minute"), "testMetric_minute",
+ MeasureQuery updatedQuery = new MeasureQuery(Lists.newArrayList(groupName), "testMetric_minute",
new TimestampRange(
begin.toEpochMilli(),
now.plus(15, ChronoUnit.MINUTES).toEpochMilli()
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/test/resources/bydb-topn.yml b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/test/resources/bydb-topn.yml
new file mode 100644
index 0000000000..982193d050
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/test/resources/bydb-topn.yml
@@ -0,0 +1,40 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This file is used to configure the TopN rules for BanyanDB in the SkyWalking OAP server.
+# The rules define how to aggregate and sort `metrics (Measure)` for services, endpoints, and instances.
+#
+# - name: Required. The name of the TopN rule; it uniquely identifies the rule.
+# - metricName: Required. The name of the metric to be aggregated.
+# - groupByTagNames: Optional, default `[]`. The tag names to group the metrics by. If not specified, the metrics will be sorted without grouping.
+# - countersNumber: Optional, default `1000`. The maximum number of entries in a time window for the pre-aggregation.
+
+# The size of the LRU determines the maximally tolerated time range.
+# The buffers in the time range are kept in the memory so that
+# the data in [T - lruSize * n, T] would be accepted in the pre-aggregation process.
+# T = the current time in the current dimensionality.
+# n = interval in the current dimensionality.
+# - lruSizeMinute: Optional, default `10`. Defines how many time_buckets are held in the memory for minute-level metrics.
+# - lruSizeHourDay: Optional, default `2`. Defines how many time_buckets are held in the memory for hour- and day-level metrics.
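+# For example (a worked illustration, not an extra setting): for minute-level metrics,
+# n = 1 minute, so the default lruSizeMinute of 10 keeps the buckets for
+# [T - 10 minutes, T] in memory; data points older than that window are not
+# accepted by the pre-aggregation process.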
+
+# - sort: Optional, default `all`. The sorting order for the metrics: asc, des, or all (includes both asc and des).
+# - excludes: Optional, default `[]`. The tag values to be excluded from the candidates. If specified, the candidates will not include the entries with the specified tag values.
+
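+# A fuller rule spelling out the optional fields might look like the following
+# sketch (the rule name and values below are hypothetical; the `excludes` entry
+# format is omitted here):
+#
+# TopN-Rules:
+#   - name: testMetric-service-sorted
+#     metricName: testMetric
+#     groupByTagNames:
+#       - service_id
+#     countersNumber: 1000
+#     lruSizeMinute: 10
+#     lruSizeHourDay: 2
+#     sort: all
+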
+TopN-Rules:
+ - name: testMetric-service
+ metricName: testMetric
+ groupByTagNames:
+ - service_id
+ sort: des
\ No newline at end of file
diff --git a/test/e2e-v2/cases/profiling/trace/profiling-cases-trace-v2.yaml b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/test/resources/bydb.dependencies.properties
similarity index 58%
copy from test/e2e-v2/cases/profiling/trace/profiling-cases-trace-v2.yaml
copy to oap-server/server-storage-plugin/storage-banyandb-plugin/src/test/resources/bydb.dependencies.properties
index 169e98b59e..d39c334c4f 100644
--- a/test/e2e-v2/cases/profiling/trace/profiling-cases-trace-v2.yaml
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/test/resources/bydb.dependencies.properties
@@ -13,12 +13,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-# This file is used to show how to write configuration files and can be used to test.
-
- cases:
- # trace segment list
- - query: |
- curl -s -XPOST http://${provider_host}:${provider_9090}/profile/users?e2e=true -d '{"enableProfiling":"false","name":"SkyWalking"}' -H "Content-Type: application/json" > /dev/null;
- sleep 3;
- swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql tv2 ls --service-name=e2e-service-provider
- expected: expected/traces-v2-list.yml
\ No newline at end of file
+# BanyanDB version is the version number of the BanyanDB Server release.
+# This is the bundled and tested BanyanDB release version.
+bydb.version=0.9
+# BanyanDB API version is the version number of the BanyanDB query APIs.
+# The OAP server has a bundled implementation of the BanyanDB Java client.
+# Please check the BanyanDB documentation for API version compatibility:
+# https://skywalking.apache.org/docs/skywalking-banyandb/next/installation/versions
+# Each `bydb.api.version` could have multiple compatible release versions (`bydb.version`).
+bydb.api.version=0.9
\ No newline at end of file
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/test/resources/bydb.yml b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/test/resources/bydb.yml
new file mode 100644
index 0000000000..1260ffa34d
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/test/resources/bydb.yml
@@ -0,0 +1,246 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+global:
+ # Targets is the list of BanyanDB servers, separated by commas.
+ # Each target is a BanyanDB server in the format of `host:port`.
+ # If BanyanDB is deployed as a standalone server, the target should be the IP address or domain name and port of the BanyanDB server.
+ # If BanyanDB is deployed in a cluster, the targets should be the IP address or domain name and port of the `liaison` nodes, separated by commas.
+ targets: ${SW_STORAGE_BANYANDB_TARGETS:127.0.0.1:17912}
+ # The maximum number of records in a bulk write request.
+ # A larger value can improve write performance but also increases OAP and BanyanDB Server memory usage.
+ maxBulkSize: ${SW_STORAGE_BANYANDB_MAX_BULK_SIZE:10000}
+ # The minimum number of seconds between two bulk flushes.
+ # If the data in a bulk is less than maxBulkSize, the data will be flushed after this period.
+ # If the data in a bulk exceeds maxBulkSize, the data will be flushed immediately.
+ # A larger value can reduce write pressure on the BanyanDB Server but increases data latency.
+ flushInterval: ${SW_STORAGE_BANYANDB_FLUSH_INTERVAL:15}
+ # The timeout in seconds for a bulk flush.
+ flushTimeout: ${SW_STORAGE_BANYANDB_FLUSH_TIMEOUT:10}
+ # The number of threads that write data to BanyanDB concurrently.
+ # A higher value can improve write performance but also increases CPU usage on both OAP and BanyanDB Server.
+ concurrentWriteThreads: ${SW_STORAGE_BANYANDB_CONCURRENT_WRITE_THREADS:15}
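+ # For example (a worked illustration under the defaults above): with maxBulkSize=10000
+ # and flushInterval=15, a bulk is flushed as soon as it holds 10000 records, or 15
+ # seconds after the previous flush, whichever comes first; a flush that runs longer
+ # than flushTimeout=10 seconds is treated as timed out.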
+ # The maximum size of the dataset when the OAP loads the cache, such as network aliases.
+ resultWindowMaxSize: ${SW_STORAGE_BANYANDB_QUERY_MAX_WINDOW_SIZE:10000}
+ # The maximum size of metadata per query.
+ metadataQueryMaxSize: ${SW_STORAGE_BANYANDB_QUERY_MAX_SIZE:10000}
+ # The maximum number of trace segments per query.
+ segmentQueryMaxSize: ${SW_STORAGE_BANYANDB_QUERY_SEGMENT_SIZE:200}
+ # The maximum number of profile task queries in a request.
+ profileTaskQueryMaxSize: ${SW_STORAGE_BANYANDB_QUERY_PROFILE_TASK_SIZE:200}
+ # The batch size for querying profile data.
+ profileDataQueryBatchSize: ${SW_STORAGE_BANYANDB_QUERY_PROFILE_DATA_BATCH_SIZE:100}
+ asyncProfilerTaskQueryMaxSize: ${SW_STORAGE_BANYANDB_ASYNC_PROFILER_TASK_QUERY_MAX_SIZE:200}
+ user: ${SW_STORAGE_BANYANDB_USER:""}
+ password: ${SW_STORAGE_BANYANDB_PASSWORD:""}
+ # If the BanyanDB server is configured with TLS, configure the TLS cert file path and enable the TLS connection.
+ sslTrustCAPath: ${SW_STORAGE_BANYANDB_SSL_TRUST_CA_PATH:""}
+ # Clean up TopN rules in the BanyanDB server that are not configured in the bydb-topn.yml config.
+ cleanupUnusedTopNRules: ${SW_STORAGE_BANYANDB_CLEANUP_UNUSED_TOPN_RULES:true}
+ # The namespace in BanyanDB to store the data of OAP; if not set, the default is "sw".
+ # OAP will create BanyanDB Groups using the format of "{namespace}_{group name}", such as "sw_records".
+ namespace: ${SW_NAMESPACE:"sw"}
+
+groups:
+ # The group settings of records.
+ # - "shardNum": Number of shards in the group. Shards are the basic units of data storage in BanyanDB. Data is distributed across shards based on the hash value of the series ID.
+ # Refer to the [BanyanDB Shard](https://skywalking.apache.org/docs/skywalking-banyandb/latest/concept/clustering/#52-data-sharding) documentation for more details.
+ # - "segmentInterval": Interval in days for creating a new segment. Segments are time-based, allowing efficient data retention and querying. `SI` stands for Segment Interval.
+ # - "ttl": Time-to-live for the data in the group, in days. Data exceeding the TTL will be deleted.
+ # - "replicas": Number of replicas for the group/stage. Replicas are used for data redundancy and high availability; a value of 0 means no replicas, while a value of 1 means one primary shard and one replica. Higher values indicate more replicas.
+ #
+ # For more details on setting `segmentInterval` and `ttl`, refer to the [BanyanDB TTL](https://skywalking.apache.org/docs/main/latest/en/banyandb/ttl) documentation.
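+ # For example (a worked illustration, not extra settings): with segmentInterval=1
+ # and ttl=3, a new segment is created every day and data older than 3 days is deleted.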
+
+ # The "records" section defines settings for normal datasets not specified
in records.
+ # Each dataset will be grouped under a single group named "records".
+ records:
+ # The settings for the default "hot" stage.
+ shardNum: ${SW_STORAGE_BANYANDB_RECORDS_SHARD_NUM:1}
+ segmentInterval: ${SW_STORAGE_BANYANDB_RECORDS_SI_DAYS:1}
+ ttl: ${SW_STORAGE_BANYANDB_RECORDS_TTL_DAYS:3}
+ replicas: ${SW_STORAGE_BANYANDB_RECORDS_REPLICAS:0}
+ # If the "warm" stage is enabled, the data will be moved to the "warm"
stage after the TTL of the "hot" stage.
+ # If the "cold" stage is enabled and "warm" stage is disabled, the data
will be moved to the "cold" stage after the TTL of the "hot" stage.
+ # If both "warm" and "cold" stages are enabled, the data will be moved to
the "warm" stage after the TTL of the "hot" stage, and then to the "cold" stage
after the TTL of the "warm" stage.
+ # OAP will query the data from the "hot and warm" stage by default if the
"warm" stage is enabled.
+ enableWarmStage: ${SW_STORAGE_BANYANDB_RECORDS_ENABLE_WARM_STAGE:false}
+ enableColdStage: ${SW_STORAGE_BANYANDB_RECORDS_ENABLE_COLD_STAGE:false}
+ # The settings for the "warm" stage.
+ warm:
+ shardNum: ${SW_STORAGE_BANYANDB_RECORDS_WARM_SHARD_NUM:1}
+ segmentInterval: ${SW_STORAGE_BANYANDB_RECORDS_WARM_SI_DAYS:2}
+ ttl: ${SW_STORAGE_BANYANDB_RECORDS_WARM_TTL_DAYS:7}
+ replicas: ${SW_STORAGE_BANYANDB_RECORDS_WARM_REPLICAS:0}
+ nodeSelector: ${SW_STORAGE_BANYANDB_RECORDS_WARM_NODE_SELECTOR:"type=warm"}
+ # The settings for the "cold" stage.
+ cold:
+ shardNum: ${SW_STORAGE_BANYANDB_RECORDS_COLD_SHARD_NUM:1}
+ segmentInterval: ${SW_STORAGE_BANYANDB_RECORDS_COLD_SI_DAYS:3}
+ ttl: ${SW_STORAGE_BANYANDB_RECORDS_COLD_TTL_DAYS:30}
+ replicas: ${SW_STORAGE_BANYANDB_RECORDS_COLD_REPLICAS:0}
+ nodeSelector: ${SW_STORAGE_BANYANDB_RECORDS_COLD_NODE_SELECTOR:"type=cold"}
+ trace:
+ shardNum: ${SW_STORAGE_BANYANDB_TRACE_SHARD_NUM:2}
+ segmentInterval: ${SW_STORAGE_BANYANDB_TRACE_SI_DAYS:1}
+ ttl: ${SW_STORAGE_BANYANDB_TRACE_TTL_DAYS:3}
+ replicas: ${SW_STORAGE_BANYANDB_TRACE_REPLICAS:0}
+ enableWarmStage: ${SW_STORAGE_BANYANDB_TRACE_ENABLE_WARM_STAGE:false}
+ enableColdStage: ${SW_STORAGE_BANYANDB_TRACE_ENABLE_COLD_STAGE:false}
+ warm:
+ shardNum: ${SW_STORAGE_BANYANDB_TRACE_WARM_SHARD_NUM:2}
+ segmentInterval: ${SW_STORAGE_BANYANDB_TRACE_WARM_SI_DAYS:1}
+ ttl: ${SW_STORAGE_BANYANDB_TRACE_WARM_TTL_DAYS:7}
+ replicas: ${SW_STORAGE_BANYANDB_TRACE_WARM_REPLICAS:0}
+ nodeSelector: ${SW_STORAGE_BANYANDB_TRACE_WARM_NODE_SELECTOR:"type=warm"}
+ cold:
+ shardNum: ${SW_STORAGE_BANYANDB_TRACE_COLD_SHARD_NUM:2}
+ segmentInterval: ${SW_STORAGE_BANYANDB_TRACE_COLD_SI_DAYS:1}
+ ttl: ${SW_STORAGE_BANYANDB_TRACE_COLD_TTL_DAYS:30}
+ replicas: ${SW_STORAGE_BANYANDB_TRACE_COLD_REPLICAS:0}
+ nodeSelector: ${SW_STORAGE_BANYANDB_TRACE_COLD_NODE_SELECTOR:"type=cold"}
+ zipkinTrace:
+ shardNum: ${SW_STORAGE_BANYANDB_ZIPKIN_TRACE_SHARD_NUM:2}
+ segmentInterval: ${SW_STORAGE_BANYANDB_ZIPKIN_TRACE_SI_DAYS:1}
+ ttl: ${SW_STORAGE_BANYANDB_ZIPKIN_TRACE_TTL_DAYS:3}
+ replicas: ${SW_STORAGE_BANYANDB_ZIPKIN_TRACE_REPLICAS:0}
+ enableWarmStage: ${SW_STORAGE_BANYANDB_ZIPKIN_TRACE_ENABLE_WARM_STAGE:false}
+ enableColdStage: ${SW_STORAGE_BANYANDB_ZIPKIN_TRACE_ENABLE_COLD_STAGE:false}
+ warm:
+ shardNum: ${SW_STORAGE_BANYANDB_ZIPKIN_TRACE_WARM_SHARD_NUM:2}
+ segmentInterval: ${SW_STORAGE_BANYANDB_ZIPKIN_TRACE_WARM_SI_DAYS:1}
+ ttl: ${SW_STORAGE_BANYANDB_ZIPKIN_TRACE_WARM_TTL_DAYS:7}
+ replicas: ${SW_STORAGE_BANYANDB_ZIPKIN_TRACE_WARM_REPLICAS:0}
+ nodeSelector: ${SW_STORAGE_BANYANDB_ZIPKIN_TRACE_WARM_NODE_SELECTOR:"type=warm"}
+ cold:
+ shardNum: ${SW_STORAGE_BANYANDB_ZIPKIN_TRACE_COLD_SHARD_NUM:2}
+ segmentInterval: ${SW_STORAGE_BANYANDB_ZIPKIN_TRACE_COLD_SI_DAYS:1}
+ ttl: ${SW_STORAGE_BANYANDB_ZIPKIN_TRACE_COLD_TTL_DAYS:30}
+ replicas: ${SW_STORAGE_BANYANDB_ZIPKIN_TRACE_COLD_REPLICAS:0}
+ nodeSelector: ${SW_STORAGE_BANYANDB_ZIPKIN_TRACE_COLD_NODE_SELECTOR:"type=cold"}
+ recordsLog:
+ shardNum: ${SW_STORAGE_BANYANDB_LOG_SHARD_NUM:2}
+ segmentInterval: ${SW_STORAGE_BANYANDB_LOG_SI_DAYS:1}
+ ttl: ${SW_STORAGE_BANYANDB_LOG_TTL_DAYS:3}
+ replicas: ${SW_STORAGE_BANYANDB_LOG_REPLICAS:0}
+ enableWarmStage: ${SW_STORAGE_BANYANDB_LOG_ENABLE_WARM_STAGE:false}
+ enableColdStage: ${SW_STORAGE_BANYANDB_LOG_ENABLE_COLD_STAGE:false}
+ warm:
+ shardNum: ${SW_STORAGE_BANYANDB_LOG_WARM_SHARD_NUM:2}
+ segmentInterval: ${SW_STORAGE_BANYANDB_LOG_WARM_SI_DAYS:1}
+ ttl: ${SW_STORAGE_BANYANDB_LOG_WARM_TTL_DAYS:7}
+ replicas: ${SW_STORAGE_BANYANDB_LOG_WARM_REPLICAS:0}
+ nodeSelector: ${SW_STORAGE_BANYANDB_LOG_WARM_NODE_SELECTOR:"type=warm"}
+ cold:
+ shardNum: ${SW_STORAGE_BANYANDB_LOG_COLD_SHARD_NUM:2}
+ segmentInterval: ${SW_STORAGE_BANYANDB_LOG_COLD_SI_DAYS:1}
+ ttl: ${SW_STORAGE_BANYANDB_LOG_COLD_TTL_DAYS:30}
+ replicas: ${SW_STORAGE_BANYANDB_LOG_COLD_REPLICAS:0}
+ nodeSelector: ${SW_STORAGE_BANYANDB_LOG_COLD_NODE_SELECTOR:"type=cold"}
+ recordsBrowserErrorLog:
+ shardNum: ${SW_STORAGE_BANYANDB_BROWSER_ERROR_LOG_SHARD_NUM:2}
+ segmentInterval: ${SW_STORAGE_BANYANDB_BROWSER_ERROR_LOG_SI_DAYS:1}
+ ttl: ${SW_STORAGE_BANYANDB_BROWSER_ERROR_LOG_TTL_DAYS:3}
+ replicas: ${SW_STORAGE_BANYANDB_BROWSER_ERROR_LOG_REPLICAS:0}
+ enableWarmStage: ${SW_STORAGE_BANYANDB_BROWSER_ERROR_LOG_ENABLE_WARM_STAGE:false}
+ enableColdStage: ${SW_STORAGE_BANYANDB_BROWSER_ERROR_LOG_ENABLE_COLD_STAGE:false}
+ warm:
+ shardNum: ${SW_STORAGE_BANYANDB_BROWSER_ERROR_LOG_WARM_SHARD_NUM:2}
+ segmentInterval: ${SW_STORAGE_BANYANDB_BROWSER_ERROR_LOG_WARM_SI_DAYS:1}
+ ttl: ${SW_STORAGE_BANYANDB_BROWSER_ERROR_LOG_WARM_TTL_DAYS:7}
+ replicas: ${SW_STORAGE_BANYANDB_BROWSER_ERROR_LOG_WARM_REPLICAS:0}
+ nodeSelector: ${SW_STORAGE_BANYANDB_BROWSER_ERROR_LOG_WARM_NODE_SELECTOR:"type=warm"}
+ cold:
+ shardNum: ${SW_STORAGE_BANYANDB_BROWSER_ERROR_LOG_COLD_SHARD_NUM:2}
+ segmentInterval: ${SW_STORAGE_BANYANDB_BROWSER_ERROR_LOG_COLD_SI_DAYS:1}
+ ttl: ${SW_STORAGE_BANYANDB_BROWSER_ERROR_LOG_COLD_TTL_DAYS:30}
+ replicas: ${SW_STORAGE_BANYANDB_BROWSER_ERROR_LOG_COLD_REPLICAS:0}
+ nodeSelector: ${SW_STORAGE_BANYANDB_BROWSER_ERROR_LOG_COLD_NODE_SELECTOR:"type=cold"}
+ # The group settings of metrics.
+ #
+ # OAP stores metrics based on their granularity.
+ # Valid values are "day", "hour", and "minute". That means metrics will be stored in three separate groups.
+ # The non-"minute" granularities are governed by the "core.downsampling" setting.
+ # For example, if "core.downsampling" is set to "hour", the "hour" group will be used, while "day" is ignored.
+ metricsMinute:
+ shardNum: ${SW_STORAGE_BANYANDB_METRICS_MINUTE_SHARD_NUM:2}
+ segmentInterval: ${SW_STORAGE_BANYANDB_METRICS_MINUTE_SI_DAYS:1}
+ ttl: ${SW_STORAGE_BANYANDB_METRICS_MINUTE_TTL_DAYS:7}
+ replicas: ${SW_STORAGE_BANYANDB_METRICS_MINUTE_REPLICAS:0}
+ enableWarmStage: ${SW_STORAGE_BANYANDB_METRICS_MINUTE_ENABLE_WARM_STAGE:false}
+ enableColdStage: ${SW_STORAGE_BANYANDB_METRICS_MINUTE_ENABLE_COLD_STAGE:false}
+ warm:
+ shardNum: ${SW_STORAGE_BANYANDB_METRICS_MINUTE_WARM_SHARD_NUM:2}
+ segmentInterval: ${SW_STORAGE_BANYANDB_METRICS_MINUTE_WARM_SI_DAYS:3}
+ ttl: ${SW_STORAGE_BANYANDB_METRICS_MINUTE_WARM_TTL_DAYS:15}
+ replicas: ${SW_STORAGE_BANYANDB_METRICS_MINUTE_WARM_REPLICAS:0}
+ nodeSelector: ${SW_STORAGE_BANYANDB_METRICS_MINUTE_WARM_NODE_SELECTOR:"type=warm"}
+ cold:
+ shardNum: ${SW_STORAGE_BANYANDB_METRICS_MINUTE_COLD_SHARD_NUM:2}
+ segmentInterval: ${SW_STORAGE_BANYANDB_METRICS_MINUTE_COLD_SI_DAYS:5}
+ ttl: ${SW_STORAGE_BANYANDB_METRICS_MINUTE_COLD_TTL_DAYS:60}
+ replicas: ${SW_STORAGE_BANYANDB_METRICS_MINUTE_COLD_REPLICAS:0}
+ nodeSelector: ${SW_STORAGE_BANYANDB_METRICS_MINUTE_COLD_NODE_SELECTOR:"type=cold"}
+ metricsHour:
+ shardNum: ${SW_STORAGE_BANYANDB_METRICS_HOUR_SHARD_NUM:1}
+ segmentInterval: ${SW_STORAGE_BANYANDB_METRICS_HOUR_SI_DAYS:5}
+ ttl: ${SW_STORAGE_BANYANDB_METRICS_HOUR_TTL_DAYS:15}
+ replicas: ${SW_STORAGE_BANYANDB_METRICS_HOUR_REPLICAS:0}
+ enableWarmStage: ${SW_STORAGE_BANYANDB_METRICS_HOUR_ENABLE_WARM_STAGE:false}
+ enableColdStage: ${SW_STORAGE_BANYANDB_METRICS_HOUR_ENABLE_COLD_STAGE:false}
+ warm:
+ shardNum: ${SW_STORAGE_BANYANDB_METRICS_HOUR_WARM_SHARD_NUM:1}
+ segmentInterval: ${SW_STORAGE_BANYANDB_METRICS_HOUR_WARM_SI_DAYS:7}
+ ttl: ${SW_STORAGE_BANYANDB_METRICS_HOUR_WARM_TTL_DAYS:30}
+ replicas: ${SW_STORAGE_BANYANDB_METRICS_HOUR_WARM_REPLICAS:0}
+ nodeSelector: ${SW_STORAGE_BANYANDB_METRICS_HOUR_WARM_NODE_SELECTOR:"type=warm"}
+ cold:
+ shardNum: ${SW_STORAGE_BANYANDB_METRICS_HOUR_COLD_SHARD_NUM:1}
+ segmentInterval: ${SW_STORAGE_BANYANDB_METRICS_HOUR_COLD_SI_DAYS:15}
+ ttl: ${SW_STORAGE_BANYANDB_METRICS_HOUR_COLD_TTL_DAYS:120}
+ replicas: ${SW_STORAGE_BANYANDB_METRICS_HOUR_COLD_REPLICAS:0}
+ nodeSelector: ${SW_STORAGE_BANYANDB_METRICS_HOUR_COLD_NODE_SELECTOR:"type=cold"}
+ metricsDay:
+ shardNum: ${SW_STORAGE_BANYANDB_METRICS_DAY_SHARD_NUM:1}
+ segmentInterval: ${SW_STORAGE_BANYANDB_METRICS_DAY_SI_DAYS:15}
+ ttl: ${SW_STORAGE_BANYANDB_METRICS_DAY_TTL_DAYS:15}
+ replicas: ${SW_STORAGE_BANYANDB_METRICS_DAY_REPLICAS:0}
+ enableWarmStage: ${SW_STORAGE_BANYANDB_METRICS_DAY_ENABLE_WARM_STAGE:false}
+ enableColdStage: ${SW_STORAGE_BANYANDB_METRICS_DAY_ENABLE_COLD_STAGE:false}
+ warm:
+ shardNum: ${SW_STORAGE_BANYANDB_METRICS_DAY_WARM_SHARD_NUM:1}
+ segmentInterval: ${SW_STORAGE_BANYANDB_METRICS_DAY_WARM_SI_DAYS:15}
+ ttl: ${SW_STORAGE_BANYANDB_METRICS_DAY_WARM_TTL_DAYS:30}
+ replicas: ${SW_STORAGE_BANYANDB_METRICS_DAY_WARM_REPLICAS:0}
+ nodeSelector: ${SW_STORAGE_BANYANDB_METRICS_DAY_WARM_NODE_SELECTOR:"type=warm"}
+ cold:
+ shardNum: ${SW_STORAGE_BANYANDB_METRICS_DAY_COLD_SHARD_NUM:1}
+ segmentInterval: ${SW_STORAGE_BANYANDB_METRICS_DAY_COLD_SI_DAYS:15}
+ ttl: ${SW_STORAGE_BANYANDB_METRICS_DAY_COLD_TTL_DAYS:120}
+ replicas: ${SW_STORAGE_BANYANDB_METRICS_DAY_COLD_REPLICAS:0}
+ nodeSelector: ${SW_STORAGE_BANYANDB_METRICS_DAY_COLD_NODE_SELECTOR:"type=cold"}
+ # If the metrics is marked as "index_mode", the metrics will be stored in
the "metadata" group.
+ # The "metadata" group is designed to store metrics that are used for
indexing without value columns.
+ # Such as `service_traffic`, `network_address_alias`, etc.
+ # "index_mode" requires BanyanDB *0.8.0* or later.
+ metadata:
+ shardNum: ${SW_STORAGE_BANYANDB_METADATA_SHARD_NUM:2}
+ segmentInterval: ${SW_STORAGE_BANYANDB_METADATA_SI_DAYS:15}
+ ttl: ${SW_STORAGE_BANYANDB_METADATA_TTL_DAYS:15}
+ replicas: ${SW_STORAGE_BANYANDB_METADATA_REPLICAS:0}
+
+ # The group settings of property, such as UI and profiling.
+ property:
+ shardNum: ${SW_STORAGE_BANYANDB_PROPERTY_SHARD_NUM:1}
+ replicas: ${SW_STORAGE_BANYANDB_PROPERTY_REPLICAS:0}
diff --git a/test/e2e-v2/cases/profiling/trace/profiling-cases-trace-v2.yaml b/test/e2e-v2/cases/profiling/trace/profiling-cases-trace-v2.yaml
index 169e98b59e..0850f61fc6 100644
--- a/test/e2e-v2/cases/profiling/trace/profiling-cases-trace-v2.yaml
+++ b/test/e2e-v2/cases/profiling/trace/profiling-cases-trace-v2.yaml
@@ -16,7 +16,7 @@
# This file is used to show how to write configuration files and can be used to test.
cases:
- # trace segment list
+ # trace list
- query: |
curl -s -XPOST http://${provider_host}:${provider_9090}/profile/users?e2e=true -d '{"enableProfiling":"false","name":"SkyWalking"}' -H "Content-Type: application/json" > /dev/null;
sleep 3;