wu-sheng commented on code in PR #47: URL: https://github.com/apache/skywalking-banyandb-java-client/pull/47#discussion_r1327124354
##########
src/main/java/org/apache/skywalking/banyandb/v1/client/StreamBulkWriteProcessor.java:
##########
@@ -34,29 +40,50 @@
 @ThreadSafe
 public class StreamBulkWriteProcessor extends AbstractBulkWriteProcessor<BanyandbStream.WriteRequest, StreamServiceGrpc.StreamServiceStub> {
+    private final BanyanDBClient client;
+
     /**
      * Create the processor.
      *
-     * @param serviceStub stub for gRPC call.
+     * @param client the client
      * @param maxBulkSize the max bulk size for the flush operation
      * @param flushInterval if given maxBulkSize is not reached in this period, the flush would be trigger
      * automatically. Unit is second.
      * @param concurrency the number of concurrency would run for the flush max.
      */
     protected StreamBulkWriteProcessor(
-            final StreamServiceGrpc.StreamServiceStub serviceStub,
+            final BanyanDBClient client,
             final int maxBulkSize,
             final int flushInterval,
             final int concurrency) {
-        super(serviceStub, "StreamBulkWriteProcessor", maxBulkSize, flushInterval, concurrency);
+        super(client.getStreamServiceStub(), "StreamBulkWriteProcessor", maxBulkSize, flushInterval, concurrency);
+        this.client = client;
     }

     @Override
     protected StreamObserver<BanyandbStream.WriteRequest> buildStreamObserver(StreamServiceGrpc.StreamServiceStub stub, CompletableFuture<Void> batch) {
         return stub.write(
                 new StreamObserver<BanyandbStream.WriteResponse>() {
+                    private final Set<String> schemaExpired = new HashSet<>();
+
                     @Override
                     public void onNext(BanyandbStream.WriteResponse writeResponse) {
+                        switch (writeResponse.getStatus()) {
+                            case STATUS_EXPIRED_SCHEMA:
+                                BanyandbCommon.Metadata metadata = writeResponse.getMetadata();
+                                String schemaKey = metadata.getGroup() + "." + metadata.getName();
+                                if (!schemaExpired.contains(schemaKey)) {
+                                    log.warn("The schema {} is expired, trying update the schema...", schemaKey);
+                                    try {
+                                        client.findStream(metadata.getGroup(), metadata.getName());
+                                        schemaExpired.add(schemaKey);
+                                    } catch (BanyanDBException e) {
+                                        throw new RuntimeException(e);

Review Comment:
   I think we usually throw a BanyanDB exception rather than a runtime exception, right?
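
One way to read the comment above: `onNext` declares no checked exceptions, so the checked client exception cannot simply be rethrown from inside the observer. A possible alternative to wrapping it in `RuntimeException` is to hand the original exception to the `batch` future that `buildStreamObserver` already receives. The sketch below only illustrates that idea and is not what the PR does; `ClientException`, `SchemaLoader`, and `ExpiredSchemaHandler` are hypothetical stand-ins for `BanyanDBException`, `client.findStream`, and the anonymous observer, not real client types.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical stand-ins only; none of these are real BanyanDB client classes.
class SchemaRefreshSketch {
    /** Stand-in for a checked client exception such as BanyanDBException. */
    static class ClientException extends Exception {
        ClientException(String message) {
            super(message);
        }
    }

    /** Stand-in for the schema lookup (client.findStream in the diff). */
    interface SchemaLoader {
        void load(String group, String name) throws ClientException;
    }

    /** Mirrors the callback shape: the handler method declares no checked exceptions. */
    static class ExpiredSchemaHandler {
        private final Set<String> schemaExpired = new HashSet<>();
        private final SchemaLoader loader;
        private final CompletableFuture<Void> batch;

        ExpiredSchemaHandler(SchemaLoader loader, CompletableFuture<Void> batch) {
            this.loader = loader;
            this.batch = batch;
        }

        void onExpiredSchema(String group, String name) {
            String schemaKey = group + "." + name;
            if (schemaExpired.contains(schemaKey)) {
                return; // already refreshed for this observer
            }
            try {
                loader.load(group, name);
                schemaExpired.add(schemaKey);
            } catch (ClientException e) {
                // Keep the original checked exception type instead of wrapping it:
                // whoever waits on the batch future sees it as the failure cause.
                batch.completeExceptionally(e);
            }
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> batch = new CompletableFuture<>();
        ExpiredSchemaHandler handler = new ExpiredSchemaHandler(
                (group, name) -> {
                    throw new ClientException("schema lookup failed for " + group + "." + name);
                },
                batch);
        handler.onExpiredSchema("default", "sw");
        try {
            batch.get();
        } catch (ExecutionException e) {
            // The cause is the original ClientException, not a RuntimeException wrapper.
            System.out.println(e.getCause().getClass().getSimpleName() + ": " + e.getCause().getMessage());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Whether failing the whole batch is the right reaction to a failed schema refresh is a design decision for the PR author; the sketch only shows that the checked exception type can be preserved without changing the callback signature.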