This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch polish
in repository 
https://gitbox.apache.org/repos/asf/skywalking-banyandb-java-client.git

commit 7d6adecda901bca8f213b9047458ef20bd1dfe52
Author: Wu Sheng <wu.sh...@foxmail.com>
AuthorDate: Mon Jul 1 16:13:45 2024 +0800

    Enhance the BulkWriteProcessor
    1. Remove DataCarrier.
    2. Make the bulk process run in sync mode. Once the bulk size is reached, 
the flush (gRPC) process starts immediately.
    3. Add a timer as an additional processor for when the data slightly 
exceeds the bulk size.
    4. Expose the gRPC timeout for bulk processor creation. Fix the previous 
misuse of flushInterval as the timeout.
    5. Correct the concurrency implementation. In OAP, this concept means the 
maximum number of sources accepted to flush data concurrently, but the threads 
themselves are controlled by the OAP persistent timer thread pool.
---
 .../banyandb/commons/datacarrier/DataCarrier.java  | 166 ---------------------
 .../banyandb/commons/datacarrier/EnvUtil.java      |  50 -------
 .../buffer/ArrayBlockingQueueBuffer.java           |  67 ---------
 .../commons/datacarrier/buffer/Buffer.java         |  76 ----------
 .../commons/datacarrier/buffer/BufferStrategy.java |  23 ---
 .../commons/datacarrier/buffer/Channels.java       |  93 ------------
 .../commons/datacarrier/buffer/QueueBuffer.java    |  46 ------
 .../datacarrier/common/AtomicRangeInteger.java     |  76 ----------
 .../datacarrier/consumer/BulkConsumePool.java      | 118 ---------------
 .../datacarrier/consumer/ConsumeDriver.java        | 137 -----------------
 .../consumer/ConsumerCannotBeCreatedException.java |  25 ----
 .../commons/datacarrier/consumer/ConsumerPool.java |  30 ----
 .../datacarrier/consumer/ConsumerPoolFactory.java  |  50 -------
 .../datacarrier/consumer/ConsumerThread.java       | 105 -------------
 .../commons/datacarrier/consumer/IConsumer.java    |  40 -----
 .../commons/datacarrier/consumer/IDriver.java      |  32 ----
 .../consumer/MultipleChannelsConsumer.java         | 124 ---------------
 .../datacarrier/partition/IDataPartitioner.java    |  32 ----
 .../partition/ProducerThreadPartitioner.java       |  37 -----
 .../partition/SimpleRollingPartitioner.java        |  37 -----
 .../v1/client/AbstractBulkWriteProcessor.java      | 137 +++++++++++++----
 .../banyandb/v1/client/BanyanDBClient.java         |  10 +-
 .../banyandb/v1/client/BulkWriteProcessor.java     | 129 ----------------
 .../v1/client/MeasureBulkWriteProcessor.java       |   8 +-
 .../v1/client/StreamBulkWriteProcessor.java        |   8 +-
 .../v1/client/BanyanDBClientMeasureWriteTest.java  |   2 +-
 .../v1/client/BanyanDBClientStreamWriteTest.java   |   2 +-
 .../v1/client/ITBanyanDBMeasureQueryTests.java     |   2 +-
 .../v1/client/ITBanyanDBStreamQueryTests.java      |   2 +-
 29 files changed, 127 insertions(+), 1537 deletions(-)

diff --git 
a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/DataCarrier.java
 
b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/DataCarrier.java
deleted file mode 100644
index db2ed2d..0000000
--- 
a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/DataCarrier.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.banyandb.commons.datacarrier;
-
-import java.util.Properties;
-import 
org.apache.skywalking.banyandb.commons.datacarrier.buffer.BufferStrategy;
-import org.apache.skywalking.banyandb.commons.datacarrier.buffer.Channels;
-import 
org.apache.skywalking.banyandb.commons.datacarrier.consumer.ConsumeDriver;
-import 
org.apache.skywalking.banyandb.commons.datacarrier.consumer.ConsumerPool;
-import org.apache.skywalking.banyandb.commons.datacarrier.consumer.IConsumer;
-import org.apache.skywalking.banyandb.commons.datacarrier.consumer.IDriver;
-import 
org.apache.skywalking.banyandb.commons.datacarrier.partition.IDataPartitioner;
-import 
org.apache.skywalking.banyandb.commons.datacarrier.partition.SimpleRollingPartitioner;
-
-/**
- * DataCarrier main class. use this instance to set Producer/Consumer Model.
- */
-public class DataCarrier<T> {
-    private Channels<T> channels;
-    private IDriver driver;
-    private String name;
-
-    public DataCarrier(int channelSize, int bufferSize) {
-        this("DEFAULT", channelSize, bufferSize);
-    }
-
-    public DataCarrier(String name, int channelSize, int bufferSize) {
-        this(name, name, channelSize, bufferSize);
-    }
-
-    public DataCarrier(String name, String envPrefix, int channelSize, int 
bufferSize) {
-        this(name, envPrefix, channelSize, bufferSize, 
BufferStrategy.BLOCKING);
-    }
-
-    public DataCarrier(String name, String envPrefix, int channelSize, int 
bufferSize, BufferStrategy strategy) {
-        this.name = name;
-        bufferSize = EnvUtil.getInt(envPrefix + "_BUFFER_SIZE", bufferSize);
-        channelSize = EnvUtil.getInt(envPrefix + "_CHANNEL_SIZE", channelSize);
-        channels = new Channels<>(channelSize, bufferSize, new 
SimpleRollingPartitioner<T>(), strategy);
-    }
-
-    public DataCarrier(int channelSize, int bufferSize, BufferStrategy 
strategy) {
-        this("DEFAULT", "DEFAULT", channelSize, bufferSize, strategy);
-    }
-
-    /**
-     * set a new IDataPartitioner. It will cover the current one or default 
one.(Default is {@link
-     * SimpleRollingPartitioner}
-     *
-     * @param dataPartitioner to partition data into different channel by some 
rules.
-     * @return DataCarrier instance for chain
-     */
-    public DataCarrier setPartitioner(IDataPartitioner<T> dataPartitioner) {
-        this.channels.setPartitioner(dataPartitioner);
-        return this;
-    }
-
-    /**
-     * produce data to buffer, using the given {@link BufferStrategy}.
-     *
-     * @return false means produce data failure. The data will not be consumed.
-     */
-    public boolean produce(T data) {
-        if (driver != null) {
-            if (!driver.isRunning(channels)) {
-                return false;
-            }
-        }
-
-        return this.channels.save(data);
-    }
-
-    /**
-     * set consumeDriver to this Carrier. consumer begin to run when {@link 
DataCarrier#produce} begin to work.
-     *
-     * @param consumerClass class of consumer
-     * @param num           number of consumer threads
-     * @param properties    for initializing consumer.
-     */
-    public DataCarrier consume(Class<? extends IConsumer<T>> consumerClass,
-                               int num,
-                               long consumeCycle,
-                               Properties properties) {
-        if (driver != null) {
-            driver.close(channels);
-        }
-        driver = new ConsumeDriver<T>(this.name, this.channels, consumerClass, 
num, consumeCycle, properties);
-        driver.begin(channels);
-        return this;
-    }
-
-    /**
-     * set consumeDriver to this Carrier. consumer begin to run when {@link 
DataCarrier#produce} begin to work with 20
-     * millis consume cycle.
-     *
-     * @param consumerClass class of consumer
-     * @param num           number of consumer threads
-     */
-    public DataCarrier consume(Class<? extends IConsumer<T>> consumerClass, 
int num) {
-        return this.consume(consumerClass, num, 20, new Properties());
-    }
-
-    /**
-     * set consumeDriver to this Carrier. consumer begin to run when {@link 
DataCarrier#produce} begin to work.
-     *
-     * @param consumer single instance of consumer, all consumer threads will 
all use this instance.
-     * @param num      number of consumer threads
-     */
-    public DataCarrier consume(IConsumer<T> consumer, int num, long 
consumeCycle) {
-        if (driver != null) {
-            driver.close(channels);
-        }
-        driver = new ConsumeDriver<T>(this.name, this.channels, consumer, num, 
consumeCycle);
-        driver.begin(channels);
-        return this;
-    }
-
-    /**
-     * set consumeDriver to this Carrier. consumer begin to run when {@link 
DataCarrier#produce} begin to work with 20
-     * millis consume cycle.
-     *
-     * @param consumer single instance of consumer, all consumer threads will 
all use this instance.
-     * @param num      number of consumer threads
-     */
-    public DataCarrier consume(IConsumer<T> consumer, int num) {
-        return this.consume(consumer, num, 20);
-    }
-
-    /**
-     * Set a consumer pool to manage the channels of this DataCarrier. Then 
consumerPool could use its own consuming
-     * model to adjust the consumer thread and throughput.
-     */
-    public DataCarrier consume(ConsumerPool consumerPool, IConsumer<T> 
consumer) {
-        driver = consumerPool;
-        consumerPool.add(this.name, channels, consumer);
-        driver.begin(channels);
-        return this;
-    }
-
-    /**
-     * shutdown all consumer threads, if consumer threads are running. Notice 
{@link BufferStrategy}: if {@link
-     * BufferStrategy} == {@link BufferStrategy#BLOCKING}, shutdown 
consumeDriver maybe cause blocking when producing.
-     * Better way to change consumeDriver are use {@link DataCarrier#consume}
-     */
-    public void shutdownConsumers() {
-        if (driver != null) {
-            driver.close(channels);
-        }
-    }
-}
diff --git 
a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/EnvUtil.java 
b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/EnvUtil.java
deleted file mode 100644
index 36f0cb6..0000000
--- 
a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/EnvUtil.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.banyandb.commons.datacarrier;
-
-/**
- * Read value from system env.
- */
-public class EnvUtil {
-    public static int getInt(String envName, int defaultValue) {
-        int value = defaultValue;
-        String envValue = System.getenv(envName);
-        if (envValue != null) {
-            try {
-                value = Integer.parseInt(envValue);
-            } catch (NumberFormatException e) {
-
-            }
-        }
-        return value;
-    }
-
-    public static long getLong(String envName, long defaultValue) {
-        long value = defaultValue;
-        String envValue = System.getenv(envName);
-        if (envValue != null) {
-            try {
-                value = Long.parseLong(envValue);
-            } catch (NumberFormatException e) {
-
-            }
-        }
-        return value;
-    }
-}
diff --git 
a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/ArrayBlockingQueueBuffer.java
 
b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/ArrayBlockingQueueBuffer.java
deleted file mode 100644
index feb2a99..0000000
--- 
a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/ArrayBlockingQueueBuffer.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.banyandb.commons.datacarrier.buffer;
-
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-
-/**
- * The buffer implementation based on JDK ArrayBlockingQueue.
- * <p>
- * This implementation has better performance in server side. We are still 
trying to research whether this is suitable
- * for agent side, which is more sensitive about blocks.
- */
-public class ArrayBlockingQueueBuffer<T> implements QueueBuffer<T> {
-    private BufferStrategy strategy;
-    private ArrayBlockingQueue<T> queue;
-    private int bufferSize;
-
-    ArrayBlockingQueueBuffer(int bufferSize, BufferStrategy strategy) {
-        this.strategy = strategy;
-        this.queue = new ArrayBlockingQueue<T>(bufferSize);
-        this.bufferSize = bufferSize;
-    }
-
-    @Override
-    public boolean save(T data) {
-        //only BufferStrategy.BLOCKING
-        try {
-            queue.put(data);
-        } catch (InterruptedException e) {
-            // Ignore the error
-            return false;
-        }
-        return true;
-    }
-
-    @Override
-    public void setStrategy(BufferStrategy strategy) {
-        this.strategy = strategy;
-    }
-
-    @Override
-    public void obtain(List<T> consumeList) {
-        queue.drainTo(consumeList);
-    }
-
-    @Override
-    public int getBufferSize() {
-        return bufferSize;
-    }
-}
diff --git 
a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/Buffer.java
 
b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/Buffer.java
deleted file mode 100644
index 38e92af..0000000
--- 
a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/Buffer.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.banyandb.commons.datacarrier.buffer;
-
-import java.util.List;
-import 
org.apache.skywalking.banyandb.commons.datacarrier.common.AtomicRangeInteger;
-
-/**
- * Self implementation ring queue.
- */
-public class Buffer<T> implements QueueBuffer<T> {
-    private final Object[] buffer;
-    private BufferStrategy strategy;
-    private AtomicRangeInteger index;
-
-    Buffer(int bufferSize, BufferStrategy strategy) {
-        buffer = new Object[bufferSize];
-        this.strategy = strategy;
-        index = new AtomicRangeInteger(0, bufferSize);
-    }
-
-    @Override
-    public void setStrategy(BufferStrategy strategy) {
-        this.strategy = strategy;
-    }
-
-    @Override
-    public boolean save(T data) {
-        int i = index.getAndIncrement();
-        if (buffer[i] != null) {
-            switch (strategy) {
-                case IF_POSSIBLE:
-                    return false;
-                default:
-            }
-        }
-        buffer[i] = data;
-        return true;
-    }
-
-    @Override
-    public int getBufferSize() {
-        return buffer.length;
-    }
-
-    @Override
-    public void obtain(List<T> consumeList) {
-        this.obtain(consumeList, 0, buffer.length);
-    }
-
-    void obtain(List<T> consumeList, int start, int end) {
-        for (int i = start; i < end; i++) {
-            if (buffer[i] != null) {
-                consumeList.add((T) buffer[i]);
-                buffer[i] = null;
-            }
-        }
-    }
-
-}
diff --git 
a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/BufferStrategy.java
 
b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/BufferStrategy.java
deleted file mode 100644
index 9dbc556..0000000
--- 
a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/BufferStrategy.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.banyandb.commons.datacarrier.buffer;
-
-public enum BufferStrategy {
-    BLOCKING, IF_POSSIBLE
-}
diff --git 
a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/Channels.java
 
b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/Channels.java
deleted file mode 100644
index 46ce98f..0000000
--- 
a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/Channels.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.banyandb.commons.datacarrier.buffer;
-
-import 
org.apache.skywalking.banyandb.commons.datacarrier.partition.IDataPartitioner;
-
-/**
- * Channels of Buffer It contains all buffer data which belongs to this 
channel. It supports several strategy when
- * buffer is full. The Default is BLOCKING <p> Created by wusheng on 
2016/10/25.
- */
-public class Channels<T> {
-    private final QueueBuffer<T>[] bufferChannels;
-    private IDataPartitioner<T> dataPartitioner;
-    private final BufferStrategy strategy;
-    private final long size;
-
-    public Channels(int channelSize, int bufferSize, IDataPartitioner<T> 
partitioner, BufferStrategy strategy) {
-        this.dataPartitioner = partitioner;
-        this.strategy = strategy;
-        bufferChannels = new QueueBuffer[channelSize];
-        for (int i = 0; i < channelSize; i++) {
-            if (BufferStrategy.BLOCKING.equals(strategy)) {
-                bufferChannels[i] = new ArrayBlockingQueueBuffer<>(bufferSize, 
strategy);
-            } else {
-                bufferChannels[i] = new Buffer<>(bufferSize, strategy);
-            }
-        }
-        // noinspection PointlessArithmeticExpression
-        size = 1L * channelSize * bufferSize; // it's not pointless, it 
prevents numeric overflow before assigning an integer to a long
-    }
-
-    public boolean save(T data) {
-        int index = dataPartitioner.partition(bufferChannels.length, data);
-        int retryCountDown = 1;
-        if (BufferStrategy.IF_POSSIBLE.equals(strategy)) {
-            int maxRetryCount = dataPartitioner.maxRetryCount();
-            if (maxRetryCount > 1) {
-                retryCountDown = maxRetryCount;
-            }
-        }
-        for (; retryCountDown > 0; retryCountDown--) {
-            if (bufferChannels[index].save(data)) {
-                return true;
-            }
-        }
-        return false;
-    }
-
-    public void setPartitioner(IDataPartitioner<T> dataPartitioner) {
-        this.dataPartitioner = dataPartitioner;
-    }
-
-    /**
-     * override the strategy at runtime. Notice, this will override several 
channels one by one. So, when running
-     * setStrategy, each channel may use different BufferStrategy
-     */
-    public void setStrategy(BufferStrategy strategy) {
-        for (QueueBuffer<T> buffer : bufferChannels) {
-            buffer.setStrategy(strategy);
-        }
-    }
-
-    /**
-     * get channelSize
-     */
-    public int getChannelSize() {
-        return this.bufferChannels.length;
-    }
-
-    public long size() {
-        return size;
-    }
-
-    public QueueBuffer<T> getBuffer(int index) {
-        return this.bufferChannels[index];
-    }
-}
diff --git 
a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/QueueBuffer.java
 
b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/QueueBuffer.java
deleted file mode 100644
index 4baf83c..0000000
--- 
a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/QueueBuffer.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.banyandb.commons.datacarrier.buffer;
-
-import java.util.List;
-
-/**
- * Queue buffer interface.
- */
-public interface QueueBuffer<T> {
-    /**
-     * Save data into the queue;
-     *
-     * @param data to add.
-     * @return true if saved
-     */
-    boolean save(T data);
-
-    /**
-     * Set different strategy when queue is full.
-     */
-    void setStrategy(BufferStrategy strategy);
-
-    /**
-     * Obtain the existing data from the queue
-     */
-    void obtain(List<T> consumeList);
-
-    int getBufferSize();
-}
diff --git 
a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/common/AtomicRangeInteger.java
 
b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/common/AtomicRangeInteger.java
deleted file mode 100644
index cb2b4be..0000000
--- 
a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/common/AtomicRangeInteger.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.banyandb.commons.datacarrier.common;
-
-import java.io.Serializable;
-import java.util.concurrent.atomic.AtomicIntegerArray;
-
-public class AtomicRangeInteger extends Number implements Serializable {
-    private static final long serialVersionUID = -4099792402691141643L;
-    private AtomicIntegerArray values;
-
-    private static final int VALUE_OFFSET = 15;
-
-    private int startValue;
-    private int endValue;
-
-    public AtomicRangeInteger(int startValue, int maxValue) {
-        this.values = new AtomicIntegerArray(31);
-        this.values.set(VALUE_OFFSET, startValue);
-        this.startValue = startValue;
-        this.endValue = maxValue - 1;
-    }
-
-    public final int getAndIncrement() {
-        int next;
-        do {
-            next = this.values.incrementAndGet(VALUE_OFFSET);
-            if (next > endValue && this.values.compareAndSet(VALUE_OFFSET, 
next, startValue)) {
-                return endValue;
-            }
-        }
-        while (next > endValue);
-
-        return next - 1;
-    }
-
-    public final int get() {
-        return this.values.get(VALUE_OFFSET);
-    }
-
-    @Override
-    public int intValue() {
-        return this.values.get(VALUE_OFFSET);
-    }
-
-    @Override
-    public long longValue() {
-        return this.values.get(VALUE_OFFSET);
-    }
-
-    @Override
-    public float floatValue() {
-        return this.values.get(VALUE_OFFSET);
-    }
-
-    @Override
-    public double doubleValue() {
-        return this.values.get(VALUE_OFFSET);
-    }
-}
diff --git 
a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/BulkConsumePool.java
 
b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/BulkConsumePool.java
deleted file mode 100644
index fd45499..0000000
--- 
a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/BulkConsumePool.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.banyandb.commons.datacarrier.consumer;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Callable;
-import org.apache.skywalking.banyandb.commons.datacarrier.EnvUtil;
-import org.apache.skywalking.banyandb.commons.datacarrier.buffer.Channels;
-
-/**
- * BulkConsumePool works for consuming data from multiple channels(DataCarrier 
instances), with multiple {@link
- * MultipleChannelsConsumer}s.
- * <p>
- * In typical case, the number of {@link MultipleChannelsConsumer} should be 
less than the number of channels.
- */
-public class BulkConsumePool implements ConsumerPool {
-    private List<MultipleChannelsConsumer> allConsumers;
-    private volatile boolean isStarted = false;
-
-    public BulkConsumePool(String name, int size, long consumeCycle) {
-        size = EnvUtil.getInt(name + "_THREAD", size);
-        allConsumers = new ArrayList<MultipleChannelsConsumer>(size);
-        for (int i = 0; i < size; i++) {
-            MultipleChannelsConsumer multipleChannelsConsumer = new 
MultipleChannelsConsumer("DataCarrier." + name + ".BulkConsumePool." + i + 
".Thread", consumeCycle);
-            multipleChannelsConsumer.setDaemon(true);
-            allConsumers.add(multipleChannelsConsumer);
-        }
-    }
-
-    @Override
-    synchronized public void add(String name, Channels channels, IConsumer 
consumer) {
-        MultipleChannelsConsumer multipleChannelsConsumer = getLowestPayload();
-        multipleChannelsConsumer.addNewTarget(channels, consumer);
-    }
-
-    /**
-     * Get the lowest payload consumer thread based on current allocate status.
-     *
-     * @return the lowest consumer.
-     */
-    private MultipleChannelsConsumer getLowestPayload() {
-        MultipleChannelsConsumer winner = allConsumers.get(0);
-        for (int i = 1; i < allConsumers.size(); i++) {
-            MultipleChannelsConsumer option = allConsumers.get(i);
-            if (option.size() < winner.size()) {
-                winner = option;
-            }
-        }
-        return winner;
-    }
-
-    /**
-     *
-     */
-    @Override
-    public boolean isRunning(Channels channels) {
-        return isStarted;
-    }
-
-    @Override
-    public void close(Channels channels) {
-        for (MultipleChannelsConsumer consumer : allConsumers) {
-            consumer.shutdown();
-        }
-    }
-
-    @Override
-    public void begin(Channels channels) {
-        if (isStarted) {
-            return;
-        }
-        for (MultipleChannelsConsumer consumer : allConsumers) {
-            consumer.start();
-        }
-        isStarted = true;
-    }
-
-    /**
-     * The creator for {@link BulkConsumePool}.
-     */
-    public static class Creator implements Callable<ConsumerPool> {
-        private String name;
-        private int size;
-        private long consumeCycle;
-
-        public Creator(String name, int poolSize, long consumeCycle) {
-            this.name = name;
-            this.size = poolSize;
-            this.consumeCycle = consumeCycle;
-        }
-
-        @Override
-        public ConsumerPool call() {
-            return new BulkConsumePool(name, size, consumeCycle);
-        }
-
-        public static int recommendMaxSize() {
-            return Runtime.getRuntime().availableProcessors() * 2;
-        }
-    }
-}
diff --git 
a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumeDriver.java
 
b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumeDriver.java
deleted file mode 100644
index 4535a95..0000000
--- 
a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumeDriver.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.banyandb.commons.datacarrier.consumer;
-
-import java.lang.reflect.InvocationTargetException;
-import java.util.Properties;
-import java.util.concurrent.locks.ReentrantLock;
-import org.apache.skywalking.banyandb.commons.datacarrier.buffer.Channels;
-
-/**
- * Pool of consumers <p> Created by wusheng on 2016/10/25.
- */
-public class ConsumeDriver<T> implements IDriver {
-    private boolean running;
-    private ConsumerThread[] consumerThreads;
-    private Channels<T> channels;
-    private ReentrantLock lock;
-
-    public ConsumeDriver(String name,
-                         Channels<T> channels, Class<? extends IConsumer<T>> consumerClass,
-                         int num,
-                         long consumeCycle,
-                         Properties properties) {
-        this(channels, num);
-        for (int i = 0; i < num; i++) {
-            consumerThreads[i] = new ConsumerThread(
-                "DataCarrier." + name + ".Consumer." + i + ".Thread", getNewConsumerInstance(consumerClass, properties),
-                consumeCycle
-            );
-            consumerThreads[i].setDaemon(true);
-        }
-    }
-
-    public ConsumeDriver(String name, Channels<T> channels, IConsumer<T> prototype, int num, long consumeCycle) {
-        this(channels, num);
-        prototype.init(new Properties());
-        for (int i = 0; i < num; i++) {
-            consumerThreads[i] = new ConsumerThread(
-                "DataCarrier." + name + ".Consumer." + i + ".Thread", prototype, consumeCycle);
-            consumerThreads[i].setDaemon(true);
-        }
-
-    }
-
-    private ConsumeDriver(Channels<T> channels, int num) {
-        running = false;
-        this.channels = channels;
-        consumerThreads = new ConsumerThread[num];
-        lock = new ReentrantLock();
-    }
-
-    private IConsumer<T> getNewConsumerInstance(Class<? extends IConsumer<T>> consumerClass, Properties properties) {
-        try {
-            IConsumer<T> inst = consumerClass.getDeclaredConstructor().newInstance();
-            inst.init(properties);
-            return inst;
-        } catch (InstantiationException e) {
-            throw new ConsumerCannotBeCreatedException(e);
-        } catch (IllegalAccessException e) {
-            throw new ConsumerCannotBeCreatedException(e);
-        } catch (NoSuchMethodException e) {
-            throw new ConsumerCannotBeCreatedException(e);
-        } catch (InvocationTargetException e) {
-            throw new ConsumerCannotBeCreatedException(e);
-        }
-    }
-
-    @Override
-    public void begin(Channels channels) {
-        if (running) {
-            return;
-        }
-        lock.lock();
-        try {
-            this.allocateBuffer2Thread();
-            for (ConsumerThread consumerThread : consumerThreads) {
-                consumerThread.start();
-            }
-            running = true;
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    @Override
-    public boolean isRunning(Channels channels) {
-        return running;
-    }
-
-    private void allocateBuffer2Thread() {
-        int channelSize = this.channels.getChannelSize();
-        /**
-         * if consumerThreads.length < channelSize
-         * each consumer will process several channels.
-         *
-         * if consumerThreads.length == channelSize
-         * each consumer will process one channel.
-         *
-         * if consumerThreads.length > channelSize
-         * there will be some threads do nothing.
-         */
-        for (int channelIndex = 0; channelIndex < channelSize; channelIndex++) {
-            int consumerIndex = channelIndex % consumerThreads.length;
-            consumerThreads[consumerIndex].addDataSource(channels.getBuffer(channelIndex));
-        }
-
-    }
-
-    @Override
-    public void close(Channels channels) {
-        lock.lock();
-        try {
-            this.running = false;
-            for (ConsumerThread consumerThread : consumerThreads) {
-                consumerThread.shutdown();
-            }
-        } finally {
-            lock.unlock();
-        }
-    }
-}
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumerCannotBeCreatedException.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumerCannotBeCreatedException.java
deleted file mode 100644
index 0d0c8de..0000000
--- a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumerCannotBeCreatedException.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.banyandb.commons.datacarrier.consumer;
-
-public class ConsumerCannotBeCreatedException extends RuntimeException {
-    ConsumerCannotBeCreatedException(Throwable t) {
-        super(t);
-    }
-}
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumerPool.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumerPool.java
deleted file mode 100644
index fd93dfb..0000000
--- a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumerPool.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.banyandb.commons.datacarrier.consumer;
-
-import org.apache.skywalking.banyandb.commons.datacarrier.DataCarrier;
-import org.apache.skywalking.banyandb.commons.datacarrier.buffer.Channels;
-
-/**
- * The Consumer pool could support data consumer from multiple {@link DataCarrier}s, by using different consume thread
- * management models.
- */
-public interface ConsumerPool extends IDriver {
-    void add(String name, Channels channels, IConsumer consumer);
-}
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumerPoolFactory.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumerPoolFactory.java
deleted file mode 100644
index 8eb1581..0000000
--- a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumerPoolFactory.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.banyandb.commons.datacarrier.consumer;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.Callable;
-
-/**
- * Consumer Pool Factory provides global management for all Consumer Pool.
- */
-public enum ConsumerPoolFactory {
-    INSTANCE;
-
-    private final Map<String, ConsumerPool> pools;
-
-    ConsumerPoolFactory() {
-        pools = new HashMap<>();
-    }
-
-    public synchronized boolean createIfAbsent(String poolName, Callable<ConsumerPool> creator) throws Exception {
-        if (pools.containsKey(poolName)) {
-            return false;
-        } else {
-            pools.put(poolName, creator.call());
-            return true;
-        }
-    }
-
-    public ConsumerPool get(String poolName) {
-        return pools.get(poolName);
-    }
-
-}
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumerThread.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumerThread.java
deleted file mode 100644
index b5f5478..0000000
--- a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumerThread.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.banyandb.commons.datacarrier.consumer;
-
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.skywalking.banyandb.commons.datacarrier.buffer.Buffer;
-import org.apache.skywalking.banyandb.commons.datacarrier.buffer.QueueBuffer;
-
-public class ConsumerThread<T> extends Thread {
-    private volatile boolean running;
-    private IConsumer<T> consumer;
-    private List<DataSource> dataSources;
-    private long consumeCycle;
-
-    ConsumerThread(String threadName, IConsumer<T> consumer, long consumeCycle) {
-        super(threadName);
-        this.consumer = consumer;
-        running = false;
-        dataSources = new ArrayList<DataSource>(1);
-        this.consumeCycle = consumeCycle;
-    }
-
-    /**
-     * add whole buffer to consume
-     */
-    void addDataSource(QueueBuffer<T> sourceBuffer) {
-        this.dataSources.add(new DataSource(sourceBuffer));
-    }
-
-    @Override
-    public void run() {
-        running = true;
-
-        final List<T> consumeList = new ArrayList<T>(1500);
-        while (running) {
-            if (!consume(consumeList)) {
-                try {
-                    Thread.sleep(consumeCycle);
-                } catch (InterruptedException e) {
-                }
-            }
-        }
-
-        // consumer thread is going to stop
-        // consume the last time
-        consume(consumeList);
-
-        consumer.onExit();
-    }
-
-    private boolean consume(List<T> consumeList) {
-        for (DataSource dataSource : dataSources) {
-            dataSource.obtain(consumeList);
-        }
-
-        if (!consumeList.isEmpty()) {
-            try {
-                consumer.consume(consumeList);
-            } catch (Throwable t) {
-                consumer.onError(consumeList, t);
-            } finally {
-                consumeList.clear();
-            }
-            return true;
-        }
-        consumer.nothingToConsume();
-        return false;
-    }
-
-    void shutdown() {
-        running = false;
-    }
-
-    /**
-     * DataSource is a refer to {@link Buffer}.
-     */
-    class DataSource {
-        private QueueBuffer<T> sourceBuffer;
-
-        DataSource(QueueBuffer<T> sourceBuffer) {
-            this.sourceBuffer = sourceBuffer;
-        }
-
-        void obtain(List<T> consumeList) {
-            sourceBuffer.obtain(consumeList);
-        }
-    }
-}
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/IConsumer.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/IConsumer.java
deleted file mode 100644
index 9d965e1..0000000
--- a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/IConsumer.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.banyandb.commons.datacarrier.consumer;
-
-import java.util.List;
-import java.util.Properties;
-
-public interface IConsumer<T> {
-    void init(final Properties properties);
-
-    void consume(List<T> data);
-
-    void onError(List<T> data, Throwable t);
-
-    void onExit();
-
-    /**
-     * Notify the implementation, if there is nothing fetched from the queue. This could be used as a timer to trigger
-     * reaction if the queue has no element.
-     */
-    default void nothingToConsume() {
-        return;
-    }
-}
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/IDriver.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/IDriver.java
deleted file mode 100644
index 7011c4e..0000000
--- a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/IDriver.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.banyandb.commons.datacarrier.consumer;
-
-import org.apache.skywalking.banyandb.commons.datacarrier.buffer.Channels;
-
-/**
- * The driver of consumer.
- */
-public interface IDriver {
-    boolean isRunning(Channels channels);
-
-    void close(Channels channels);
-
-    void begin(Channels channels);
-}
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/MultipleChannelsConsumer.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/MultipleChannelsConsumer.java
deleted file mode 100644
index 19969fa..0000000
--- a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/MultipleChannelsConsumer.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.banyandb.commons.datacarrier.consumer;
-
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.skywalking.banyandb.commons.datacarrier.buffer.Channels;
-import org.apache.skywalking.banyandb.commons.datacarrier.buffer.QueueBuffer;
-
-/**
- * MultipleChannelsConsumer represent a single consumer thread, but support multiple channels with their {@link
- * IConsumer}s
- */
-public class MultipleChannelsConsumer extends Thread {
-    private volatile boolean running;
-    private volatile ArrayList<Group> consumeTargets;
-    @SuppressWarnings("NonAtomicVolatileUpdate")
-    private volatile long size;
-    private final long consumeCycle;
-
-    public MultipleChannelsConsumer(String threadName, long consumeCycle) {
-        super(threadName);
-        this.consumeTargets = new ArrayList<Group>();
-        this.consumeCycle = consumeCycle;
-    }
-
-    @Override
-    public void run() {
-        running = true;
-
-        final List consumeList = new ArrayList(2000);
-        while (running) {
-            boolean hasData = false;
-            for (Group target : consumeTargets) {
-                boolean consume = consume(target, consumeList);
-                hasData = hasData || consume;
-            }
-
-            if (!hasData) {
-                try {
-                    Thread.sleep(consumeCycle);
-                } catch (InterruptedException e) {
-                }
-            }
-        }
-
-        // consumer thread is going to stop
-        // consume the last time
-        for (Group target : consumeTargets) {
-            consume(target, consumeList);
-
-            target.consumer.onExit();
-        }
-    }
-
-    private boolean consume(Group target, List consumeList) {
-        for (int i = 0; i < target.channels.getChannelSize(); i++) {
-            QueueBuffer buffer = target.channels.getBuffer(i);
-            buffer.obtain(consumeList);
-        }
-
-        if (!consumeList.isEmpty()) {
-            try {
-                target.consumer.consume(consumeList);
-            } catch (Throwable t) {
-                target.consumer.onError(consumeList, t);
-            } finally {
-                consumeList.clear();
-            }
-            return true;
-        }
-        target.consumer.nothingToConsume();
-        return false;
-    }
-
-    /**
-     * Add a new target channels.
-     */
-    public void addNewTarget(Channels channels, IConsumer consumer) {
-        Group group = new Group(channels, consumer);
-        // Recreate the new list to avoid change list while the list is used in consuming.
-        ArrayList<Group> newList = new ArrayList<Group>();
-        for (Group target : consumeTargets) {
-            newList.add(target);
-        }
-        newList.add(group);
-        consumeTargets = newList;
-        size += channels.size();
-    }
-
-    public long size() {
-        return size;
-    }
-
-    void shutdown() {
-        running = false;
-    }
-
-    private static class Group {
-        private Channels channels;
-        private IConsumer consumer;
-
-        public Group(Channels channels, IConsumer consumer) {
-            this.channels = channels;
-            this.consumer = consumer;
-        }
-    }
-}
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/partition/IDataPartitioner.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/partition/IDataPartitioner.java
deleted file mode 100644
index 4b6c576..0000000
--- a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/partition/IDataPartitioner.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.banyandb.commons.datacarrier.partition;
-
-import org.apache.skywalking.banyandb.commons.datacarrier.buffer.BufferStrategy;
-
-public interface IDataPartitioner<T> {
-    int partition(int total, T data);
-
-    /**
-     * @return an integer represents how many times should retry when {@link BufferStrategy#IF_POSSIBLE}.
-     * <p>
-     * Less or equal 1, means not support retry.
-     */
-    int maxRetryCount();
-}
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/partition/ProducerThreadPartitioner.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/partition/ProducerThreadPartitioner.java
deleted file mode 100644
index b83fd1f..0000000
--- a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/partition/ProducerThreadPartitioner.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.banyandb.commons.datacarrier.partition;
-
-/**
- * use threadid % total to partition
- */
-public class ProducerThreadPartitioner<T> implements IDataPartitioner<T> {
-    public ProducerThreadPartitioner() {
-    }
-
-    @Override
-    public int partition(int total, T data) {
-        return (int) Thread.currentThread().getId() % total;
-    }
-
-    @Override
-    public int maxRetryCount() {
-        return 1;
-    }
-}
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/partition/SimpleRollingPartitioner.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/partition/SimpleRollingPartitioner.java
deleted file mode 100644
index e22f92f..0000000
--- a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/partition/SimpleRollingPartitioner.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.banyandb.commons.datacarrier.partition;
-
-/**
- * use normal int to rolling.
- */
-public class SimpleRollingPartitioner<T> implements IDataPartitioner<T> {
-    @SuppressWarnings("NonAtomicVolatileUpdate")
-    private volatile int i = 0;
-
-    @Override
-    public int partition(int total, T data) {
-        return Math.abs(i++ % total);
-    }
-
-    @Override
-    public int maxRetryCount() {
-        return 3;
-    }
-}
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractBulkWriteProcessor.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractBulkWriteProcessor.java
index 5ccb885..9d756d8 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractBulkWriteProcessor.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractBulkWriteProcessor.java
@@ -21,19 +21,30 @@ package org.apache.skywalking.banyandb.v1.client;
 import com.google.auto.value.AutoValue;
 import io.grpc.stub.AbstractAsyncStub;
 import io.grpc.stub.StreamObserver;
-import lombok.extern.slf4j.Slf4j;
-
+import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
-import java.util.function.Function;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
 public abstract class AbstractBulkWriteProcessor<REQ extends com.google.protobuf.GeneratedMessageV3,
-        STUB extends AbstractAsyncStub<STUB>> extends BulkWriteProcessor {
+    STUB extends AbstractAsyncStub<STUB>>
+    implements Runnable, Closeable {
     private final STUB stub;
+    private final int maxBulkSize;
+    private final int flushInterval;
+    private final ArrayBlockingQueue<Holder> requests;
+    private final Semaphore semaphore;
+    private final long flushInternalInMillis;
+    private final ScheduledThreadPoolExecutor scheduler;
+    private final int timeout;
+    private volatile long lastFlushTS = 0;
 
     /**
      * Create the processor.
@@ -44,10 +55,32 @@ public abstract class AbstractBulkWriteProcessor<REQ extends com.google.protobuf
      * @param flushInterval if given maxBulkSize is not reached in this period, the flush would be trigger
      *                      automatically. Unit is second.
      * @param concurrency   the number of concurrency would run for the flush max.
+     * @param timeout       network timeout threshold in seconds.
      */
-    protected AbstractBulkWriteProcessor(STUB stub, String processorName, int maxBulkSize, int flushInterval, int concurrency) {
-        super(processorName, maxBulkSize, flushInterval, concurrency);
+    protected AbstractBulkWriteProcessor(STUB stub,
+                                         String processorName,
+                                         int maxBulkSize,
+                                         int flushInterval,
+                                         int concurrency,
+                                         int timeout) {
         this.stub = stub;
+        this.maxBulkSize = maxBulkSize;
+        this.flushInterval = flushInterval;
+        this.timeout = timeout;
+        requests = new ArrayBlockingQueue<>(maxBulkSize + 1);
+        this.semaphore = new Semaphore(concurrency > 0 ? concurrency : 1);
+
+        scheduler = new ScheduledThreadPoolExecutor(1, r -> {
+            final Thread thread = new Thread(r);
+            thread.setName("ElasticSearch BulkProcessor");
+            return thread;
+        });
+        scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+        scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+        scheduler.setRemoveOnCancelPolicy(true);
+        flushInternalInMillis = flushInterval * 1000;
+        scheduler.scheduleWithFixedDelay(
+            this, 0, flushInterval, TimeUnit.SECONDS);
     }
 
     /**
@@ -55,22 +88,69 @@ public abstract class AbstractBulkWriteProcessor<REQ extends com.google.protobuf
      *
      * @param writeEntity to add.
      */
+    @SneakyThrows
     public CompletableFuture<Void> add(AbstractWrite<REQ> writeEntity) {
         final CompletableFuture<Void> f = new CompletableFuture<>();
-        this.buffer.produce(Holder.create(writeEntity, f));
+        requests.put(Holder.create(writeEntity, f));
+        flushIfNeeded();
         return f;
     }
 
-    @Override
-    protected void flush(List data) {
+    public void run() {
+        try {
+            doPeriodicalFlush();
+        } catch (Throwable t) {
+            log.error("Failed to flush data to BanyanDB", t);
+        }
+    }
+
+    @SneakyThrows
+    protected void flushIfNeeded() {
+        if (requests.size() >= maxBulkSize) {
+            flush();
+        }
+    }
+
+    private void doPeriodicalFlush() {
+        if (System.currentTimeMillis() - lastFlushTS > flushInternalInMillis / 2) {
+            // Run periodical flush if there is no `flushIfNeeded` executed in the second half of the flush period.
+            // Otherwise, wait for the next round. By default, the last 2 seconds of the 5s period.
+            // This could avoid periodical flush running among bulks(controlled by bulkActions).
+            flush();
+        }
+    }
+
+    public void flush() {
+        if (requests.isEmpty()) {
+            return;
+        }
+
+        try {
+            semaphore.acquire();
+        } catch (InterruptedException e) {
+            log.error("Interrupted when trying to get semaphore to execute bulk requests", e);
+            return;
+        }
+
+        final List<Holder> batch = new ArrayList<>(requests.size());
+        requests.drainTo(batch);
+        final CompletableFuture<Void> future = doFlush(batch);
+        future.whenComplete((v, t) -> semaphore.release());
+        future.join();
+        lastFlushTS = System.currentTimeMillis();
+
+    }
+
+    protected CompletableFuture<Void> doFlush(final List<Holder> data) {
+        // The batch is used to control the completion of the flush operation.
+        // There is at most one error per batch,
+        // because the database server would terminate the batch process when the first error occurs.
         final CompletableFuture<Void> batch = new CompletableFuture<>();
         final StreamObserver<REQ> writeRequestStreamObserver
-                = this.buildStreamObserver(stub.withDeadlineAfter(flushInterval, TimeUnit.SECONDS), batch);
+            = this.buildStreamObserver(stub.withDeadlineAfter(timeout, TimeUnit.SECONDS), batch);
 
-        List sentData = new ArrayList(data.size());
         try {
-            data.forEach(holder -> {
-                Holder h = (Holder) holder;
+            data.forEach(h -> {
                 AbstractWrite<REQ> entity = (AbstractWrite<REQ>) h.writeEntity();
                 REQ request;
                 try {
@@ -81,30 +161,21 @@ public abstract class AbstractBulkWriteProcessor<REQ 
extends com.google.protobuf
                     return;
                 }
                 writeRequestStreamObserver.onNext(request);
-                sentData.add(h);
+                h.future().complete(null);
             });
-        } catch (Throwable t) {
-            log.error("Transform and send request to BanyanDB fail.", t);
-            batch.completeExceptionally(t);
         } finally {
             writeRequestStreamObserver.onCompleted();
         }
         batch.whenComplete((ignored, exp) -> {
             if (exp != null) {
-                sentData.stream().map((Function<Object, 
CompletableFuture<Void>>) o -> ((Holder) o).future())
-                        .forEach((Consumer<CompletableFuture<Void>>) it -> 
it.completeExceptionally(exp));
                 log.error("Failed to execute requests in bulk", exp);
-            } else {
-                log.debug("Succeeded to execute {} requests in bulk", 
data.size());
-                sentData.stream().map((Function<Object, 
CompletableFuture<Void>>) o -> ((Holder) o).future())
-                        .forEach((Consumer<CompletableFuture<Void>>) it -> 
it.complete(null));
             }
         });
-        try {
-            batch.get(30, TimeUnit.SECONDS);
-        } catch (Throwable t) {
-            log.error("Waiting responses from BanyanDB fail.", t);
-        }
+        return batch;
+    }
+
+    public void close() {
+        scheduler.shutdownNow();
     }
 
     protected abstract StreamObserver<REQ> buildStreamObserver(STUB stub, 
CompletableFuture<Void> batch);
@@ -115,7 +186,13 @@ public abstract class AbstractBulkWriteProcessor<REQ 
extends com.google.protobuf
 
         abstract CompletableFuture<Void> future();
 
-        public static <REQ extends com.google.protobuf.GeneratedMessageV3> 
Holder create(AbstractWrite<REQ> writeEntity, CompletableFuture<Void> future) {
+        public static <REQ extends com.google.protobuf.GeneratedMessageV3> 
Holder create(AbstractWrite<REQ> writeEntity,
+                                                                               
          CompletableFuture<Void> future) {
+            future.whenComplete((v, t) -> {
+                if (t != null) {
+                    log.error("Failed to execute the request: {}", 
writeEntity.toString(), t);
+                }
+            });
             return new 
AutoValue_AbstractBulkWriteProcessor_Holder(writeEntity, future);
         }
 
diff --git 
a/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java 
b/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java
index 2ec4d65..96ad78d 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java
@@ -279,12 +279,13 @@ public class BanyanDBClient implements Closeable {
      * @param flushInterval if given maxBulkSize is not reached in this 
period, the flush would be trigger
      *                      automatically. Unit is second
      * @param concurrency   the number of concurrency would run for the flush 
max
+     * @param timeout       network timeout threshold in seconds.
      * @return stream bulk write processor
      */
-    public StreamBulkWriteProcessor buildStreamWriteProcessor(int maxBulkSize, 
int flushInterval, int concurrency) {
+    public StreamBulkWriteProcessor buildStreamWriteProcessor(int maxBulkSize, 
int flushInterval, int concurrency, int timeout) {
         checkState(this.streamServiceStub != null, "stream service is null");
 
-        return new StreamBulkWriteProcessor(this, maxBulkSize, flushInterval, 
concurrency);
+        return new StreamBulkWriteProcessor(this, maxBulkSize, flushInterval, 
concurrency, timeout);
     }
 
     /**
@@ -294,12 +295,13 @@ public class BanyanDBClient implements Closeable {
      * @param flushInterval if given maxBulkSize is not reached in this 
period, the flush would be trigger
      *                      automatically. Unit is second
      * @param concurrency   the number of concurrency would run for the flush 
max
+     * @param timeout       network timeout threshold in seconds.
      * @return stream bulk write processor
      */
-    public MeasureBulkWriteProcessor buildMeasureWriteProcessor(int 
maxBulkSize, int flushInterval, int concurrency) {
+    public MeasureBulkWriteProcessor buildMeasureWriteProcessor(int 
maxBulkSize, int flushInterval, int concurrency, int timeout) {
         checkState(this.measureServiceStub != null, "measure service is null");
 
-        return new MeasureBulkWriteProcessor(this, maxBulkSize, flushInterval, 
concurrency);
+        return new MeasureBulkWriteProcessor(this, maxBulkSize, flushInterval, 
concurrency, timeout);
     }
 
     /**
diff --git 
a/src/main/java/org/apache/skywalking/banyandb/v1/client/BulkWriteProcessor.java
 
b/src/main/java/org/apache/skywalking/banyandb/v1/client/BulkWriteProcessor.java
deleted file mode 100644
index 1bbc1eb..0000000
--- 
a/src/main/java/org/apache/skywalking/banyandb/v1/client/BulkWriteProcessor.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.banyandb.v1.client;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.skywalking.banyandb.commons.datacarrier.DataCarrier;
-import org.apache.skywalking.banyandb.commons.datacarrier.consumer.IConsumer;
-
-/**
- * BulkWriteProcessor is a timeline and size dual driven processor.
- * <p>
- * It includes an internal queue and timer, and accept the data sequentially. 
With the given thresholds of time and
- * size, it could activate flush to continue the process to the next step.
- */
-public abstract class BulkWriteProcessor implements Closeable {
-    protected final int flushInterval;
-    protected DataCarrier buffer;
-
-    /**
-     * Create the processor.
-     *
-     * @param maxBulkSize   the max bulk size for the flush operation
-     * @param flushInterval if given maxBulkSize is not reached in this 
period, the flush would be trigger
-     *                      automatically. Unit is second.
-     * @param concurrency   the number of concurrency would run for the flush 
max.
-     */
-    protected BulkWriteProcessor(String processorName, int maxBulkSize, int 
flushInterval, int concurrency) {
-        this.flushInterval = flushInterval;
-        this.buffer = new DataCarrier(processorName, concurrency, maxBulkSize);
-        Properties properties = new Properties();
-        properties.put("maxBulkSize", maxBulkSize);
-        properties.put("flushInterval", flushInterval);
-        properties.put("BulkWriteProcessor", this);
-        buffer.consume(QueueWatcher.class, concurrency, 20, properties);
-    }
-
-    @Override
-    public void close() throws IOException {
-        this.buffer.shutdownConsumers();
-    }
-
-    /**
-     * The internal queue consumer for build process.
-     */
-    public static class QueueWatcher implements IConsumer {
-        private long lastFlushTimestamp;
-        private int maxBulkSize;
-        private int flushInterval;
-        private List cachedData;
-        private BulkWriteProcessor bulkWriteProcessor;
-
-        public QueueWatcher() {
-        }
-
-        @Override
-        public void init(Properties properties) {
-            lastFlushTimestamp = System.currentTimeMillis();
-            maxBulkSize = (Integer) properties.get("maxBulkSize");
-            flushInterval = (Integer) properties.get("flushInterval") * 1000;
-            cachedData = new ArrayList(maxBulkSize);
-            bulkWriteProcessor = (BulkWriteProcessor) 
properties.get("BulkWriteProcessor");
-        }
-
-        @Override
-        public void consume(final List data) {
-            if (data.size() >= maxBulkSize) {
-                // The data#size actually wouldn't over the maxBulkSize due to 
the DataCarrier channel's max size.
-                // This is just to preventing unexpected case and avoid 
confusion about dropping into else section.
-                bulkWriteProcessor.flush(data);
-                lastFlushTimestamp = System.currentTimeMillis();
-            } else {
-                data.forEach(element -> {
-                    cachedData.add(element);
-                    if (cachedData.size() >= maxBulkSize) {
-                        // Flush and re-init.
-                        bulkWriteProcessor.flush(cachedData);
-                        cachedData = new ArrayList(maxBulkSize);
-                        lastFlushTimestamp = System.currentTimeMillis();
-                    }
-                });
-            }
-        }
-
-        @Override
-        public void onError(final List data, final Throwable t) {
-
-        }
-
-        @Override
-        public void onExit() {
-
-        }
-
-        @Override
-        public void nothingToConsume() {
-            if (System.currentTimeMillis() - lastFlushTimestamp > 
flushInterval) {
-                bulkWriteProcessor.flush(cachedData);
-                cachedData = new ArrayList(maxBulkSize);
-                lastFlushTimestamp = System.currentTimeMillis();
-            }
-        }
-    }
-
-    /**
-     * @param data to be flush.
-     */
-    protected abstract void flush(List data);
-}
diff --git 
a/src/main/java/org/apache/skywalking/banyandb/v1/client/MeasureBulkWriteProcessor.java
 
b/src/main/java/org/apache/skywalking/banyandb/v1/client/MeasureBulkWriteProcessor.java
index 5d3ce23..2ac8a1d 100644
--- 
a/src/main/java/org/apache/skywalking/banyandb/v1/client/MeasureBulkWriteProcessor.java
+++ 
b/src/main/java/org/apache/skywalking/banyandb/v1/client/MeasureBulkWriteProcessor.java
@@ -48,14 +48,16 @@ public class MeasureBulkWriteProcessor extends 
AbstractBulkWriteProcessor<Banyan
      * @param maxBulkSize   the max bulk size for the flush operation
      * @param flushInterval if given maxBulkSize is not reached in this 
period, the flush would be trigger
      *                      automatically. Unit is second.
+     * @param timeout       network timeout threshold in seconds.
      * @param concurrency   the number of concurrency would run for the flush 
max.
      */
     protected MeasureBulkWriteProcessor(
             final BanyanDBClient client,
             final int maxBulkSize,
             final int flushInterval,
-            final int concurrency) {
-        super(client.getMeasureServiceStub(), "MeasureBulkWriteProcessor", 
maxBulkSize, flushInterval, concurrency);
+            final int concurrency,
+            final int timeout) {
+        super(client.getMeasureServiceStub(), "MeasureBulkWriteProcessor", 
maxBulkSize, flushInterval, concurrency, timeout);
         this.client = client;
     }
 
@@ -77,7 +79,7 @@ public class MeasureBulkWriteProcessor extends 
AbstractBulkWriteProcessor<Banyan
                                 client.findMeasure(metadata.getGroup(), 
metadata.getName());
                                 schemaExpired.add(schemaKey);
                             } catch (BanyanDBException e) {
-                                throw new RuntimeException(e);
+                                log.error(e.getMessage(), e);
                             }
                         }
                         break;
diff --git 
a/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamBulkWriteProcessor.java
 
b/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamBulkWriteProcessor.java
index 2dd6a33..e7e1a3f 100644
--- 
a/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamBulkWriteProcessor.java
+++ 
b/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamBulkWriteProcessor.java
@@ -49,14 +49,16 @@ public class StreamBulkWriteProcessor extends 
AbstractBulkWriteProcessor<Banyand
      * @param maxBulkSize   the max bulk size for the flush operation
      * @param flushInterval if given maxBulkSize is not reached in this 
period, the flush would be trigger
      *                      automatically. Unit is second.
+     * @param timeout       network timeout threshold in seconds.
      * @param concurrency   the number of concurrency would run for the flush 
max.
      */
     protected StreamBulkWriteProcessor(
             final BanyanDBClient client,
             final int maxBulkSize,
             final int flushInterval,
-            final int concurrency) {
-        super(client.getStreamServiceStub(), "StreamBulkWriteProcessor", 
maxBulkSize, flushInterval, concurrency);
+            final int concurrency,
+            final int timeout) {
+        super(client.getStreamServiceStub(), "StreamBulkWriteProcessor", 
maxBulkSize, flushInterval, concurrency, timeout);
         this.client = client;
     }
 
@@ -78,7 +80,7 @@ public class StreamBulkWriteProcessor extends 
AbstractBulkWriteProcessor<Banyand
                                         client.findStream(metadata.getGroup(), 
metadata.getName());
                                         schemaExpired.add(schemaKey);
                                     } catch (BanyanDBException e) {
-                                        throw new RuntimeException(e);
+                                        log.error(e.getMessage(), e);
                                     }
                                 }
                                 break;
diff --git 
a/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientMeasureWriteTest.java
 
b/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientMeasureWriteTest.java
index 7aa46b0..331d1db 100644
--- 
a/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientMeasureWriteTest.java
+++ 
b/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientMeasureWriteTest.java
@@ -57,7 +57,7 @@ public class BanyanDBClientMeasureWriteTest extends 
AbstractBanyanDBClientTest {
         measureRegistry = new HashMap<>();
         setUp(bindMeasureRegistry());
 
-        measureBulkWriteProcessor = client.buildMeasureWriteProcessor(1000, 1, 
1);
+        measureBulkWriteProcessor = client.buildMeasureWriteProcessor(1000, 1, 
1, 10);
 
         measure = Measure.create("sw_metric", "service_cpm_minute", 
Duration.ofHours(1))
                 .setEntityRelativeTags("entity_id")
diff --git 
a/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientStreamWriteTest.java
 
b/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientStreamWriteTest.java
index 0112730..8764911 100644
--- 
a/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientStreamWriteTest.java
+++ 
b/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientStreamWriteTest.java
@@ -77,7 +77,7 @@ public class BanyanDBClientStreamWriteTest extends 
AbstractBanyanDBClientTest {
     @Before
     public void setUp() throws IOException, BanyanDBException {
         setUp(bindService(groupRegistryServiceImpl), bindStreamRegistry());
-        streamBulkWriteProcessor = client.buildStreamWriteProcessor(1000, 1, 
1);
+        streamBulkWriteProcessor = client.buildStreamWriteProcessor(1000, 1, 
1, 10);
 
         stream = Stream.create("default", "sw")
                 .setEntityRelativeTags("service_id", "service_instance_id", 
"state")
diff --git 
a/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBMeasureQueryTests.java
 
b/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBMeasureQueryTests.java
index deb61f3..785c59a 100644
--- 
a/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBMeasureQueryTests.java
+++ 
b/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBMeasureQueryTests.java
@@ -54,7 +54,7 @@ public class ITBanyanDBMeasureQueryTests extends 
BanyanDBClientTestCI {
         Measure expectedMeasure = Measure.create("sw_metric", 
"service_cpm_minute", 
Duration.ofMinutes(1)).setEntityRelativeTags("entity_id").addTagFamily(TagFamilySpec.create("default").addTagSpec(TagFamilySpec.TagSpec.newStringTag("entity_id")).build()).addField(Measure.FieldSpec.newIntField("total").compressWithZSTD().encodeWithGorilla().build()).addField(Measure.FieldSpec.newIntField("value").compressWithZSTD().encodeWithGorilla().build()).addIndex(IndexRule.create("scope",
 IndexRule [...]
         client.define(expectedMeasure);
         Assert.assertNotNull(expectedMeasure);
-        processor = client.buildMeasureWriteProcessor(1000, 1, 1);
+        processor = client.buildMeasureWriteProcessor(1000, 1, 1, 10);
     }
 
     @After
diff --git 
a/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBStreamQueryTests.java
 
b/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBStreamQueryTests.java
index 550c29f..cd4097d 100644
--- 
a/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBStreamQueryTests.java
+++ 
b/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBStreamQueryTests.java
@@ -77,7 +77,7 @@ public class ITBanyanDBStreamQueryTests extends 
BanyanDBClientTestCI {
                 .build();
         this.client.define(expectedStream);
         Assert.assertNotNull(expectedStream);
-        processor = client.buildStreamWriteProcessor(1000, 1, 1);
+        processor = client.buildStreamWriteProcessor(1000, 1, 1, 10);
     }
 
     @After

Reply via email to