This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository
https://gitbox.apache.org/repos/asf/skywalking-banyandb-java-client.git
The following commit(s) were added to refs/heads/main by this push:
new 35eb8b8 Move write logic to OAP Server. (#101)
35eb8b8 is described below
commit 35eb8b8a48f8c5961a9754daa37d1037e8c024bf
Author: Wan Kai <[email protected]>
AuthorDate: Mon Oct 13 16:35:46 2025 +0800
Move write logic to OAP Server. (#101)
---
.../v1/client/AbstractBulkWriteProcessor.java | 204 -------------------
.../banyandb/v1/client/AbstractWrite.java | 3 +-
.../banyandb/v1/client/BanyanDBClient.java | 225 +--------------------
.../v1/client/MeasureBulkWriteProcessor.java | 124 ------------
.../v1/client/StreamBulkWriteProcessor.java | 128 ------------
.../v1/client/TraceBulkWriteProcessor.java | 127 ------------
.../v1/client/ITBanyanDBMeasureQueryTests.java | 35 +++-
.../v1/client/ITBanyanDBPropertyTests.java | 25 ++-
.../v1/client/ITBanyanDBStreamQueryTests.java | 35 ++--
.../skywalking/banyandb/v1/client/ITTraceTest.java | 71 +++++--
10 files changed, 121 insertions(+), 856 deletions(-)
diff --git
a/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractBulkWriteProcessor.java
b/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractBulkWriteProcessor.java
deleted file mode 100644
index d850cd1..0000000
---
a/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractBulkWriteProcessor.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.banyandb.v1.client;
-
-import com.google.auto.value.AutoValue;
-import io.grpc.stub.AbstractAsyncStub;
-import io.grpc.stub.StreamObserver;
-import io.prometheus.client.Histogram;
-import java.io.Closeable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import lombok.SneakyThrows;
-import lombok.extern.slf4j.Slf4j;
-
-@Slf4j
-public abstract class AbstractBulkWriteProcessor<REQ extends
com.google.protobuf.GeneratedMessageV3,
- STUB extends AbstractAsyncStub<STUB>>
- implements Runnable, Closeable {
- private final STUB stub;
- private final int maxBulkSize;
- private final int flushInterval;
- private final ArrayBlockingQueue<Holder> requests;
- private final Semaphore semaphore;
- private final long flushInternalInMillis;
- private final ScheduledThreadPoolExecutor scheduler;
- private final int timeout;
- private volatile long lastFlushTS = 0;
-
- /**
- * Create the processor.
- *
- * @param stub an implementation of {@link AbstractAsyncStub}
- * @param processorName name of the processor for logging
- * @param maxBulkSize the max bulk size for the flush operation
- * @param flushInterval if given maxBulkSize is not reached in this
period, the flush would be trigger
- * automatically. Unit is second.
- * @param concurrency the number of concurrency would run for the flush
max.
- * @param timeout network timeout threshold in seconds.
- */
- protected AbstractBulkWriteProcessor(STUB stub,
- String processorName,
- int maxBulkSize,
- int flushInterval,
- int concurrency,
- int timeout) {
- this.stub = stub;
- this.maxBulkSize = maxBulkSize;
- this.flushInterval = flushInterval;
- this.timeout = timeout;
- requests = new ArrayBlockingQueue<>(maxBulkSize + 1);
- this.semaphore = new Semaphore(concurrency > 0 ? concurrency : 1);
-
- scheduler = new ScheduledThreadPoolExecutor(1, r -> {
- final Thread thread = new Thread(r);
- thread.setName("BanyanDB BulkProcessor");
- return thread;
- });
- scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
- scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
- scheduler.setRemoveOnCancelPolicy(true);
- flushInternalInMillis = flushInterval * 1000;
- scheduler.scheduleWithFixedDelay(
- this, 0, flushInterval, TimeUnit.SECONDS);
- }
-
- /**
- * Add the measure to the bulk processor.
- *
- * @param writeEntity to add.
- */
- @SneakyThrows
- public CompletableFuture<Void> add(AbstractWrite<REQ> writeEntity) {
- final CompletableFuture<Void> f = new CompletableFuture<>();
- requests.put(Holder.create(writeEntity, f));
- flushIfNeeded();
- return f;
- }
-
- public void run() {
- try {
- doPeriodicalFlush();
- } catch (Throwable t) {
- log.error("Failed to flush data to BanyanDB", t);
- }
- }
-
- @SneakyThrows
- protected void flushIfNeeded() {
- if (requests.size() >= maxBulkSize) {
- flush();
- }
- }
-
- private void doPeriodicalFlush() {
- if (System.currentTimeMillis() - lastFlushTS > flushInternalInMillis /
2) {
- // Run periodical flush if there is no `flushIfNeeded` executed in
the second half of the flush period.
- // Otherwise, wait for the next round. By default, the last 2
seconds of the 5s period.
- // This could avoid periodical flush running among
bulks(controlled by bulkActions).
- flush();
- }
- }
-
- public void flush() {
- if (requests.isEmpty()) {
- return;
- }
-
- try {
- semaphore.acquire();
- } catch (InterruptedException e) {
- log.error("Interrupted when trying to get semaphore to execute
bulk requests", e);
- return;
- }
-
- final List<Holder> batch = new ArrayList<>(requests.size());
- requests.drainTo(batch);
- final CompletableFuture<Void> future = doObservedFlush(batch);
- future.whenComplete((v, t) -> semaphore.release());
- future.join();
- lastFlushTS = System.currentTimeMillis();
-
- }
-
- protected abstract CompletableFuture<Void> doObservedFlush(final
List<Holder> data);
-
- protected CompletableFuture<Void> doFlush(final List<Holder> data,
Histogram.Timer timer) {
- // The batch is used to control the completion of the flush operation.
- // There is at most one error per batch,
- // because the database server would terminate the batch process when
the first error occurs.
- final CompletableFuture<Void> batch = new CompletableFuture<>();
- final StreamObserver<REQ> writeRequestStreamObserver
- = this.buildStreamObserver(stub.withDeadlineAfter(timeout,
TimeUnit.SECONDS), batch);
-
- try {
- data.forEach(h -> {
- AbstractWrite<REQ> entity = (AbstractWrite<REQ>)
h.writeEntity();
- REQ request;
- try {
- request = entity.build();
- } catch (Throwable bt) {
- log.error("building the entity fails: {}",
entity.toString(), bt);
- h.future().completeExceptionally(bt);
- return;
- }
- writeRequestStreamObserver.onNext(request);
- h.future().complete(null);
- });
- } finally {
- writeRequestStreamObserver.onCompleted();
- }
- batch.whenComplete((ignored, exp) -> {
- timer.observeDuration();
- if (exp != null) {
- log.error("Failed to execute requests in bulk", exp);
- }
- });
- return batch;
- }
-
- public void close() {
- scheduler.shutdownNow();
- }
-
- protected abstract StreamObserver<REQ> buildStreamObserver(STUB stub,
CompletableFuture<Void> batch);
-
- @AutoValue
- static abstract class Holder {
- abstract AbstractWrite writeEntity();
-
- abstract CompletableFuture<Void> future();
-
- public static <REQ extends com.google.protobuf.GeneratedMessageV3>
Holder create(AbstractWrite<REQ> writeEntity,
-
CompletableFuture<Void> future) {
- future.whenComplete((v, t) -> {
- if (t != null) {
- log.error("Failed to execute the request: {}",
writeEntity.toString(), t);
- }
- });
- return new
AutoValue_AbstractBulkWriteProcessor_Holder(writeEntity, future);
- }
-
- }
-}
diff --git
a/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractWrite.java
b/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractWrite.java
index bae244d..0f29c45 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractWrite.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractWrite.java
@@ -44,6 +44,7 @@ public abstract class AbstractWrite<P extends
com.google.protobuf.GeneratedMessa
protected final Object[] tags;
+ @Getter
protected final MetadataCache.EntityMetadata entityMetadata;
public AbstractWrite(MetadataCache.EntityMetadata entityMetadata, long
timestamp) {
@@ -71,7 +72,7 @@ public abstract class AbstractWrite<P extends
com.google.protobuf.GeneratedMessa
return this;
}
- P build() {
+ public P build() {
BanyandbCommon.Metadata metadata = BanyandbCommon.Metadata.newBuilder()
.setGroup(entityMetadata.getGroup()).setName(entityMetadata.getName()).setModRevision(entityMetadata.getModRevision()).build();
diff --git
a/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java
b/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java
index bcae829..d9505c1 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java
@@ -26,11 +26,8 @@ import io.grpc.Channel;
import io.grpc.ClientInterceptors;
import io.grpc.ManagedChannel;
import io.grpc.Status;
-import io.grpc.stub.StreamObserver;
-import io.prometheus.client.Histogram;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
-import lombok.AccessLevel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
@@ -44,12 +41,7 @@ import
org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.IndexRule;
import
org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.IndexRuleBinding;
import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.Subject;
import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.Trace;
-import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
import org.apache.skywalking.banyandb.property.v1.BanyandbProperty;
-import org.apache.skywalking.banyandb.property.v1.BanyandbProperty.Property;
-import
org.apache.skywalking.banyandb.property.v1.BanyandbProperty.ApplyRequest.Strategy;
-import
org.apache.skywalking.banyandb.property.v1.BanyandbProperty.ApplyResponse;
-import
org.apache.skywalking.banyandb.property.v1.BanyandbProperty.DeleteResponse;
import org.apache.skywalking.banyandb.measure.v1.BanyandbMeasure;
import org.apache.skywalking.banyandb.measure.v1.MeasureServiceGrpc;
import org.apache.skywalking.banyandb.stream.v1.BanyandbStream;
@@ -61,8 +53,6 @@ import
org.apache.skywalking.banyandb.v1.client.grpc.HandleExceptionsWith;
import org.apache.skywalking.banyandb.v1.client.grpc.channel.ChannelManager;
import
org.apache.skywalking.banyandb.v1.client.grpc.channel.DefaultChannelFactory;
import
org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
-import
org.apache.skywalking.banyandb.v1.client.grpc.exception.InternalException;
-import
org.apache.skywalking.banyandb.v1.client.grpc.exception.InvalidArgumentException;
import org.apache.skywalking.banyandb.v1.client.metadata.GroupMetadataRegistry;
import
org.apache.skywalking.banyandb.v1.client.metadata.IndexRuleBindingMetadataRegistry;
import
org.apache.skywalking.banyandb.v1.client.metadata.IndexRuleMetadataRegistry;
@@ -81,12 +71,10 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
-import org.apache.skywalking.banyandb.v1.client.util.StatusUtil;
import org.apache.skywalking.banyandb.v1.client.util.TimeUtils;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -106,47 +94,46 @@ import static
com.google.common.base.Preconditions.checkState;
@Slf4j
public class BanyanDBClient implements Closeable {
public static final ZonedDateTime DEFAULT_EXPIRE_AT =
ZonedDateTime.of(2099, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC);
- private static Histogram WRITE_HISTOGRAM;
private final String[] targets;
/**
* Options for server connection.
*/
- @Getter(value = AccessLevel.PACKAGE)
+ @Getter
private final Options options;
/**
* gRPC connection.
*/
- @Getter(value = AccessLevel.PACKAGE)
+ @Getter
private volatile Channel channel;
/**
* gRPC client stub
*/
- @Getter(value = AccessLevel.PACKAGE)
+ @Getter
private StreamServiceGrpc.StreamServiceStub streamServiceStub;
/**
* gRPC client stub
*/
- @Getter(value = AccessLevel.PACKAGE)
+ @Getter
private MeasureServiceGrpc.MeasureServiceStub measureServiceStub;
/**
* gRPC client stub
*/
- @Getter(value = AccessLevel.PACKAGE)
+ @Getter
private TraceServiceGrpc.TraceServiceStub traceServiceStub;
/**
* gRPC future stub.
*/
- @Getter(value = AccessLevel.PACKAGE)
+ @Getter
private StreamServiceGrpc.StreamServiceBlockingStub
streamServiceBlockingStub;
/**
* gRPC future stub.
*/
- @Getter(value = AccessLevel.PACKAGE)
+ @Getter
private MeasureServiceGrpc.MeasureServiceBlockingStub
measureServiceBlockingStub;
/**
* gRPC future stub.
*/
- @Getter(value = AccessLevel.PACKAGE)
+ @Getter
private TraceServiceGrpc.TraceServiceBlockingStub traceServiceBlockingStub;
/**
* The connection status.
@@ -162,16 +149,6 @@ public class BanyanDBClient implements Closeable {
*/
private final MetadataCache metadataCache;
- static {
- // init prometheus metric
- WRITE_HISTOGRAM = Histogram.build()
- .name("banyandb_write_latency_seconds")
- .help("BanyanDB Bulk Write latency in
seconds")
- .buckets(0.005, 0.01, 0.025, 0.05, 0.1,
0.25, 0.5, 1.0, 2.5, 5.0, 10.0)
- .labelNames("catalog", "operation",
"instanceID")
- .register();
- }
-
/**
* Create a BanyanDB client instance with a default options.
*
@@ -255,122 +232,6 @@ public class BanyanDBClient implements Closeable {
}
}
- /**
- * Perform a single write with given entity.
- *
- * @param streamWrite the entity to be written
- * @return a future of write result
- */
- public CompletableFuture<Void> write(StreamWrite streamWrite) {
- checkState(this.streamServiceStub != null, "stream service is null");
-
- Histogram.Timer timer
- = WRITE_HISTOGRAM.labels(
- "stream",
- "single_write", // single write for non-bulk
operation.
-
options.getPrometheusMetricsOpts().getClientID()
- )
- .startTimer();
- CompletableFuture<Void> future = new CompletableFuture<>();
- final StreamObserver<BanyandbStream.WriteRequest>
writeRequestStreamObserver
- = this.streamServiceStub
- .withDeadlineAfter(this.getOptions().getDeadline(),
TimeUnit.SECONDS)
- .write(
- new StreamObserver<BanyandbStream.WriteResponse>() {
- private BanyanDBException responseException;
-
- @Override
- public void onNext(BanyandbStream.WriteResponse
writeResponse) {
- BanyandbModel.Status status =
StatusUtil.convertStringToStatus(writeResponse.getStatus());
- switch (status) {
- case STATUS_SUCCEED:
- break;
- case STATUS_INVALID_TIMESTAMP:
- responseException = new
InvalidArgumentException(
- "Invalid timestamp: " +
streamWrite.getTimestamp(), null, Status.Code.INVALID_ARGUMENT, false);
- break;
- case STATUS_NOT_FOUND:
- responseException = new
InvalidArgumentException(
- "Invalid metadata: " +
streamWrite.entityMetadata, null, Status.Code.INVALID_ARGUMENT, false);
- break;
- case STATUS_EXPIRED_SCHEMA:
- BanyandbCommon.Metadata metadata =
writeResponse.getMetadata();
- log.warn("The schema {}.{} is expired,
trying update the schema...",
- metadata.getGroup(),
metadata.getName());
- try {
-
BanyanDBClient.this.updateStreamMetadataCacheFromSever(metadata.getGroup(),
metadata.getName());
- } catch (BanyanDBException e) {
- String warnMessage =
String.format("Failed to refresh the stream schema %s.%s",
- metadata.getGroup(),
metadata.getName());
- log.warn(warnMessage, e);
- }
- responseException = new
InvalidArgumentException(
- "Expired revision: " +
metadata.getModRevision(), null, Status.Code.INVALID_ARGUMENT, true);
- break;
- default:
- responseException = new
InternalException(
- String.format("Internal error
(%s) occurs in server", writeResponse.getStatus()), null, Status.Code.INTERNAL,
true);
- break;
- }
- }
-
- @Override
- public void onError(Throwable throwable) {
- timer.observeDuration();
- log.error("Error occurs in flushing streams.",
throwable);
- future.completeExceptionally(throwable);
- }
-
- @Override
- public void onCompleted() {
- timer.observeDuration();
- if (responseException == null) {
- future.complete(null);
- } else {
-
future.completeExceptionally(responseException);
- }
- }
- });
- try {
- writeRequestStreamObserver.onNext(streamWrite.build());
- } finally {
- writeRequestStreamObserver.onCompleted();
- }
- return future;
- }
-
- /**
- * Create a build process for stream write.
- *
- * @param maxBulkSize the max bulk size for the flush operation
- * @param flushInterval if given maxBulkSize is not reached in this
period, the flush would be trigger
- * automatically. Unit is second
- * @param concurrency the number of concurrency would run for the flush
max
- * @param timeout network timeout threshold in seconds.
- * @return stream bulk write processor
- */
- public StreamBulkWriteProcessor buildStreamWriteProcessor(int maxBulkSize,
int flushInterval, int concurrency, int timeout) {
- checkState(this.streamServiceStub != null, "stream service is null");
-
- return new StreamBulkWriteProcessor(this, maxBulkSize, flushInterval,
concurrency, timeout, WRITE_HISTOGRAM, options);
- }
-
- /**
- * Create a build process for measure write.
- *
- * @param maxBulkSize the max bulk size for the flush operation
- * @param flushInterval if given maxBulkSize is not reached in this
period, the flush would be trigger
- * automatically. Unit is second
- * @param concurrency the number of concurrency would run for the flush
max
- * @param timeout network timeout threshold in seconds.
- * @return stream bulk write processor
- */
- public MeasureBulkWriteProcessor buildMeasureWriteProcessor(int
maxBulkSize, int flushInterval, int concurrency, int timeout) {
- checkState(this.measureServiceStub != null, "measure service is null");
-
- return new MeasureBulkWriteProcessor(this, maxBulkSize, flushInterval,
concurrency, timeout, WRITE_HISTOGRAM, options);
- }
-
/**
* Build a MeasureWrite request.
*
@@ -399,22 +260,6 @@ public class BanyanDBClient implements Closeable {
return new StreamWrite(this.metadataCache.findStreamMetadata(group,
name), elementId);
}
- /**
- * Build a trace bulk write processor.
- *
- * @param maxBulkSize the max size of each flush. The actual size is
determined by the length of byte array.
- * @param flushInterval if given maxBulkSize is not reached in this
period, the flush would be trigger
- * automatically. Unit is second.
- * @param concurrency the number of concurrency would run for the flush
max.
- * @param timeout network timeout threshold in seconds.
- * @return trace bulk write processor
- */
- public TraceBulkWriteProcessor buildTraceWriteProcessor(int maxBulkSize,
int flushInterval, int concurrency, int timeout) {
- checkState(this.traceServiceStub != null, "trace service is null");
-
- return new TraceBulkWriteProcessor(this, maxBulkSize, flushInterval,
concurrency, timeout, WRITE_HISTOGRAM, options);
- }
-
/**
* Build a TraceWrite request without initial timestamp.
*
@@ -958,40 +803,6 @@ public class BanyanDBClient implements Closeable {
return registry.delete(group, name);
}
- /**
- * Apply(Create or update) the property with {@link
BanyandbProperty.ApplyRequest.Strategy#STRATEGY_MERGE}
- *
- * @param property the property to be stored in the BanyanBD
- */
- public ApplyResponse apply(Property property) throws BanyanDBException {
- PropertyStore store = new PropertyStore(checkNotNull(this.channel));
- try (Histogram.Timer timer = WRITE_HISTOGRAM.labels(
- "property",
- "single_write",
- options.getPrometheusMetricsOpts().getClientID()
- ).startTimer()) {
- return store.apply(property);
- }
- }
-
- /**
- * Apply(Create or update) the property
- *
- * @param property the property to be stored in the BanyanBD
- * @param strategy dedicates how to apply the property
- */
- public ApplyResponse apply(Property property, Strategy strategy) throws
- BanyanDBException {
- PropertyStore store = new PropertyStore(checkNotNull(this.channel));
- try (Histogram.Timer timer = WRITE_HISTOGRAM.labels(
- "property",
- "single_write",
- options.getPrometheusMetricsOpts().getClientID()
- ).startTimer()) {
- return store.apply(property, strategy);
- }
- }
-
/**
* Query properties
*
@@ -1003,26 +814,6 @@ public class BanyanDBClient implements Closeable {
return store.query(request);
}
- /**
- * Delete property
- *
- * @param group group of the metadata
- * @param name name of the metadata
- * @param id identity of the property
- * @return if this property has been deleted
- */
- public DeleteResponse deleteProperty(String group, String name, String id)
throws
- BanyanDBException {
- PropertyStore store = new PropertyStore(checkNotNull(this.channel));
- try (Histogram.Timer timer = WRITE_HISTOGRAM.labels(
- "property",
- "delete",
- options.getPrometheusMetricsOpts().getClientID()
- ).startTimer()) {
- return store.delete(group, name, id);
- }
- }
-
/**
* Define a new trace
*
diff --git
a/src/main/java/org/apache/skywalking/banyandb/v1/client/MeasureBulkWriteProcessor.java
b/src/main/java/org/apache/skywalking/banyandb/v1/client/MeasureBulkWriteProcessor.java
deleted file mode 100644
index ce17e66..0000000
---
a/src/main/java/org/apache/skywalking/banyandb/v1/client/MeasureBulkWriteProcessor.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.banyandb.v1.client;
-
-import io.grpc.stub.StreamObserver;
-import io.prometheus.client.Histogram;
-import java.util.List;
-import lombok.extern.slf4j.Slf4j;
-
-import org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
-import org.apache.skywalking.banyandb.measure.v1.BanyandbMeasure;
-import org.apache.skywalking.banyandb.measure.v1.MeasureServiceGrpc;
-import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
-import
org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
-import org.apache.skywalking.banyandb.v1.client.util.StatusUtil;
-
-import javax.annotation.concurrent.ThreadSafe;
-
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-
-/**
- * MeasureBulkWriteProcessor works for measure flush.
- */
-@Slf4j
-@ThreadSafe
-public class MeasureBulkWriteProcessor extends
AbstractBulkWriteProcessor<BanyandbMeasure.WriteRequest,
- MeasureServiceGrpc.MeasureServiceStub> {
- private final BanyanDBClient client;
- private final Histogram writeHistogram;
- private final Options options;
-
- /**
- * Create the processor.
- *
- * @param client the client
- * @param maxBulkSize the max bulk size for the flush operation
- * @param flushInterval if given maxBulkSize is not reached in this
period, the flush would be trigger
- * automatically. Unit is second.
- * @param concurrency the number of concurrency would run for the flush
max.
- * @param timeout network timeout threshold in seconds.
- */
- protected MeasureBulkWriteProcessor(
- final BanyanDBClient client,
- final int maxBulkSize,
- final int flushInterval,
- final int concurrency,
- final int timeout, final Histogram writeHistogram, final Options
options) {
- super(client.getMeasureServiceStub(), "MeasureBulkWriteProcessor",
maxBulkSize, flushInterval, concurrency, timeout);
- this.client = client;
- this.writeHistogram = writeHistogram;
- this.options = options;
- }
-
- @Override
- protected StreamObserver<BanyandbMeasure.WriteRequest>
buildStreamObserver(MeasureServiceGrpc.MeasureServiceStub stub,
-
CompletableFuture<Void> batch) {
- return stub.write(new StreamObserver<BanyandbMeasure.WriteResponse>() {
- private final Set<String> schemaExpired = new HashSet<>();
-
- @Override
- public void onNext(BanyandbMeasure.WriteResponse writeResponse) {
- BanyandbModel.Status status =
StatusUtil.convertStringToStatus(writeResponse.getStatus());
- switch (status) {
- case STATUS_SUCCEED:
- break;
- case STATUS_EXPIRED_SCHEMA:
- BanyandbCommon.Metadata metadata =
writeResponse.getMetadata();
- String schemaKey = metadata.getGroup() + "." +
metadata.getName();
- if (!schemaExpired.contains(schemaKey)) {
- log.warn("The schema {} is expired, trying update
the schema...", schemaKey);
- try {
-
client.updateMeasureMetadataCacheFromSever(metadata.getGroup(),
metadata.getName());
- schemaExpired.add(schemaKey);
- } catch (BanyanDBException e) {
- log.error(e.getMessage(), e);
- }
- }
- break;
- default:
- log.warn("Write measure failed with status: {}",
status);
- }
- }
-
- @Override
- public void onError(Throwable t) {
- batch.completeExceptionally(t);
- log.error("Error occurs in flushing measures", t);
- }
-
- @Override
- public void onCompleted() {
- batch.complete(null);
- }
- });
- }
-
- @Override
- protected CompletableFuture<Void> doObservedFlush(final List<Holder> data)
{
- Histogram.Timer timer = writeHistogram.labels(
- "measure",
- "bulk_write",
- options.getPrometheusMetricsOpts().getClientID()
- ).startTimer();
- return super.doFlush(data, timer);
- }
-}
diff --git
a/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamBulkWriteProcessor.java
b/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamBulkWriteProcessor.java
deleted file mode 100644
index 01f042a..0000000
---
a/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamBulkWriteProcessor.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.banyandb.v1.client;
-
-import io.grpc.stub.StreamObserver;
-
-import io.prometheus.client.Histogram;
-import java.util.List;
-import lombok.extern.slf4j.Slf4j;
-
-import org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
-import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
-import org.apache.skywalking.banyandb.stream.v1.StreamServiceGrpc;
-import org.apache.skywalking.banyandb.stream.v1.BanyandbStream;
-import
org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
-import org.apache.skywalking.banyandb.v1.client.util.StatusUtil;
-
-import javax.annotation.concurrent.ThreadSafe;
-
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-
-/**
- * StreamBulkWriteProcessor works for stream flush.
- */
-@Slf4j
-@ThreadSafe
-public class StreamBulkWriteProcessor extends
AbstractBulkWriteProcessor<BanyandbStream.WriteRequest,
- StreamServiceGrpc.StreamServiceStub> {
- private final BanyanDBClient client;
- private final Histogram writeHistogram;
- private final Options options;
-
- /**
- * Create the processor.
- *
- * @param client the client
- * @param maxBulkSize the max bulk size for the flush operation
- * @param flushInterval if given maxBulkSize is not reached in this
period, the flush would be trigger
- * automatically. Unit is second.
- * @param timeout network timeout threshold in seconds.
- * @param concurrency the number of concurrency would run for the flush
max.
- */
- protected StreamBulkWriteProcessor(
- final BanyanDBClient client,
- final int maxBulkSize,
- final int flushInterval,
- final int concurrency,
- final int timeout,
- final Histogram writeHistogram,
- final Options options) {
- super(client.getStreamServiceStub(), "StreamBulkWriteProcessor",
maxBulkSize, flushInterval, concurrency, timeout);
- this.client = client;
- this.writeHistogram = writeHistogram;
- this.options = options;
- }
-
- @Override
- protected StreamObserver<BanyandbStream.WriteRequest>
buildStreamObserver(StreamServiceGrpc.StreamServiceStub stub,
CompletableFuture<Void> batch) {
- return stub.write(
- new StreamObserver<BanyandbStream.WriteResponse>() {
- private final Set<String> schemaExpired = new HashSet<>();
-
- @Override
- public void onNext(BanyandbStream.WriteResponse
writeResponse) {
- BanyandbModel.Status status =
StatusUtil.convertStringToStatus(writeResponse.getStatus());
- switch (status) {
- case STATUS_SUCCEED:
- break;
- case STATUS_EXPIRED_SCHEMA:
- BanyandbCommon.Metadata metadata =
writeResponse.getMetadata();
- String schemaKey = metadata.getGroup() + "." +
metadata.getName();
- if (!schemaExpired.contains(schemaKey)) {
- log.warn("The schema {} is expired, trying
update the schema...", schemaKey);
- try {
-
client.updateStreamMetadataCacheFromSever(metadata.getGroup(),
metadata.getName());
- schemaExpired.add(schemaKey);
- } catch (BanyanDBException e) {
- log.error(e.getMessage(), e);
- }
- }
- break;
- default:
- log.warn("Write stream failed with status:
{}", status);
- }
- }
-
- @Override
- public void onError(Throwable t) {
- batch.completeExceptionally(t);
- log.error("Error occurs in flushing streams", t);
- }
-
- @Override
- public void onCompleted() {
- batch.complete(null);
- }
- });
- }
-
- @Override
- protected CompletableFuture<Void> doObservedFlush(final List<Holder> data)
{
- Histogram.Timer timer = writeHistogram.labels(
- "stream",
- "bulk_write",
- options.getPrometheusMetricsOpts().getClientID()
- ).startTimer();
- return super.doFlush(data, timer);
- }
-}
-
diff --git
a/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceBulkWriteProcessor.java
b/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceBulkWriteProcessor.java
deleted file mode 100644
index 96fdc4d..0000000
---
a/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceBulkWriteProcessor.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.banyandb.v1.client;
-
-import io.grpc.stub.StreamObserver;
-
-import io.prometheus.client.Histogram;
-import java.util.List;
-import lombok.extern.slf4j.Slf4j;
-
-import org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
-import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
-import org.apache.skywalking.banyandb.trace.v1.TraceServiceGrpc;
-import org.apache.skywalking.banyandb.trace.v1.BanyandbTrace;
-import
org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
-import org.apache.skywalking.banyandb.v1.client.util.StatusUtil;
-
-import javax.annotation.concurrent.ThreadSafe;
-
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-
-/**
- * TraceBulkWriteProcessor works for trace flush.
- */
-@Slf4j
-@ThreadSafe
-public class TraceBulkWriteProcessor extends
AbstractBulkWriteProcessor<BanyandbTrace.WriteRequest,
- TraceServiceGrpc.TraceServiceStub> {
- private final BanyanDBClient client;
- private final Histogram writeHistogram;
- private final Options options;
-
- /**
- * Create the processor.
- *
- * @param client the client
- * @param maxBulkSize the max bulk size for the flush operation
- * @param flushInterval if given maxBulkSize is not reached in this
period, the flush would be trigger
- * automatically. Unit is second.
- * @param timeout network timeout threshold in seconds.
- * @param concurrency the number of concurrency would run for the flush
max.
- */
- protected TraceBulkWriteProcessor(
- final BanyanDBClient client,
- final int maxBulkSize,
- final int flushInterval,
- final int concurrency,
- final int timeout,
- final Histogram writeHistogram,
- final Options options) {
- super(client.getTraceServiceStub(), "TraceBulkWriteProcessor",
maxBulkSize, flushInterval, concurrency, timeout);
- this.client = client;
- this.writeHistogram = writeHistogram;
- this.options = options;
- }
-
- @Override
- protected StreamObserver<BanyandbTrace.WriteRequest>
buildStreamObserver(TraceServiceGrpc.TraceServiceStub stub,
CompletableFuture<Void> batch) {
- return stub.write(
- new StreamObserver<BanyandbTrace.WriteResponse>() {
- private final Set<String> schemaExpired = new HashSet<>();
-
- @Override
- public void onNext(BanyandbTrace.WriteResponse
writeResponse) {
- BanyandbModel.Status status =
StatusUtil.convertStringToStatus(writeResponse.getStatus());
- switch (status) {
- case STATUS_SUCCEED:
- break;
- case STATUS_EXPIRED_SCHEMA:
- BanyandbCommon.Metadata metadata =
writeResponse.getMetadata();
- String schemaKey = metadata.getGroup() + "." +
metadata.getName();
- if (!schemaExpired.contains(schemaKey)) {
- log.warn("The trace schema {} is expired,
trying update the schema...", schemaKey);
- try {
-
client.updateTraceMetadataCacheFromServer(metadata.getGroup(),
metadata.getName());
- schemaExpired.add(schemaKey);
- } catch (BanyanDBException e) {
- log.error(e.getMessage(), e);
- }
- }
- break;
- default:
- log.warn("Write trace failed with status: {}",
status);
- }
- }
-
- @Override
- public void onError(Throwable t) {
- batch.completeExceptionally(t);
- log.error("Error occurs in flushing traces", t);
- }
-
- @Override
- public void onCompleted() {
- batch.complete(null);
- }
- });
- }
-
- @Override
- protected CompletableFuture<Void> doObservedFlush(final List<Holder> data)
{
- Histogram.Timer timer = writeHistogram.labels(
- "trace",
- "bulk_write",
- options.getPrometheusMetricsOpts().getClientID()
- ).startTimer();
- return super.doFlush(data, timer);
- }
-}
\ No newline at end of file
diff --git a/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBMeasureQueryTests.java b/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBMeasureQueryTests.java
index 67c96d5..7fddcf9 100644
--- a/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBMeasureQueryTests.java
+++ b/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBMeasureQueryTests.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.banyandb.v1.client;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
+import io.grpc.stub.StreamObserver;
import org.apache.skywalking.banyandb.common.v1.BanyandbCommon.Group;
import org.apache.skywalking.banyandb.common.v1.BanyandbCommon.Catalog;
import org.apache.skywalking.banyandb.common.v1.BanyandbCommon.IntervalRule;
@@ -34,6 +35,8 @@ import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.Measure;
import
org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.TagFamilySpec;
import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.TagSpec;
import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.TagType;
+import org.apache.skywalking.banyandb.measure.v1.BanyandbMeasure;
+import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
import
org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
import org.apache.skywalking.banyandb.v1.client.metadata.Duration;
import org.junit.After;
@@ -44,7 +47,6 @@ import org.junit.Test;
import java.io.IOException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -52,7 +54,6 @@ import java.util.concurrent.TimeoutException;
import static org.awaitility.Awaitility.await;
public class ITBanyanDBMeasureQueryTests extends BanyanDBClientTestCI {
- private MeasureBulkWriteProcessor processor;
@Before
public void setUp() throws IOException, BanyanDBException,
InterruptedException {
@@ -62,14 +63,10 @@ public class ITBanyanDBMeasureQueryTests extends BanyanDBClientTestCI {
Assert.assertNotNull(expectedGroup);
Measure expectedMeasure = buildMeasure();
client.define(expectedMeasure);
- processor = client.buildMeasureWriteProcessor(1000, 1, 1, 10);
}
@After
public void tearDown() throws IOException {
- if (this.processor != null) {
- this.processor.close();
- }
this.closeClient();
}
@@ -81,13 +78,29 @@ public class ITBanyanDBMeasureQueryTests extends BanyanDBClientTestCI {
MeasureWrite measureWrite = client.createMeasureWrite("sw_metric",
"service_cpm_minute", now.toEpochMilli());
measureWrite.tag("entity_id",
TagAndValue.stringTagValue("entity_1")).field("total",
TagAndValue.longFieldValue(100)).field("value", TagAndValue.longFieldValue(1));
+ StreamObserver<BanyandbMeasure.WriteRequest> writeObserver
+ = client.getMeasureServiceStub().write(new
StreamObserver<BanyandbMeasure.WriteResponse>() {
+ @Override
+ public void onNext(BanyandbMeasure.WriteResponse writeResponse) {
+
Assert.assertEquals(BanyandbModel.Status.STATUS_SUCCEED.name(),
writeResponse.getStatus());
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ Assert.fail("write failed: " + throwable.getMessage());
+ }
+
+ @Override
+ public void onCompleted() {
- CompletableFuture<Void> f = processor.add(measureWrite);
- f.exceptionally(exp -> {
- Assert.fail(exp.getMessage());
- return null;
+ }
});
- f.get(10, TimeUnit.SECONDS);
+ try {
+ writeObserver.onNext(measureWrite.build());
+
+ } finally {
+ writeObserver.onCompleted();
+ }
MeasureQuery query = new MeasureQuery(Lists.newArrayList("sw_metric"),
"service_cpm_minute", new TimestampRange(begin.toEpochMilli(), now.plus(1,
ChronoUnit.MINUTES).toEpochMilli()), ImmutableSet.of("entity_id"), // tags
ImmutableSet.of("total")); // fields
diff --git a/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBPropertyTests.java b/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBPropertyTests.java
index be0fb09..e752e67 100644
--- a/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBPropertyTests.java
+++ b/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBPropertyTests.java
@@ -83,7 +83,8 @@ public class ITBanyanDBPropertyTests extends BanyanDBClientTestCI {
Property property = buildProperty("default", "sw",
"ui_template").toBuilder().addTags(
Tag.newBuilder().setKey("name").setValue(
TagValue.newBuilder().setStr(Str.newBuilder().setValue("hello")))).build();
- Assert.assertTrue(this.client.apply(property).getCreated());
+ PropertyStore store = new PropertyStore(client.getChannel());
+ Assert.assertTrue(store.apply(property).getCreated());
await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
BanyandbProperty.QueryResponse resp =
client.query(BanyandbProperty.QueryRequest.newBuilder()
@@ -103,7 +104,8 @@ public class ITBanyanDBPropertyTests extends BanyanDBClientTestCI {
Property property = buildProperty("default", "sw",
"ui_template").toBuilder().addTags(
Tag.newBuilder().setKey("name").setValue(
TagValue.newBuilder().setStr(Str.newBuilder().setValue("hello")))).build();
- Assert.assertTrue(this.client.apply(property).getCreated());
+ PropertyStore store = new PropertyStore(client.getChannel());
+ Assert.assertTrue(store.apply(property).getCreated());
await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
BanyandbProperty.QueryResponse resp =
client.query(BanyandbProperty.QueryRequest.newBuilder()
@@ -116,8 +118,8 @@ public class ITBanyanDBPropertyTests extends BanyanDBClientTestCI {
Assert.assertNotNull(gotProperty);
Assert.assertEquals(property.getTagsList(),
gotProperty.getTagsList());
});
-
- Assert.assertTrue(this.client.deleteProperty("default", "sw",
"ui_template").getDeleted());
+ BanyandbProperty.DeleteResponse result = store.delete("default", "sw",
"ui_template");
+ Assert.assertTrue(result.getDeleted());
BanyandbProperty.QueryResponse resp =
client.query(BanyandbProperty.QueryRequest.newBuilder()
.addGroups("default")
.setName("sw")
@@ -131,12 +133,13 @@ public class ITBanyanDBPropertyTests extends BanyanDBClientTestCI {
Property property1 = buildProperty("default", "sw",
"ui_template").toBuilder().addTags(
Tag.newBuilder().setKey("name").setValue(
TagValue.newBuilder().setStr(Str.newBuilder().setValue("hello")))).build();
- Assert.assertTrue(this.client.apply(property1).getCreated());
+ PropertyStore store = new PropertyStore(client.getChannel());
+ Assert.assertTrue(store.apply(property1).getCreated());
Property property2 = buildProperty("default", "sw",
"ui_template").toBuilder().addTags(
Tag.newBuilder().setKey("name").setValue(
TagValue.newBuilder().setStr(Str.newBuilder().setValue("word")))).build();
- Assert.assertFalse(this.client.apply(property2).getCreated());
+ Assert.assertFalse(store.apply(property2).getCreated());
await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
BanyandbProperty.QueryResponse resp =
client.query(BanyandbProperty.QueryRequest.newBuilder()
@@ -156,11 +159,12 @@ public class ITBanyanDBPropertyTests extends BanyanDBClientTestCI {
Property property = buildProperty("default", "sw",
"id1").toBuilder().addTags(
Tag.newBuilder().setKey("name").setValue(
TagValue.newBuilder().setStr(Str.newBuilder().setValue("bar")))).build();
- Assert.assertTrue(this.client.apply(property).getCreated());
+ PropertyStore store = new PropertyStore(client.getChannel());
+ Assert.assertTrue(store.apply(property).getCreated());
property = buildProperty("default", "sw", "id2").toBuilder().addTags(
Tag.newBuilder().setKey("name").setValue(
TagValue.newBuilder().setStr(Str.newBuilder().setValue("foo")))).build();
- Assert.assertTrue(this.client.apply(property).getCreated());
+ Assert.assertTrue(store.apply(property).getCreated());
await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
client.query(new PropertyQuery(Lists.newArrayList("default"),
"sw", ImmutableSet.of("name")).build(null));
@@ -194,11 +198,12 @@ public class ITBanyanDBPropertyTests extends BanyanDBClientTestCI {
Property property = buildProperty("default", "sw",
"id1").toBuilder().addTags(
Tag.newBuilder().setKey("name").setValue(
TagValue.newBuilder().setStr(Str.newBuilder().setValue("bar")))).build();
- Assert.assertTrue(this.client.apply(property).getCreated());
+ PropertyStore store = new PropertyStore(client.getChannel());
+ Assert.assertTrue(store.apply(property).getCreated());
property = buildProperty("default", "sw", "id2").toBuilder().addTags(
Tag.newBuilder().setKey("name").setValue(
TagValue.newBuilder().setStr(Str.newBuilder().setValue("foo")))).build();
- Assert.assertTrue(this.client.apply(property).getCreated());
+ Assert.assertTrue(store.apply(property).getCreated());
await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
BanyandbProperty.QueryResponse resp = client.query(new
PropertyQuery(Lists.newArrayList("default"), "sw",
ImmutableSet.of("name")).build());
diff --git a/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBStreamQueryTests.java b/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBStreamQueryTests.java
index c394138..d098d81 100644
--- a/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBStreamQueryTests.java
+++ b/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBStreamQueryTests.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.banyandb.v1.client;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
+import io.grpc.stub.StreamObserver;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.Arrays;
@@ -37,16 +38,16 @@ import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.TagType;
import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.Stream;
import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.IndexRule;
import
org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.IndexRuleBinding;
+import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
+import org.apache.skywalking.banyandb.stream.v1.BanyandbStream;
import
org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
import org.apache.skywalking.banyandb.v1.client.util.TimeUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-
import java.io.IOException;
import java.time.Instant;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -55,7 +56,6 @@ import static org.apache.skywalking.banyandb.v1.client.BanyanDBClient.DEFAULT_EX
import static org.awaitility.Awaitility.await;
public class ITBanyanDBStreamQueryTests extends BanyanDBClientTestCI {
- private StreamBulkWriteProcessor processor;
@Before
public void setUp() throws IOException, BanyanDBException,
InterruptedException {
@@ -64,14 +64,10 @@ public class ITBanyanDBStreamQueryTests extends BanyanDBClientTestCI {
this.client.define(buildStream());
this.client.define(buildIndexRule());
this.client.define(buildIndexRuleBinding());
- processor = client.buildStreamWriteProcessor(1000, 1, 1, 10);
}
@After
public void tearDown() throws IOException {
- if (processor != null) {
- this.processor.close();
- }
this.closeClient();
}
@@ -110,13 +106,28 @@ public class ITBanyanDBStreamQueryTests extends BanyanDBClientTestCI {
.tag("mq.topic", Value.stringTagValue(topic)) // 11
.tag("mq.queue", Value.stringTagValue(queue)); // 12
streamWrite.setTimestamp(now.toEpochMilli());
+ StreamObserver<BanyandbStream.WriteRequest> writeObserver
+ = client.getStreamServiceStub().write(new
StreamObserver<BanyandbStream.WriteResponse>() {
+ @Override
+ public void onNext(BanyandbStream.WriteResponse writeResponse) {
+
Assert.assertEquals(BanyandbModel.Status.STATUS_SUCCEED.name(),
writeResponse.getStatus());
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ Assert.fail("write failed: " + throwable.getMessage());
+ }
- CompletableFuture<Void> f = processor.add(streamWrite);
- f.exceptionally(exp -> {
- Assert.fail(exp.getMessage());
- return null;
+ @Override
+ public void onCompleted() {
+ }
});
- f.get(10, TimeUnit.SECONDS);
+ try {
+ writeObserver.onNext(streamWrite.build());
+
+ } finally {
+ writeObserver.onCompleted();
+ }
StreamQuery query = new StreamQuery(
Lists.newArrayList("sw_record"), "trace", ImmutableSet.of("state",
"duration", "trace_id", "data_binary"));
diff --git a/src/test/java/org/apache/skywalking/banyandb/v1/client/ITTraceTest.java b/src/test/java/org/apache/skywalking/banyandb/v1/client/ITTraceTest.java
index 15a16df..0d45187 100644
--- a/src/test/java/org/apache/skywalking/banyandb/v1/client/ITTraceTest.java
+++ b/src/test/java/org/apache/skywalking/banyandb/v1/client/ITTraceTest.java
@@ -20,22 +20,23 @@ package org.apache.skywalking.banyandb.v1.client;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
+import io.grpc.stub.StreamObserver;
import org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase;
+import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
+import org.apache.skywalking.banyandb.trace.v1.BanyandbTrace;
import
org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
import org.apache.skywalking.banyandb.v1.client.util.TimeUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-
import java.io.IOException;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.Arrays;
import java.util.Collections;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -50,7 +51,6 @@ import static org.awaitility.Awaitility.await;
public class ITTraceTest extends BanyanDBClientTestCI {
private final String groupName = "sw_trace";
private final String traceName = "trace_data";
- private TraceBulkWriteProcessor processor;
@Before
public void setUp() throws IOException, BanyanDBException,
InterruptedException {
@@ -95,15 +95,10 @@ public class ITTraceTest extends BanyanDBClientTestCI {
this.client.define(trace);
this.client.define(buildIndexRule());
this.client.define(buildIndexRuleBinding());
-
- processor = client.buildTraceWriteProcessor(1000, 1, 1, 10);
}
@After
public void tearDown() throws IOException {
- if (processor != null) {
- processor.close();
- }
this.closeClient();
}
@@ -136,14 +131,29 @@ public class ITTraceTest extends BanyanDBClientTestCI {
.tag("start_time", Value.timestampTagValue(now.toEpochMilli()))
.span(spanData)
.version(1L);
-
- // Write the trace via bulk processor
- CompletableFuture<Void> writeFuture = processor.add(traceWrite);
- writeFuture.exceptionally(exp -> {
- Assert.fail("Write failed: " + exp.getMessage());
- return null;
+
+ StreamObserver<BanyandbTrace.WriteRequest> writeObserver
+ = client.getTraceServiceStub().write(new
StreamObserver<BanyandbTrace.WriteResponse>() {
+ @Override
+ public void onNext(BanyandbTrace.WriteResponse writeResponse) {
+
Assert.assertEquals(BanyandbModel.Status.STATUS_SUCCEED.name(),
writeResponse.getStatus());
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ Assert.fail("write failed: " + throwable.getMessage());
+ }
+
+ @Override
+ public void onCompleted() {
+
+ }
});
- writeFuture.get(10, TimeUnit.SECONDS);
+ try {
+ writeObserver.onNext(traceWrite.build());
+ } finally {
+ writeObserver.onCompleted();
+ }
// Create trace query with trace_id condition
TraceQuery query = new TraceQuery(
@@ -211,13 +221,30 @@ public class ITTraceTest extends BanyanDBClientTestCI {
.tag("start_time",
Value.timestampTagValue(baseTime.plusSeconds(120).toEpochMilli()))
.span("span-data-3".getBytes())
.version(1L);
-
- // Write the traces via bulk processor
- CompletableFuture<Void> future1 = processor.add(trace1);
- CompletableFuture<Void> future2 = processor.add(trace2);
- CompletableFuture<Void> future3 = processor.add(trace3);
-
- CompletableFuture.allOf(future1, future2, future3).get(10,
TimeUnit.SECONDS);
+ StreamObserver<BanyandbTrace.WriteRequest> writeObserver
+ = client.getTraceServiceStub().write(new
StreamObserver<BanyandbTrace.WriteResponse>() {
+ @Override
+ public void onNext(BanyandbTrace.WriteResponse writeResponse) {
+
Assert.assertEquals(BanyandbModel.Status.STATUS_SUCCEED.name(),
writeResponse.getStatus());
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ Assert.fail("write failed: " + throwable.getMessage());
+ }
+
+ @Override
+ public void onCompleted() {
+
+ }
+ });
+ try {
+ writeObserver.onNext(trace1.build());
+ writeObserver.onNext(trace2.build());
+ writeObserver.onNext(trace3.build());
+ } finally {
+ writeObserver.onCompleted();
+ }
// Create trace query with order by start_time (no trace_id condition
as it interferes with ordering)
TraceQuery query = new TraceQuery(