This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch feat/trace-response in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb-java-client.git
commit 560fc48b8241bf0e880ed9844d241c3a324d835e Author: Gao Hongtao <[email protected]> AuthorDate: Tue Sep 9 20:37:10 2025 +0800 Restore stream functions Signed-off-by: Gao Hongtao <[email protected]> --- .../banyandb/v1/client/BanyanDBClient.java | 29 ++++++++++++++++------ .../skywalking/banyandb/v1/client/StreamWrite.java | 10 ++++++++ .../v1/client/ITBanyanDBStreamQueryTests.java | 3 +-- 3 files changed, 33 insertions(+), 9 deletions(-) diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java index bcae829..0cd45bb 100644 --- a/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java +++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java @@ -400,15 +400,30 @@ public class BanyanDBClient implements Closeable { } /** - * Build a trace bulk write processor. + * Build a StreamWrite request. * - * @param maxBulkSize the max size of each flush. The actual size is determined by the length of byte array. - * @param flushInterval if given maxBulkSize is not reached in this period, the flush would be trigger - * automatically. Unit is second. - * @param concurrency the number of concurrency would run for the flush max. - * @param timeout network timeout threshold in seconds. - * @return trace bulk write processor + * @param group the group of the stream + * @param name the name of the stream + * @param elementId the primary key of the stream + * @param timestamp the timestamp of the stream + * @return the request to be built */ + public StreamWrite createStreamWrite(String group, String name, final String elementId, long timestamp) throws BanyanDBException { + Preconditions.checkArgument(!Strings.isNullOrEmpty(group)); + Preconditions.checkArgument(!Strings.isNullOrEmpty(name)); + return new StreamWrite(this.metadataCache.findStreamMetadata(group, name), elementId, timestamp); + } + + /** + * Build a trace bulk write processor. + * + * @param maxBulkSize the max size of each flush. The actual size is determined by the length of byte array. + * @param flushInterval if given maxBulkSize is not reached in this period, the flush would be trigger + * automatically. Unit is second. + * @param concurrency the number of concurrency would run for the flush max. + * @param timeout network timeout threshold in seconds. + * @return trace bulk write processor + */ public TraceBulkWriteProcessor buildTraceWriteProcessor(int maxBulkSize, int flushInterval, int concurrency, int timeout) { checkState(this.traceServiceStub != null, "trace service is null"); diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamWrite.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamWrite.java index c10d70d..ad84b3e 100644 --- a/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamWrite.java +++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamWrite.java @@ -50,6 +50,16 @@ public class StreamWrite extends AbstractWrite<BanyandbStream.WriteRequest> { this.elementId = elementId; } + /** + * Create a StreamWrite with initial timestamp. + * + * @param timestamp in milliseconds + */ + public StreamWrite(MetadataCache.EntityMetadata streamMetadata, String elementId, long timestamp) { + this(streamMetadata, elementId); + this.timestamp = Optional.of(timestamp); + } + @Override public StreamWrite tag(String tagName, Serializable<BanyandbModel.TagValue> tagValue) throws BanyanDBException { return (StreamWrite) super.tag(tagName, tagValue); diff --git a/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBStreamQueryTests.java b/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBStreamQueryTests.java index c394138..c4f2573 100644 --- a/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBStreamQueryTests.java +++ b/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBStreamQueryTests.java @@ -94,7 +94,7 @@ public class ITBanyanDBStreamQueryTests extends BanyanDBClientTestCI { String dbType = "SQL"; String dbInstance = "127.0.0.1:3306"; - StreamWrite streamWrite = client.createStreamWrite("sw_record", "trace", segmentId) + StreamWrite streamWrite = client.createStreamWrite("sw_record", "trace", segmentId, now.toEpochMilli()) .tag("data_binary", Value.binaryTagValue(byteData)) .tag("trace_id", Value.stringTagValue(traceId)) // 0 .tag("state", Value.longTagValue(state)) // 1 @@ -109,7 +109,6 @@ public class ITBanyanDBStreamQueryTests extends BanyanDBClientTestCI { .tag("mq.broker", Value.stringTagValue(broker)) // 10 .tag("mq.topic", Value.stringTagValue(topic)) // 11 .tag("mq.queue", Value.stringTagValue(queue)); // 12 - streamWrite.setTimestamp(now.toEpochMilli()); CompletableFuture<Void> f = processor.add(streamWrite); f.exceptionally(exp -> {
