This is an automated email from the ASF dual-hosted git repository. wusheng pushed a commit to branch polish in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb-java-client.git
commit 7d6adecda901bca8f213b9047458ef20bd1dfe52 Author: Wu Sheng <wu.sh...@foxmail.com> AuthorDate: Mon Jul 1 16:13:45 2024 +0800 Enhance the BulkWriteProcessor 1. Remove DataCarrier. 2. Make the bulk process in sync mode. Once the bulk size is reached, flush(gRPC) process starts immediately. 3. Add a timer as additional processor when the data slightly overs the bulk size. 4. Expose the gRPC timeout for bulk processor creation. Fix previous misused flushInterval as timeout. 5. Correct concurrency implementation. This OAP concept means, the max accepted sources to flush data concurrently. But the threads are controlled by the OAP persistent timer thread pool. --- .../banyandb/commons/datacarrier/DataCarrier.java | 166 --------------------- .../banyandb/commons/datacarrier/EnvUtil.java | 50 ------- .../buffer/ArrayBlockingQueueBuffer.java | 67 --------- .../commons/datacarrier/buffer/Buffer.java | 76 ---------- .../commons/datacarrier/buffer/BufferStrategy.java | 23 --- .../commons/datacarrier/buffer/Channels.java | 93 ------------ .../commons/datacarrier/buffer/QueueBuffer.java | 46 ------ .../datacarrier/common/AtomicRangeInteger.java | 76 ---------- .../datacarrier/consumer/BulkConsumePool.java | 118 --------------- .../datacarrier/consumer/ConsumeDriver.java | 137 ----------------- .../consumer/ConsumerCannotBeCreatedException.java | 25 ---- .../commons/datacarrier/consumer/ConsumerPool.java | 30 ---- .../datacarrier/consumer/ConsumerPoolFactory.java | 50 ------- .../datacarrier/consumer/ConsumerThread.java | 105 ------------- .../commons/datacarrier/consumer/IConsumer.java | 40 ----- .../commons/datacarrier/consumer/IDriver.java | 32 ---- .../consumer/MultipleChannelsConsumer.java | 124 --------------- .../datacarrier/partition/IDataPartitioner.java | 32 ---- .../partition/ProducerThreadPartitioner.java | 37 ----- .../partition/SimpleRollingPartitioner.java | 37 ----- .../v1/client/AbstractBulkWriteProcessor.java | 137 +++++++++++++---- .../banyandb/v1/client/BanyanDBClient.java | 10 +- .../banyandb/v1/client/BulkWriteProcessor.java | 129 ---------------- .../v1/client/MeasureBulkWriteProcessor.java | 8 +- .../v1/client/StreamBulkWriteProcessor.java | 8 +- .../v1/client/BanyanDBClientMeasureWriteTest.java | 2 +- .../v1/client/BanyanDBClientStreamWriteTest.java | 2 +- .../v1/client/ITBanyanDBMeasureQueryTests.java | 2 +- .../v1/client/ITBanyanDBStreamQueryTests.java | 2 +- 29 files changed, 127 insertions(+), 1537 deletions(-) diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/DataCarrier.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/DataCarrier.java deleted file mode 100644 index db2ed2d..0000000 --- a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/DataCarrier.java +++ /dev/null @@ -1,166 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.skywalking.banyandb.commons.datacarrier; - -import java.util.Properties; -import org.apache.skywalking.banyandb.commons.datacarrier.buffer.BufferStrategy; -import org.apache.skywalking.banyandb.commons.datacarrier.buffer.Channels; -import org.apache.skywalking.banyandb.commons.datacarrier.consumer.ConsumeDriver; -import org.apache.skywalking.banyandb.commons.datacarrier.consumer.ConsumerPool; -import org.apache.skywalking.banyandb.commons.datacarrier.consumer.IConsumer; -import org.apache.skywalking.banyandb.commons.datacarrier.consumer.IDriver; -import org.apache.skywalking.banyandb.commons.datacarrier.partition.IDataPartitioner; -import org.apache.skywalking.banyandb.commons.datacarrier.partition.SimpleRollingPartitioner; - -/** - * DataCarrier main class. use this instance to set Producer/Consumer Model. - */ -public class DataCarrier<T> { - private Channels<T> channels; - private IDriver driver; - private String name; - - public DataCarrier(int channelSize, int bufferSize) { - this("DEFAULT", channelSize, bufferSize); - } - - public DataCarrier(String name, int channelSize, int bufferSize) { - this(name, name, channelSize, bufferSize); - } - - public DataCarrier(String name, String envPrefix, int channelSize, int bufferSize) { - this(name, envPrefix, channelSize, bufferSize, BufferStrategy.BLOCKING); - } - - public DataCarrier(String name, String envPrefix, int channelSize, int bufferSize, BufferStrategy strategy) { - this.name = name; - bufferSize = EnvUtil.getInt(envPrefix + "_BUFFER_SIZE", bufferSize); - channelSize = EnvUtil.getInt(envPrefix + "_CHANNEL_SIZE", channelSize); - channels = new Channels<>(channelSize, bufferSize, new SimpleRollingPartitioner<T>(), strategy); - } - - public DataCarrier(int channelSize, int bufferSize, BufferStrategy strategy) { - this("DEFAULT", "DEFAULT", channelSize, bufferSize, strategy); - } - - /** - * set a new IDataPartitioner. It will cover the current one or default one.(Default is {@link - * SimpleRollingPartitioner} - * - * @param dataPartitioner to partition data into different channel by some rules. - * @return DataCarrier instance for chain - */ - public DataCarrier setPartitioner(IDataPartitioner<T> dataPartitioner) { - this.channels.setPartitioner(dataPartitioner); - return this; - } - - /** - * produce data to buffer, using the given {@link BufferStrategy}. - * - * @return false means produce data failure. The data will not be consumed. - */ - public boolean produce(T data) { - if (driver != null) { - if (!driver.isRunning(channels)) { - return false; - } - } - - return this.channels.save(data); - } - - /** - * set consumeDriver to this Carrier. consumer begin to run when {@link DataCarrier#produce} begin to work. - * - * @param consumerClass class of consumer - * @param num number of consumer threads - * @param properties for initializing consumer. - */ - public DataCarrier consume(Class<? extends IConsumer<T>> consumerClass, - int num, - long consumeCycle, - Properties properties) { - if (driver != null) { - driver.close(channels); - } - driver = new ConsumeDriver<T>(this.name, this.channels, consumerClass, num, consumeCycle, properties); - driver.begin(channels); - return this; - } - - /** - * set consumeDriver to this Carrier. consumer begin to run when {@link DataCarrier#produce} begin to work with 20 - * millis consume cycle. - * - * @param consumerClass class of consumer - * @param num number of consumer threads - */ - public DataCarrier consume(Class<? extends IConsumer<T>> consumerClass, int num) { - return this.consume(consumerClass, num, 20, new Properties()); - } - - /** - * set consumeDriver to this Carrier. consumer begin to run when {@link DataCarrier#produce} begin to work. - * - * @param consumer single instance of consumer, all consumer threads will all use this instance. - * @param num number of consumer threads - */ - public DataCarrier consume(IConsumer<T> consumer, int num, long consumeCycle) { - if (driver != null) { - driver.close(channels); - } - driver = new ConsumeDriver<T>(this.name, this.channels, consumer, num, consumeCycle); - driver.begin(channels); - return this; - } - - /** - * set consumeDriver to this Carrier. consumer begin to run when {@link DataCarrier#produce} begin to work with 20 - * millis consume cycle. - * - * @param consumer single instance of consumer, all consumer threads will all use this instance. - * @param num number of consumer threads - */ - public DataCarrier consume(IConsumer<T> consumer, int num) { - return this.consume(consumer, num, 20); - } - - /** - * Set a consumer pool to manage the channels of this DataCarrier. Then consumerPool could use its own consuming - * model to adjust the consumer thread and throughput. - */ - public DataCarrier consume(ConsumerPool consumerPool, IConsumer<T> consumer) { - driver = consumerPool; - consumerPool.add(this.name, channels, consumer); - driver.begin(channels); - return this; - } - - /** - * shutdown all consumer threads, if consumer threads are running. Notice {@link BufferStrategy}: if {@link - * BufferStrategy} == {@link BufferStrategy#BLOCKING}, shutdown consumeDriver maybe cause blocking when producing. - * Better way to change consumeDriver are use {@link DataCarrier#consume} - */ - public void shutdownConsumers() { - if (driver != null) { - driver.close(channels); - } - } -} diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/EnvUtil.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/EnvUtil.java deleted file mode 100644 index 36f0cb6..0000000 --- a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/EnvUtil.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.skywalking.banyandb.commons.datacarrier; - -/** - * Read value from system env. - */ -public class EnvUtil { - public static int getInt(String envName, int defaultValue) { - int value = defaultValue; - String envValue = System.getenv(envName); - if (envValue != null) { - try { - value = Integer.parseInt(envValue); - } catch (NumberFormatException e) { - - } - } - return value; - } - - public static long getLong(String envName, long defaultValue) { - long value = defaultValue; - String envValue = System.getenv(envName); - if (envValue != null) { - try { - value = Long.parseLong(envValue); - } catch (NumberFormatException e) { - - } - } - return value; - } -} diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/ArrayBlockingQueueBuffer.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/ArrayBlockingQueueBuffer.java deleted file mode 100644 index feb2a99..0000000 --- a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/ArrayBlockingQueueBuffer.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.skywalking.banyandb.commons.datacarrier.buffer; - -import java.util.List; -import java.util.concurrent.ArrayBlockingQueue; - -/** - * The buffer implementation based on JDK ArrayBlockingQueue. - * <p> - * This implementation has better performance in server side. We are still trying to research whether this is suitable - * for agent side, which is more sensitive about blocks. - */ -public class ArrayBlockingQueueBuffer<T> implements QueueBuffer<T> { - private BufferStrategy strategy; - private ArrayBlockingQueue<T> queue; - private int bufferSize; - - ArrayBlockingQueueBuffer(int bufferSize, BufferStrategy strategy) { - this.strategy = strategy; - this.queue = new ArrayBlockingQueue<T>(bufferSize); - this.bufferSize = bufferSize; - } - - @Override - public boolean save(T data) { - //only BufferStrategy.BLOCKING - try { - queue.put(data); - } catch (InterruptedException e) { - // Ignore the error - return false; - } - return true; - } - - @Override - public void setStrategy(BufferStrategy strategy) { - this.strategy = strategy; - } - - @Override - public void obtain(List<T> consumeList) { - queue.drainTo(consumeList); - } - - @Override - public int getBufferSize() { - return bufferSize; - } -} diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/Buffer.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/Buffer.java deleted file mode 100644 index 38e92af..0000000 --- a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/Buffer.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.skywalking.banyandb.commons.datacarrier.buffer; - -import java.util.List; -import org.apache.skywalking.banyandb.commons.datacarrier.common.AtomicRangeInteger; - -/** - * Self implementation ring queue. - */ -public class Buffer<T> implements QueueBuffer<T> { - private final Object[] buffer; - private BufferStrategy strategy; - private AtomicRangeInteger index; - - Buffer(int bufferSize, BufferStrategy strategy) { - buffer = new Object[bufferSize]; - this.strategy = strategy; - index = new AtomicRangeInteger(0, bufferSize); - } - - @Override - public void setStrategy(BufferStrategy strategy) { - this.strategy = strategy; - } - - @Override - public boolean save(T data) { - int i = index.getAndIncrement(); - if (buffer[i] != null) { - switch (strategy) { - case IF_POSSIBLE: - return false; - default: - } - } - buffer[i] = data; - return true; - } - - @Override - public int getBufferSize() { - return buffer.length; - } - - @Override - public void obtain(List<T> consumeList) { - this.obtain(consumeList, 0, buffer.length); - } - - void obtain(List<T> consumeList, int start, int end) { - for (int i = start; i < end; i++) { - if (buffer[i] != null) { - consumeList.add((T) buffer[i]); - buffer[i] = null; - } - } - } - -} diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/BufferStrategy.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/BufferStrategy.java deleted file mode 100644 index 9dbc556..0000000 --- a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/BufferStrategy.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.skywalking.banyandb.commons.datacarrier.buffer; - -public enum BufferStrategy { - BLOCKING, IF_POSSIBLE -} diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/Channels.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/Channels.java deleted file mode 100644 index 46ce98f..0000000 --- a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/Channels.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.skywalking.banyandb.commons.datacarrier.buffer; - -import org.apache.skywalking.banyandb.commons.datacarrier.partition.IDataPartitioner; - -/** - * Channels of Buffer It contains all buffer data which belongs to this channel. It supports several strategy when - * buffer is full. The Default is BLOCKING <p> Created by wusheng on 2016/10/25. - */ -public class Channels<T> { - private final QueueBuffer<T>[] bufferChannels; - private IDataPartitioner<T> dataPartitioner; - private final BufferStrategy strategy; - private final long size; - - public Channels(int channelSize, int bufferSize, IDataPartitioner<T> partitioner, BufferStrategy strategy) { - this.dataPartitioner = partitioner; - this.strategy = strategy; - bufferChannels = new QueueBuffer[channelSize]; - for (int i = 0; i < channelSize; i++) { - if (BufferStrategy.BLOCKING.equals(strategy)) { - bufferChannels[i] = new ArrayBlockingQueueBuffer<>(bufferSize, strategy); - } else { - bufferChannels[i] = new Buffer<>(bufferSize, strategy); - } - } - // noinspection PointlessArithmeticExpression - size = 1L * channelSize * bufferSize; // it's not pointless, it prevents numeric overflow before assigning an integer to a long - } - - public boolean save(T data) { - int index = dataPartitioner.partition(bufferChannels.length, data); - int retryCountDown = 1; - if (BufferStrategy.IF_POSSIBLE.equals(strategy)) { - int maxRetryCount = dataPartitioner.maxRetryCount(); - if (maxRetryCount > 1) { - retryCountDown = maxRetryCount; - } - } - for (; retryCountDown > 0; retryCountDown--) { - if (bufferChannels[index].save(data)) { - return true; - } - } - return false; - } - - public void setPartitioner(IDataPartitioner<T> dataPartitioner) { - this.dataPartitioner = dataPartitioner; - } - - /** - * override the strategy at runtime. Notice, this will override several channels one by one. So, when running - * setStrategy, each channel may use different BufferStrategy - */ - public void setStrategy(BufferStrategy strategy) { - for (QueueBuffer<T> buffer : bufferChannels) { - buffer.setStrategy(strategy); - } - } - - /** - * get channelSize - */ - public int getChannelSize() { - return this.bufferChannels.length; - } - - public long size() { - return size; - } - - public QueueBuffer<T> getBuffer(int index) { - return this.bufferChannels[index]; - } -} diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/QueueBuffer.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/QueueBuffer.java deleted file mode 100644 index 4baf83c..0000000 --- a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/QueueBuffer.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.skywalking.banyandb.commons.datacarrier.buffer; - -import java.util.List; - -/** - * Queue buffer interface. - */ -public interface QueueBuffer<T> { - /** - * Save data into the queue; - * - * @param data to add. - * @return true if saved - */ - boolean save(T data); - - /** - * Set different strategy when queue is full. - */ - void setStrategy(BufferStrategy strategy); - - /** - * Obtain the existing data from the queue - */ - void obtain(List<T> consumeList); - - int getBufferSize(); -} diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/common/AtomicRangeInteger.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/common/AtomicRangeInteger.java deleted file mode 100644 index cb2b4be..0000000 --- a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/common/AtomicRangeInteger.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.skywalking.banyandb.commons.datacarrier.common; - -import java.io.Serializable; -import java.util.concurrent.atomic.AtomicIntegerArray; - -public class AtomicRangeInteger extends Number implements Serializable { - private static final long serialVersionUID = -4099792402691141643L; - private AtomicIntegerArray values; - - private static final int VALUE_OFFSET = 15; - - private int startValue; - private int endValue; - - public AtomicRangeInteger(int startValue, int maxValue) { - this.values = new AtomicIntegerArray(31); - this.values.set(VALUE_OFFSET, startValue); - this.startValue = startValue; - this.endValue = maxValue - 1; - } - - public final int getAndIncrement() { - int next; - do { - next = this.values.incrementAndGet(VALUE_OFFSET); - if (next > endValue && this.values.compareAndSet(VALUE_OFFSET, next, startValue)) { - return endValue; - } - } - while (next > endValue); - - return next - 1; - } - - public final int get() { - return this.values.get(VALUE_OFFSET); - } - - @Override - public int intValue() { - return this.values.get(VALUE_OFFSET); - } - - @Override - public long longValue() { - return this.values.get(VALUE_OFFSET); - } - - @Override - public float floatValue() { - return this.values.get(VALUE_OFFSET); - } - - @Override - public double doubleValue() { - return this.values.get(VALUE_OFFSET); - } -} diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/BulkConsumePool.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/BulkConsumePool.java deleted file mode 100644 index fd45499..0000000 --- a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/BulkConsumePool.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.skywalking.banyandb.commons.datacarrier.consumer; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.Callable; -import org.apache.skywalking.banyandb.commons.datacarrier.EnvUtil; -import org.apache.skywalking.banyandb.commons.datacarrier.buffer.Channels; - -/** - * BulkConsumePool works for consuming data from multiple channels(DataCarrier instances), with multiple {@link - * MultipleChannelsConsumer}s. - * <p> - * In typical case, the number of {@link MultipleChannelsConsumer} should be less than the number of channels. - */ -public class BulkConsumePool implements ConsumerPool { - private List<MultipleChannelsConsumer> allConsumers; - private volatile boolean isStarted = false; - - public BulkConsumePool(String name, int size, long consumeCycle) { - size = EnvUtil.getInt(name + "_THREAD", size); - allConsumers = new ArrayList<MultipleChannelsConsumer>(size); - for (int i = 0; i < size; i++) { - MultipleChannelsConsumer multipleChannelsConsumer = new MultipleChannelsConsumer("DataCarrier." + name + ".BulkConsumePool." + i + ".Thread", consumeCycle); - multipleChannelsConsumer.setDaemon(true); - allConsumers.add(multipleChannelsConsumer); - } - } - - @Override - synchronized public void add(String name, Channels channels, IConsumer consumer) { - MultipleChannelsConsumer multipleChannelsConsumer = getLowestPayload(); - multipleChannelsConsumer.addNewTarget(channels, consumer); - } - - /** - * Get the lowest payload consumer thread based on current allocate status. - * - * @return the lowest consumer. - */ - private MultipleChannelsConsumer getLowestPayload() { - MultipleChannelsConsumer winner = allConsumers.get(0); - for (int i = 1; i < allConsumers.size(); i++) { - MultipleChannelsConsumer option = allConsumers.get(i); - if (option.size() < winner.size()) { - winner = option; - } - } - return winner; - } - - /** - * - */ - @Override - public boolean isRunning(Channels channels) { - return isStarted; - } - - @Override - public void close(Channels channels) { - for (MultipleChannelsConsumer consumer : allConsumers) { - consumer.shutdown(); - } - } - - @Override - public void begin(Channels channels) { - if (isStarted) { - return; - } - for (MultipleChannelsConsumer consumer : allConsumers) { - consumer.start(); - } - isStarted = true; - } - - /** - * The creator for {@link BulkConsumePool}. - */ - public static class Creator implements Callable<ConsumerPool> { - private String name; - private int size; - private long consumeCycle; - - public Creator(String name, int poolSize, long consumeCycle) { - this.name = name; - this.size = poolSize; - this.consumeCycle = consumeCycle; - } - - @Override - public ConsumerPool call() { - return new BulkConsumePool(name, size, consumeCycle); - } - - public static int recommendMaxSize() { - return Runtime.getRuntime().availableProcessors() * 2; - } - } -} diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumeDriver.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumeDriver.java deleted file mode 100644 index 4535a95..0000000 --- a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumeDriver.java +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.skywalking.banyandb.commons.datacarrier.consumer; - -import java.lang.reflect.InvocationTargetException; -import java.util.Properties; -import java.util.concurrent.locks.ReentrantLock; -import org.apache.skywalking.banyandb.commons.datacarrier.buffer.Channels; - -/** - * Pool of consumers <p> Created by wusheng on 2016/10/25. - */ -public class ConsumeDriver<T> implements IDriver { - private boolean running; - private ConsumerThread[] consumerThreads; - private Channels<T> channels; - private ReentrantLock lock; - - public ConsumeDriver(String name, - Channels<T> channels, Class<? extends IConsumer<T>> consumerClass, - int num, - long consumeCycle, - Properties properties) { - this(channels, num); - for (int i = 0; i < num; i++) { - consumerThreads[i] = new ConsumerThread( - "DataCarrier." + name + ".Consumer." + i + ".Thread", getNewConsumerInstance(consumerClass, properties), - consumeCycle - ); - consumerThreads[i].setDaemon(true); - } - } - - public ConsumeDriver(String name, Channels<T> channels, IConsumer<T> prototype, int num, long consumeCycle) { - this(channels, num); - prototype.init(new Properties()); - for (int i = 0; i < num; i++) { - consumerThreads[i] = new ConsumerThread( - "DataCarrier." + name + ".Consumer." + i + ".Thread", prototype, consumeCycle); - consumerThreads[i].setDaemon(true); - } - - } - - private ConsumeDriver(Channels<T> channels, int num) { - running = false; - this.channels = channels; - consumerThreads = new ConsumerThread[num]; - lock = new ReentrantLock(); - } - - private IConsumer<T> getNewConsumerInstance(Class<? extends IConsumer<T>> consumerClass, Properties properties) { - try { - IConsumer<T> inst = consumerClass.getDeclaredConstructor().newInstance(); - inst.init(properties); - return inst; - } catch (InstantiationException e) { - throw new ConsumerCannotBeCreatedException(e); - } catch (IllegalAccessException e) { - throw new ConsumerCannotBeCreatedException(e); - } catch (NoSuchMethodException e) { - throw new ConsumerCannotBeCreatedException(e); - } catch (InvocationTargetException e) { - throw new ConsumerCannotBeCreatedException(e); - } - } - - @Override - public void begin(Channels channels) { - if (running) { - return; - } - lock.lock(); - try { - this.allocateBuffer2Thread(); - for (ConsumerThread consumerThread : consumerThreads) { - consumerThread.start(); - } - running = true; - } finally { - lock.unlock(); - } - } - - @Override - public boolean isRunning(Channels channels) { - return running; - } - - private void allocateBuffer2Thread() { - int channelSize = this.channels.getChannelSize(); - /** - * if consumerThreads.length < channelSize - * each consumer will process several channels. - * - * if consumerThreads.length == channelSize - * each consumer will process one channel. - * - * if consumerThreads.length > channelSize - * there will be some threads do nothing. - */ - for (int channelIndex = 0; channelIndex < channelSize; channelIndex++) { - int consumerIndex = channelIndex % consumerThreads.length; - consumerThreads[consumerIndex].addDataSource(channels.getBuffer(channelIndex)); - } - - } - - @Override - public void close(Channels channels) { - lock.lock(); - try { - this.running = false; - for (ConsumerThread consumerThread : consumerThreads) { - consumerThread.shutdown(); - } - } finally { - lock.unlock(); - } - } -} diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumerCannotBeCreatedException.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumerCannotBeCreatedException.java deleted file mode 100644 index 0d0c8de..0000000 --- a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumerCannotBeCreatedException.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.skywalking.banyandb.commons.datacarrier.consumer; - -public class ConsumerCannotBeCreatedException extends RuntimeException { - ConsumerCannotBeCreatedException(Throwable t) { - super(t); - } -} diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumerPool.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumerPool.java deleted file mode 100644 index fd93dfb..0000000 --- a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumerPool.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.skywalking.banyandb.commons.datacarrier.consumer; - -import org.apache.skywalking.banyandb.commons.datacarrier.DataCarrier; -import org.apache.skywalking.banyandb.commons.datacarrier.buffer.Channels; - -/** - * The Consumer pool could support data consumer from multiple {@link DataCarrier}s, by using different consume thread - * management models. - */ -public interface ConsumerPool extends IDriver { - void add(String name, Channels channels, IConsumer consumer); -} diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumerPoolFactory.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumerPoolFactory.java deleted file mode 100644 index 8eb1581..0000000 --- a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumerPoolFactory.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.skywalking.banyandb.commons.datacarrier.consumer; - -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.Callable; - -/** - * Consumer Pool Factory provides global management for all Consumer Pool. - */ -public enum ConsumerPoolFactory { - INSTANCE; - - private final Map<String, ConsumerPool> pools; - - ConsumerPoolFactory() { - pools = new HashMap<>(); - } - - public synchronized boolean createIfAbsent(String poolName, Callable<ConsumerPool> creator) throws Exception { - if (pools.containsKey(poolName)) { - return false; - } else { - pools.put(poolName, creator.call()); - return true; - } - } - - public ConsumerPool get(String poolName) { - return pools.get(poolName); - } - -} diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumerThread.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumerThread.java deleted file mode 100644 index b5f5478..0000000 --- a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumerThread.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.skywalking.banyandb.commons.datacarrier.consumer; - -import java.util.ArrayList; -import java.util.List; -import org.apache.skywalking.banyandb.commons.datacarrier.buffer.Buffer; -import org.apache.skywalking.banyandb.commons.datacarrier.buffer.QueueBuffer; - -public class ConsumerThread<T> extends Thread { - private volatile boolean running; - private IConsumer<T> consumer; - private List<DataSource> dataSources; - private long consumeCycle; - - ConsumerThread(String threadName, IConsumer<T> consumer, long consumeCycle) { - super(threadName); - this.consumer = consumer; - running = false; - dataSources = new ArrayList<DataSource>(1); - this.consumeCycle = consumeCycle; - } - - /** - * add whole buffer to consume - */ - void addDataSource(QueueBuffer<T> sourceBuffer) { - this.dataSources.add(new DataSource(sourceBuffer)); - } - - @Override - public void run() { - running = true; - - final List<T> consumeList = new ArrayList<T>(1500); - while (running) { - if (!consume(consumeList)) { - try { - Thread.sleep(consumeCycle); - } catch (InterruptedException e) { - } - } - } - - // consumer thread is going to stop - // consume the last time - consume(consumeList); - - consumer.onExit(); - } - - private boolean consume(List<T> consumeList) { - for (DataSource dataSource : dataSources) { - dataSource.obtain(consumeList); - } - - if (!consumeList.isEmpty()) { - try { - consumer.consume(consumeList); - } catch (Throwable t) { - consumer.onError(consumeList, t); - } finally { - consumeList.clear(); - } - return true; - } - consumer.nothingToConsume(); - return false; - } - - void shutdown() { - running = false; - } - - /** - * DataSource is a refer to {@link Buffer}. - */ - class DataSource { - private QueueBuffer<T> sourceBuffer; - - DataSource(QueueBuffer<T> sourceBuffer) { - this.sourceBuffer = sourceBuffer; - } - - void obtain(List<T> consumeList) { - sourceBuffer.obtain(consumeList); - } - } -} diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/IConsumer.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/IConsumer.java deleted file mode 100644 index 9d965e1..0000000 --- a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/IConsumer.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.skywalking.banyandb.commons.datacarrier.consumer; - -import java.util.List; -import java.util.Properties; - -public interface IConsumer<T> { - void init(final Properties properties); - - void consume(List<T> data); - - void onError(List<T> data, Throwable t); - - void onExit(); - - /** - * Notify the implementation, if there is nothing fetched from the queue. This could be used as a timer to trigger - * reaction if the queue has no element. - */ - default void nothingToConsume() { - return; - } -} diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/IDriver.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/IDriver.java deleted file mode 100644 index 7011c4e..0000000 --- a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/IDriver.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.skywalking.banyandb.commons.datacarrier.consumer; - -import org.apache.skywalking.banyandb.commons.datacarrier.buffer.Channels; - -/** - * The driver of consumer. - */ -public interface IDriver { - boolean isRunning(Channels channels); - - void close(Channels channels); - - void begin(Channels channels); -} diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/MultipleChannelsConsumer.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/MultipleChannelsConsumer.java deleted file mode 100644 index 19969fa..0000000 --- a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/MultipleChannelsConsumer.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.skywalking.banyandb.commons.datacarrier.consumer; - -import java.util.ArrayList; -import java.util.List; -import org.apache.skywalking.banyandb.commons.datacarrier.buffer.Channels; -import org.apache.skywalking.banyandb.commons.datacarrier.buffer.QueueBuffer; - -/** - * MultipleChannelsConsumer represent a single consumer thread, but support multiple channels with their {@link - * IConsumer}s - */ -public class MultipleChannelsConsumer extends Thread { - private volatile boolean running; - private volatile ArrayList<Group> consumeTargets; - @SuppressWarnings("NonAtomicVolatileUpdate") - private volatile long size; - private final long consumeCycle; - - public MultipleChannelsConsumer(String threadName, long consumeCycle) { - super(threadName); - this.consumeTargets = new ArrayList<Group>(); - this.consumeCycle = consumeCycle; - } - - @Override - public void run() { - running = true; - - final List consumeList = new ArrayList(2000); - while (running) { - boolean hasData = false; - for (Group target : consumeTargets) { - boolean consume = consume(target, consumeList); - hasData = hasData || consume; - } - - if (!hasData) { - try { - Thread.sleep(consumeCycle); - } catch (InterruptedException e) { - } - } - } - - // consumer thread is going to stop - // consume the last time - for (Group target : consumeTargets) { - consume(target, consumeList); - - target.consumer.onExit(); - } - } - - private boolean consume(Group target, List consumeList) { - for (int i = 0; i < target.channels.getChannelSize(); i++) { - QueueBuffer buffer = target.channels.getBuffer(i); - buffer.obtain(consumeList); - } - - if (!consumeList.isEmpty()) { - try { - target.consumer.consume(consumeList); - } catch (Throwable t) { - target.consumer.onError(consumeList, t); - } finally { - consumeList.clear(); - } - return true; - } - target.consumer.nothingToConsume(); - return false; - } - - /** - * Add a new target channels. - */ - public void addNewTarget(Channels channels, IConsumer consumer) { - Group group = new Group(channels, consumer); - // Recreate the new list to avoid change list while the list is used in consuming. - ArrayList<Group> newList = new ArrayList<Group>(); - for (Group target : consumeTargets) { - newList.add(target); - } - newList.add(group); - consumeTargets = newList; - size += channels.size(); - } - - public long size() { - return size; - } - - void shutdown() { - running = false; - } - - private static class Group { - private Channels channels; - private IConsumer consumer; - - public Group(Channels channels, IConsumer consumer) { - this.channels = channels; - this.consumer = consumer; - } - } -} diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/partition/IDataPartitioner.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/partition/IDataPartitioner.java deleted file mode 100644 index 4b6c576..0000000 --- a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/partition/IDataPartitioner.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.skywalking.banyandb.commons.datacarrier.partition; - -import org.apache.skywalking.banyandb.commons.datacarrier.buffer.BufferStrategy; - -public interface IDataPartitioner<T> { - int partition(int total, T data); - - /** - * @return an integer represents how many times should retry when {@link BufferStrategy#IF_POSSIBLE}. - * <p> - * Less or equal 1, means not support retry. - */ - int maxRetryCount(); -} diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/partition/ProducerThreadPartitioner.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/partition/ProducerThreadPartitioner.java deleted file mode 100644 index b83fd1f..0000000 --- a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/partition/ProducerThreadPartitioner.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.skywalking.banyandb.commons.datacarrier.partition; - -/** - * use threadid % total to partition - */ -public class ProducerThreadPartitioner<T> implements IDataPartitioner<T> { - public ProducerThreadPartitioner() { - } - - @Override - public int partition(int total, T data) { - return (int) Thread.currentThread().getId() % total; - } - - @Override - public int maxRetryCount() { - return 1; - } -} diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/partition/SimpleRollingPartitioner.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/partition/SimpleRollingPartitioner.java deleted file mode 100644 index e22f92f..0000000 --- a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/partition/SimpleRollingPartitioner.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.skywalking.banyandb.commons.datacarrier.partition; - -/** - * use normal int to rolling. - */ -public class SimpleRollingPartitioner<T> implements IDataPartitioner<T> { - @SuppressWarnings("NonAtomicVolatileUpdate") - private volatile int i = 0; - - @Override - public int partition(int total, T data) { - return Math.abs(i++ % total); - } - - @Override - public int maxRetryCount() { - return 3; - } -} diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractBulkWriteProcessor.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractBulkWriteProcessor.java index 5ccb885..9d756d8 100644 --- a/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractBulkWriteProcessor.java +++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractBulkWriteProcessor.java @@ -21,19 +21,30 @@ package org.apache.skywalking.banyandb.v1.client; import com.google.auto.value.AutoValue; import io.grpc.stub.AbstractAsyncStub; import io.grpc.stub.StreamObserver; -import lombok.extern.slf4j.Slf4j; - +import java.io.Closeable; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; -import java.util.function.Function; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; @Slf4j public abstract class AbstractBulkWriteProcessor<REQ extends com.google.protobuf.GeneratedMessageV3, - STUB extends AbstractAsyncStub<STUB>> extends BulkWriteProcessor { + STUB extends AbstractAsyncStub<STUB>> + implements Runnable, Closeable { private final STUB stub; + private final int maxBulkSize; + private final int flushInterval; + private final ArrayBlockingQueue<Holder> requests; + private final Semaphore semaphore; + private final long flushInternalInMillis; + private final ScheduledThreadPoolExecutor scheduler; + private final int timeout; + private volatile long lastFlushTS = 0; /** * Create the processor. @@ -44,10 +55,32 @@ public abstract class AbstractBulkWriteProcessor<REQ extends com.google.protobuf * @param flushInterval if given maxBulkSize is not reached in this period, the flush would be trigger * automatically. Unit is second. * @param concurrency the number of concurrency would run for the flush max. + * @param timeout network timeout threshold in seconds. */ - protected AbstractBulkWriteProcessor(STUB stub, String processorName, int maxBulkSize, int flushInterval, int concurrency) { - super(processorName, maxBulkSize, flushInterval, concurrency); + protected AbstractBulkWriteProcessor(STUB stub, + String processorName, + int maxBulkSize, + int flushInterval, + int concurrency, + int timeout) { this.stub = stub; + this.maxBulkSize = maxBulkSize; + this.flushInterval = flushInterval; + this.timeout = timeout; + requests = new ArrayBlockingQueue<>(maxBulkSize + 1); + this.semaphore = new Semaphore(concurrency > 0 ? concurrency : 1); + + scheduler = new ScheduledThreadPoolExecutor(1, r -> { + final Thread thread = new Thread(r); + thread.setName("ElasticSearch BulkProcessor"); + return thread; + }); + scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); + scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); + scheduler.setRemoveOnCancelPolicy(true); + flushInternalInMillis = flushInterval * 1000; + scheduler.scheduleWithFixedDelay( + this, 0, flushInterval, TimeUnit.SECONDS); } /** @@ -55,22 +88,69 @@ public abstract class AbstractBulkWriteProcessor<REQ extends com.google.protobuf * * @param writeEntity to add. */ + @SneakyThrows public CompletableFuture<Void> add(AbstractWrite<REQ> writeEntity) { final CompletableFuture<Void> f = new CompletableFuture<>(); - this.buffer.produce(Holder.create(writeEntity, f)); + requests.put(Holder.create(writeEntity, f)); + flushIfNeeded(); return f; } - @Override - protected void flush(List data) { + public void run() { + try { + doPeriodicalFlush(); + } catch (Throwable t) { + log.error("Failed to flush data to BanyanDB", t); + } + } + + @SneakyThrows + protected void flushIfNeeded() { + if (requests.size() >= maxBulkSize) { + flush(); + } + } + + private void doPeriodicalFlush() { + if (System.currentTimeMillis() - lastFlushTS > flushInternalInMillis / 2) { + // Run periodical flush if there is no `flushIfNeeded` executed in the second half of the flush period. + // Otherwise, wait for the next round. By default, the last 2 seconds of the 5s period. + // This could avoid periodical flush running among bulks(controlled by bulkActions). + flush(); + } + } + + public void flush() { + if (requests.isEmpty()) { + return; + } + + try { + semaphore.acquire(); + } catch (InterruptedException e) { + log.error("Interrupted when trying to get semaphore to execute bulk requests", e); + return; + } + + final List<Holder> batch = new ArrayList<>(requests.size()); + requests.drainTo(batch); + final CompletableFuture<Void> future = doFlush(batch); + future.whenComplete((v, t) -> semaphore.release()); + future.join(); + lastFlushTS = System.currentTimeMillis(); + + } + + protected CompletableFuture<Void> doFlush(final List<Holder> data) { + // The batch is used to control the completion of the flush operation. + // There is at most one error per batch, + // because the database server would terminate the batch process when the first error occurs. final CompletableFuture<Void> batch = new CompletableFuture<>(); final StreamObserver<REQ> writeRequestStreamObserver - = this.buildStreamObserver(stub.withDeadlineAfter(flushInterval, TimeUnit.SECONDS), batch); + = this.buildStreamObserver(stub.withDeadlineAfter(timeout, TimeUnit.SECONDS), batch); - List sentData = new ArrayList(data.size()); try { - data.forEach(holder -> { - Holder h = (Holder) holder; + data.forEach(h -> { AbstractWrite<REQ> entity = (AbstractWrite<REQ>) h.writeEntity(); REQ request; try { @@ -81,30 +161,21 @@ public abstract class AbstractBulkWriteProcessor<REQ extends com.google.protobuf return; } writeRequestStreamObserver.onNext(request); - sentData.add(h); + h.future().complete(null); }); - } catch (Throwable t) { - log.error("Transform and send request to BanyanDB fail.", t); - batch.completeExceptionally(t); } finally { writeRequestStreamObserver.onCompleted(); } batch.whenComplete((ignored, exp) -> { if (exp != null) { - sentData.stream().map((Function<Object, CompletableFuture<Void>>) o -> ((Holder) o).future()) - .forEach((Consumer<CompletableFuture<Void>>) it -> it.completeExceptionally(exp)); log.error("Failed to execute requests in bulk", exp); - } else { - log.debug("Succeeded to execute {} requests in bulk", data.size()); - sentData.stream().map((Function<Object, CompletableFuture<Void>>) o -> ((Holder) o).future()) - .forEach((Consumer<CompletableFuture<Void>>) it -> it.complete(null)); } }); - try { - batch.get(30, TimeUnit.SECONDS); - } catch (Throwable t) { - log.error("Waiting responses from BanyanDB fail.", t); - } + return batch; + } + + public void close() { + scheduler.shutdownNow(); } protected abstract StreamObserver<REQ> buildStreamObserver(STUB stub, CompletableFuture<Void> batch); @@ -115,7 +186,13 @@ public abstract class AbstractBulkWriteProcessor<REQ extends com.google.protobuf abstract CompletableFuture<Void> future(); - public static <REQ extends com.google.protobuf.GeneratedMessageV3> Holder create(AbstractWrite<REQ> writeEntity, CompletableFuture<Void> future) { + public static <REQ extends com.google.protobuf.GeneratedMessageV3> Holder create(AbstractWrite<REQ> writeEntity, + CompletableFuture<Void> future) { + future.whenComplete((v, t) -> { + if (t != null) { + log.error("Failed to execute the request: {}", writeEntity.toString(), t); + } + }); return new AutoValue_AbstractBulkWriteProcessor_Holder(writeEntity, future); } diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java index 2ec4d65..96ad78d 100644 --- a/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java +++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java @@ -279,12 +279,13 @@ public class BanyanDBClient implements Closeable { * @param flushInterval if given maxBulkSize is not reached in this period, the flush would be trigger * automatically. Unit is second * @param concurrency the number of concurrency would run for the flush max + * @param timeout network timeout threshold in seconds. * @return stream bulk write processor */ - public StreamBulkWriteProcessor buildStreamWriteProcessor(int maxBulkSize, int flushInterval, int concurrency) { + public StreamBulkWriteProcessor buildStreamWriteProcessor(int maxBulkSize, int flushInterval, int concurrency, int timeout) { checkState(this.streamServiceStub != null, "stream service is null"); - return new StreamBulkWriteProcessor(this, maxBulkSize, flushInterval, concurrency); + return new StreamBulkWriteProcessor(this, maxBulkSize, flushInterval, concurrency, timeout); } /** @@ -294,12 +295,13 @@ public class BanyanDBClient implements Closeable { * @param flushInterval if given maxBulkSize is not reached in this period, the flush would be trigger * automatically. Unit is second * @param concurrency the number of concurrency would run for the flush max + * @param timeout network timeout threshold in seconds. * @return stream bulk write processor */ - public MeasureBulkWriteProcessor buildMeasureWriteProcessor(int maxBulkSize, int flushInterval, int concurrency) { + public MeasureBulkWriteProcessor buildMeasureWriteProcessor(int maxBulkSize, int flushInterval, int concurrency, int timeout) { checkState(this.measureServiceStub != null, "measure service is null"); - return new MeasureBulkWriteProcessor(this, maxBulkSize, flushInterval, concurrency); + return new MeasureBulkWriteProcessor(this, maxBulkSize, flushInterval, concurrency, timeout); } /** diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/BulkWriteProcessor.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/BulkWriteProcessor.java deleted file mode 100644 index 1bbc1eb..0000000 --- a/src/main/java/org/apache/skywalking/banyandb/v1/client/BulkWriteProcessor.java +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.skywalking.banyandb.v1.client; - -import java.io.Closeable; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Properties; - -import org.apache.skywalking.banyandb.commons.datacarrier.DataCarrier; -import org.apache.skywalking.banyandb.commons.datacarrier.consumer.IConsumer; - -/** - * BulkWriteProcessor is a timeline and size dual driven processor. - * <p> - * It includes an internal queue and timer, and accept the data sequentially. With the given thresholds of time and - * size, it could activate flush to continue the process to the next step. - */ -public abstract class BulkWriteProcessor implements Closeable { - protected final int flushInterval; - protected DataCarrier buffer; - - /** - * Create the processor. - * - * @param maxBulkSize the max bulk size for the flush operation - * @param flushInterval if given maxBulkSize is not reached in this period, the flush would be trigger - * automatically. Unit is second. - * @param concurrency the number of concurrency would run for the flush max. - */ - protected BulkWriteProcessor(String processorName, int maxBulkSize, int flushInterval, int concurrency) { - this.flushInterval = flushInterval; - this.buffer = new DataCarrier(processorName, concurrency, maxBulkSize); - Properties properties = new Properties(); - properties.put("maxBulkSize", maxBulkSize); - properties.put("flushInterval", flushInterval); - properties.put("BulkWriteProcessor", this); - buffer.consume(QueueWatcher.class, concurrency, 20, properties); - } - - @Override - public void close() throws IOException { - this.buffer.shutdownConsumers(); - } - - /** - * The internal queue consumer for build process. - */ - public static class QueueWatcher implements IConsumer { - private long lastFlushTimestamp; - private int maxBulkSize; - private int flushInterval; - private List cachedData; - private BulkWriteProcessor bulkWriteProcessor; - - public QueueWatcher() { - } - - @Override - public void init(Properties properties) { - lastFlushTimestamp = System.currentTimeMillis(); - maxBulkSize = (Integer) properties.get("maxBulkSize"); - flushInterval = (Integer) properties.get("flushInterval") * 1000; - cachedData = new ArrayList(maxBulkSize); - bulkWriteProcessor = (BulkWriteProcessor) properties.get("BulkWriteProcessor"); - } - - @Override - public void consume(final List data) { - if (data.size() >= maxBulkSize) { - // The data#size actually wouldn't over the maxBulkSize due to the DataCarrier channel's max size. - // This is just to preventing unexpected case and avoid confusion about dropping into else section. - bulkWriteProcessor.flush(data); - lastFlushTimestamp = System.currentTimeMillis(); - } else { - data.forEach(element -> { - cachedData.add(element); - if (cachedData.size() >= maxBulkSize) { - // Flush and re-init. - bulkWriteProcessor.flush(cachedData); - cachedData = new ArrayList(maxBulkSize); - lastFlushTimestamp = System.currentTimeMillis(); - } - }); - } - } - - @Override - public void onError(final List data, final Throwable t) { - - } - - @Override - public void onExit() { - - } - - @Override - public void nothingToConsume() { - if (System.currentTimeMillis() - lastFlushTimestamp > flushInterval) { - bulkWriteProcessor.flush(cachedData); - cachedData = new ArrayList(maxBulkSize); - lastFlushTimestamp = System.currentTimeMillis(); - } - } - } - - /** - * @param data to be flush. - */ - protected abstract void flush(List data); -} diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/MeasureBulkWriteProcessor.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/MeasureBulkWriteProcessor.java index 5d3ce23..2ac8a1d 100644 --- a/src/main/java/org/apache/skywalking/banyandb/v1/client/MeasureBulkWriteProcessor.java +++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/MeasureBulkWriteProcessor.java @@ -48,14 +48,16 @@ public class MeasureBulkWriteProcessor extends AbstractBulkWriteProcessor<Banyan * @param maxBulkSize the max bulk size for the flush operation * @param flushInterval if given maxBulkSize is not reached in this period, the flush would be trigger * automatically. Unit is second. + * @param timeout network timeout threshold in seconds. * @param concurrency the number of concurrency would run for the flush max. */ protected MeasureBulkWriteProcessor( final BanyanDBClient client, final int maxBulkSize, final int flushInterval, - final int concurrency) { - super(client.getMeasureServiceStub(), "MeasureBulkWriteProcessor", maxBulkSize, flushInterval, concurrency); + final int concurrency, + final int timeout) { + super(client.getMeasureServiceStub(), "MeasureBulkWriteProcessor", maxBulkSize, flushInterval, concurrency, timeout); this.client = client; } @@ -77,7 +79,7 @@ public class MeasureBulkWriteProcessor extends AbstractBulkWriteProcessor<Banyan client.findMeasure(metadata.getGroup(), metadata.getName()); schemaExpired.add(schemaKey); } catch (BanyanDBException e) { - throw new RuntimeException(e); + log.error(e.getMessage(), e); } } break; diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamBulkWriteProcessor.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamBulkWriteProcessor.java index 2dd6a33..e7e1a3f 100644 --- a/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamBulkWriteProcessor.java +++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamBulkWriteProcessor.java @@ -49,14 +49,16 @@ public class StreamBulkWriteProcessor extends AbstractBulkWriteProcessor<Banyand * @param maxBulkSize the max bulk size for the flush operation * @param flushInterval if given maxBulkSize is not reached in this period, the flush would be trigger * automatically. Unit is second. + * @param timeout network timeout threshold in seconds. * @param concurrency the number of concurrency would run for the flush max. */ protected StreamBulkWriteProcessor( final BanyanDBClient client, final int maxBulkSize, final int flushInterval, - final int concurrency) { - super(client.getStreamServiceStub(), "StreamBulkWriteProcessor", maxBulkSize, flushInterval, concurrency); + final int concurrency, + final int timeout) { + super(client.getStreamServiceStub(), "StreamBulkWriteProcessor", maxBulkSize, flushInterval, concurrency, timeout); this.client = client; } @@ -78,7 +80,7 @@ public class StreamBulkWriteProcessor extends AbstractBulkWriteProcessor<Banyand client.findStream(metadata.getGroup(), metadata.getName()); schemaExpired.add(schemaKey); } catch (BanyanDBException e) { - throw new RuntimeException(e); + log.error(e.getMessage(), e); } } break; diff --git a/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientMeasureWriteTest.java b/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientMeasureWriteTest.java index 7aa46b0..331d1db 100644 --- a/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientMeasureWriteTest.java +++ b/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientMeasureWriteTest.java @@ -57,7 +57,7 @@ public class BanyanDBClientMeasureWriteTest extends AbstractBanyanDBClientTest { measureRegistry = new HashMap<>(); setUp(bindMeasureRegistry()); - measureBulkWriteProcessor = client.buildMeasureWriteProcessor(1000, 1, 1); + measureBulkWriteProcessor = client.buildMeasureWriteProcessor(1000, 1, 1, 10); measure = Measure.create("sw_metric", "service_cpm_minute", Duration.ofHours(1)) .setEntityRelativeTags("entity_id") diff --git a/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientStreamWriteTest.java b/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientStreamWriteTest.java index 0112730..8764911 100644 --- a/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientStreamWriteTest.java +++ b/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientStreamWriteTest.java @@ -77,7 +77,7 @@ public class BanyanDBClientStreamWriteTest extends AbstractBanyanDBClientTest { @Before public void setUp() throws IOException, BanyanDBException { setUp(bindService(groupRegistryServiceImpl), bindStreamRegistry()); - streamBulkWriteProcessor = client.buildStreamWriteProcessor(1000, 1, 1); + streamBulkWriteProcessor = client.buildStreamWriteProcessor(1000, 1, 1, 10); stream = Stream.create("default", "sw") .setEntityRelativeTags("service_id", "service_instance_id", "state") diff --git a/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBMeasureQueryTests.java b/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBMeasureQueryTests.java index deb61f3..785c59a 100644 --- a/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBMeasureQueryTests.java +++ b/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBMeasureQueryTests.java @@ -54,7 +54,7 @@ public class ITBanyanDBMeasureQueryTests extends BanyanDBClientTestCI { Measure expectedMeasure = Measure.create("sw_metric", "service_cpm_minute", Duration.ofMinutes(1)).setEntityRelativeTags("entity_id").addTagFamily(TagFamilySpec.create("default").addTagSpec(TagFamilySpec.TagSpec.newStringTag("entity_id")).build()).addField(Measure.FieldSpec.newIntField("total").compressWithZSTD().encodeWithGorilla().build()).addField(Measure.FieldSpec.newIntField("value").compressWithZSTD().encodeWithGorilla().build()).addIndex(IndexRule.create("scope", IndexRule [...] client.define(expectedMeasure); Assert.assertNotNull(expectedMeasure); - processor = client.buildMeasureWriteProcessor(1000, 1, 1); + processor = client.buildMeasureWriteProcessor(1000, 1, 1, 10); } @After diff --git a/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBStreamQueryTests.java b/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBStreamQueryTests.java index 550c29f..cd4097d 100644 --- a/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBStreamQueryTests.java +++ b/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBStreamQueryTests.java @@ -77,7 +77,7 @@ public class ITBanyanDBStreamQueryTests extends BanyanDBClientTestCI { .build(); this.client.define(expectedStream); Assert.assertNotNull(expectedStream); - processor = client.buildStreamWriteProcessor(1000, 1, 1); + processor = client.buildStreamWriteProcessor(1000, 1, 1, 10); } @After