This is an automated email from the ASF dual-hosted git repository. wusheng pushed a commit to branch fix in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb-java-client.git
commit b54dd81817f272227ef9d5e79e3d827b48ea3f28 Author: wu-sheng <wu.sh...@foxmail.com> AuthorDate: Fri Sep 5 10:06:39 2025 +0800 Optimize APIs, remove unnecessary ts parameter of trace model --- .../banyandb/v1/client/AbstractWrite.java | 33 +++++++++------------- .../banyandb/v1/client/BanyanDBClient.java | 29 ------------------- .../banyandb/v1/client/MeasureWrite.java | 9 +++++- .../skywalking/banyandb/v1/client/StreamWrite.java | 18 +++++++----- .../skywalking/banyandb/v1/client/TraceWrite.java | 15 ++-------- .../v1/client/ITBanyanDBStreamQueryTests.java | 3 +- .../skywalking/banyandb/v1/client/ITTraceTest.java | 8 +++--- 7 files changed, 41 insertions(+), 74 deletions(-) diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractWrite.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractWrite.java index 72067da..8faa30d 100644 --- a/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractWrite.java +++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractWrite.java @@ -18,8 +18,6 @@ package org.apache.skywalking.banyandb.v1.client; -import com.google.protobuf.Timestamp; - import java.util.Map; import java.util.Optional; @@ -35,28 +33,31 @@ public abstract class AbstractWrite<P extends com.google.protobuf.GeneratedMessa /** * Timestamp represents the time of current stream * in the timeunit of milliseconds. + * + * Measure and stream require timestamp. + * Trace doesn't require extra timestamp. */ @Getter - protected long timestamp; + protected Optional<Long> timestamp; protected final Object[] tags; protected final MetadataCache.EntityMetadata entityMetadata; public AbstractWrite(MetadataCache.EntityMetadata entityMetadata, long timestamp) { - if (entityMetadata == null) { - throw new IllegalArgumentException("metadata not found"); - } - this.entityMetadata = entityMetadata; - this.timestamp = timestamp; - this.tags = new Object[this.entityMetadata.getTotalTags()]; + this(entityMetadata); + this.timestamp = Optional.of(timestamp); } /** * Build a write request without initial timestamp. */ AbstractWrite(MetadataCache.EntityMetadata entityMetadata) { - this(entityMetadata, 0); + if (entityMetadata == null) { + throw new IllegalArgumentException("metadata not found"); + } + this.entityMetadata = entityMetadata; + this.tags = new Object[this.entityMetadata.getTotalTags()]; } public AbstractWrite<P> tag(String tagName, Serializable<BanyandbModel.TagValue> tagValue) throws BanyanDBException { @@ -69,19 +70,13 @@ public abstract class AbstractWrite<P extends com.google.protobuf.GeneratedMessa } P build() { - if (timestamp <= 0) { - throw new IllegalArgumentException("timestamp is invalid."); - } - BanyandbCommon.Metadata metadata = BanyandbCommon.Metadata.newBuilder() .setGroup(entityMetadata.getGroup()).setName(entityMetadata.getName()).setModRevision(entityMetadata.getModRevision()).build(); - Timestamp ts = Timestamp.newBuilder() - .setSeconds(timestamp / 1000) - .setNanos((int) (timestamp % 1000 * 1_000_000)).build(); - return build(metadata, ts); + + return build(metadata); } - protected abstract P build(BanyandbCommon.Metadata metadata, Timestamp ts); + protected abstract P build(BanyandbCommon.Metadata metadata); @Override public String toString() { diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java index 8f8e252..bcae829 100644 --- a/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java +++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java @@ -399,21 +399,6 @@ public class BanyanDBClient implements Closeable { return new StreamWrite(this.metadataCache.findStreamMetadata(group, name), elementId); } - /** - * Build a StreamWrite request. - * - * @param group the group of the stream - * @param name the name of the stream - * @param elementId the primary key of the stream - * @param timestamp the timestamp of the stream - * @return the request to be built - */ - public StreamWrite createStreamWrite(String group, String name, final String elementId, long timestamp) throws BanyanDBException { - Preconditions.checkArgument(!Strings.isNullOrEmpty(group)); - Preconditions.checkArgument(!Strings.isNullOrEmpty(name)); - return new StreamWrite(this.metadataCache.findStreamMetadata(group, name), elementId, timestamp); - } - /** * Build a trace bulk write processor. * @@ -430,20 +415,6 @@ public class BanyanDBClient implements Closeable { return new TraceBulkWriteProcessor(this, maxBulkSize, flushInterval, concurrency, timeout, WRITE_HISTOGRAM, options); } - /** - * Build a TraceWrite request. - * - * @param group the group of the trace - * @param name the name of the trace - * @param timestamp the timestamp of the trace - * @return the request to be built - */ - public TraceWrite createTraceWrite(String group, String name, long timestamp) throws BanyanDBException { - Preconditions.checkArgument(!Strings.isNullOrEmpty(group)); - Preconditions.checkArgument(!Strings.isNullOrEmpty(name)); - return new TraceWrite(this.metadataCache.findTraceMetadata(group, name), timestamp); - } - /** * Build a TraceWrite request without initial timestamp. * diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/MeasureWrite.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/MeasureWrite.java index d405c61..62774fe 100644 --- a/src/main/java/org/apache/skywalking/banyandb/v1/client/MeasureWrite.java +++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/MeasureWrite.java @@ -58,7 +58,14 @@ public class MeasureWrite extends AbstractWrite<BanyandbMeasure.WriteRequest> { * @return {@link BanyandbMeasure.WriteRequest} for the bulk process. */ @Override - protected BanyandbMeasure.WriteRequest build(BanyandbCommon.Metadata metadata, Timestamp ts) { + protected BanyandbMeasure.WriteRequest build(BanyandbCommon.Metadata metadata) { + if (!timestamp.isPresent() || timestamp.get() <= 0) { + throw new IllegalArgumentException("timestamp is invalid."); + } + Timestamp ts = Timestamp.newBuilder() + .setSeconds(timestamp.get() / 1000) + .setNanos((int) (timestamp.get() % 1000 * 1_000_000)).build(); + final BanyandbMeasure.WriteRequest.Builder builder = BanyandbMeasure.WriteRequest.newBuilder(); builder.setMetadata(metadata); final BanyandbMeasure.DataPointValue.Builder datapointValueBuilder = BanyandbMeasure.DataPointValue.newBuilder(); diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamWrite.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamWrite.java index d77b26e..c219891 100644 --- a/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamWrite.java +++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamWrite.java @@ -21,6 +21,8 @@ package org.apache.skywalking.banyandb.v1.client; import com.google.protobuf.Timestamp; import java.util.Deque; import java.util.LinkedList; +import java.util.Optional; + import lombok.Getter; import org.apache.skywalking.banyandb.common.v1.BanyandbCommon; import org.apache.skywalking.banyandb.model.v1.BanyandbModel; @@ -40,11 +42,6 @@ public class StreamWrite extends AbstractWrite<BanyandbStream.WriteRequest> { @Getter private final String elementId; - StreamWrite(MetadataCache.EntityMetadata entityMetadata, final String elementId, long timestamp) { - super(entityMetadata, timestamp); - this.elementId = elementId; - } - /** * Create a StreamWrite without initial timestamp. */ @@ -59,7 +56,7 @@ public class StreamWrite extends AbstractWrite<BanyandbStream.WriteRequest> { } public void setTimestamp(long timestamp) { - super.timestamp = timestamp; + super.timestamp = Optional.of(timestamp); } /** @@ -68,7 +65,14 @@ public class StreamWrite extends AbstractWrite<BanyandbStream.WriteRequest> { * @return {@link BanyandbStream.WriteRequest} for the bulk process. */ @Override - protected BanyandbStream.WriteRequest build(BanyandbCommon.Metadata metadata, Timestamp ts) { + protected BanyandbStream.WriteRequest build(BanyandbCommon.Metadata metadata) { + if (!timestamp.isPresent() || timestamp.get() <= 0) { + throw new IllegalArgumentException("timestamp is invalid."); + } + Timestamp ts = Timestamp.newBuilder() + .setSeconds(timestamp.get() / 1000) + .setNanos((int) (timestamp.get() % 1000 * 1_000_000)).build(); + final BanyandbStream.WriteRequest.Builder builder = BanyandbStream.WriteRequest.newBuilder(); builder.setMetadata(metadata); final BanyandbStream.ElementValue.Builder elemValBuilder = BanyandbStream.ElementValue.newBuilder(); diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceWrite.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceWrite.java index d0ce687..036ad77 100644 --- a/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceWrite.java +++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceWrite.java @@ -19,7 +19,6 @@ package org.apache.skywalking.banyandb.v1.client; import com.google.protobuf.ByteString; -import com.google.protobuf.Timestamp; import java.util.ArrayList; import java.util.List; @@ -34,7 +33,7 @@ import org.apache.skywalking.banyandb.v1.client.metadata.Serializable; /** * TraceWrite represents a write operation, including necessary fields, for {@link - * BanyanDBClient#buildTraceBulkWriteProcessor}. + * BanyanDBClient#buildTraceWriteProcessor(int, int, int, int)}. */ public class TraceWrite extends AbstractWrite<BanyandbTrace.WriteRequest> { /** @@ -49,12 +48,6 @@ public class TraceWrite extends AbstractWrite<BanyandbTrace.WriteRequest> { @Getter private long version; - TraceWrite(MetadataCache.EntityMetadata entityMetadata, long timestamp) { - super(entityMetadata, timestamp); - this.span = ByteString.EMPTY; - this.version = 1L; - } - /** * Create a TraceWrite without initial timestamp. */ @@ -99,17 +92,13 @@ public class TraceWrite extends AbstractWrite<BanyandbTrace.WriteRequest> { return this; } - public void setTimestamp(long timestamp) { - super.timestamp = timestamp; - } - /** * Build a write request * * @return {@link BanyandbTrace.WriteRequest} for the bulk process. */ @Override - protected BanyandbTrace.WriteRequest build(BanyandbCommon.Metadata metadata, Timestamp ts) { + protected BanyandbTrace.WriteRequest build(BanyandbCommon.Metadata metadata) { final BanyandbTrace.WriteRequest.Builder builder = BanyandbTrace.WriteRequest.newBuilder(); builder.setMetadata(metadata); diff --git a/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBStreamQueryTests.java b/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBStreamQueryTests.java index c4f2573..c394138 100644 --- a/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBStreamQueryTests.java +++ b/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBStreamQueryTests.java @@ -94,7 +94,7 @@ public class ITBanyanDBStreamQueryTests extends BanyanDBClientTestCI { String dbType = "SQL"; String dbInstance = "127.0.0.1:3306"; - StreamWrite streamWrite = client.createStreamWrite("sw_record", "trace", segmentId, now.toEpochMilli()) + StreamWrite streamWrite = client.createStreamWrite("sw_record", "trace", segmentId) .tag("data_binary", Value.binaryTagValue(byteData)) .tag("trace_id", Value.stringTagValue(traceId)) // 0 .tag("state", Value.longTagValue(state)) // 1 @@ -109,6 +109,7 @@ public class ITBanyanDBStreamQueryTests extends BanyanDBClientTestCI { .tag("mq.broker", Value.stringTagValue(broker)) // 10 .tag("mq.topic", Value.stringTagValue(topic)) // 11 .tag("mq.queue", Value.stringTagValue(queue)); // 12 + streamWrite.setTimestamp(now.toEpochMilli()); CompletableFuture<Void> f = processor.add(streamWrite); f.exceptionally(exp -> { diff --git a/src/test/java/org/apache/skywalking/banyandb/v1/client/ITTraceTest.java b/src/test/java/org/apache/skywalking/banyandb/v1/client/ITTraceTest.java index cd4d2f0..76eb168 100644 --- a/src/test/java/org/apache/skywalking/banyandb/v1/client/ITTraceTest.java +++ b/src/test/java/org/apache/skywalking/banyandb/v1/client/ITTraceTest.java @@ -129,7 +129,7 @@ public class ITTraceTest extends BanyanDBClientTestCI { byte[] spanData = "query-test-span-data".getBytes(); // Create and write trace data - TraceWrite traceWrite = client.createTraceWrite(groupName, traceName, now.toEpochMilli()) + TraceWrite traceWrite = client.createTraceWrite(groupName, traceName) .tag("trace_id", Value.stringTagValue(traceId)) .tag("span_id", Value.stringTagValue(spanId)) .tag("service_name", Value.stringTagValue(serviceName)) @@ -183,7 +183,7 @@ public class ITTraceTest extends BanyanDBClientTestCI { Instant baseTime = Instant.now().minusSeconds(60); // Start 1 minute ago // Create 3 traces with different timestamps (1 minute apart) - TraceWrite trace1 = client.createTraceWrite(groupName, traceName, baseTime.toEpochMilli()) + TraceWrite trace1 = client.createTraceWrite(groupName, traceName) .tag("trace_id", Value.stringTagValue(traceId + "1")) .tag("span_id", Value.stringTagValue("span-1")) .tag("service_name", Value.stringTagValue(serviceName)) @@ -191,7 +191,7 @@ public class ITTraceTest extends BanyanDBClientTestCI { .span("span-data-1".getBytes()) .version(1L); - TraceWrite trace2 = client.createTraceWrite(groupName, traceName, baseTime.plusSeconds(60).toEpochMilli()) + TraceWrite trace2 = client.createTraceWrite(groupName, traceName) .tag("trace_id", Value.stringTagValue(traceId + "2")) .tag("span_id", Value.stringTagValue("span-2")) .tag("service_name", Value.stringTagValue(serviceName)) @@ -199,7 +199,7 @@ public class ITTraceTest extends BanyanDBClientTestCI { .span("span-data-2".getBytes()) .version(1L); - TraceWrite trace3 = client.createTraceWrite(groupName, traceName, baseTime.plusSeconds(120).toEpochMilli()) + TraceWrite trace3 = client.createTraceWrite(groupName, traceName) .tag("trace_id", Value.stringTagValue(traceId + "3")) .tag("span_id", Value.stringTagValue("span-3")) .tag("service_name", Value.stringTagValue(serviceName))