This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 87eb05269b2 Support SimpleMemoryPipelineChannel use 0 blocking queue size (#29349)
87eb05269b2 is described below
commit 87eb05269b2362ef1fbf3e69aae544a15087597f
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Sun Dec 10 17:56:51 2023 +0800
Support SimpleMemoryPipelineChannel use 0 blocking queue size (#29349)
* Support SimpleMemoryPipelineChannel use 0 blocking queue size
* Enable fair for incremental
---
.../core/ingest/channel/PipelineChannel.java | 1 +
.../memory/MemoryPipelineChannelCreator.java | 4 ++--
.../memory/SimpleMemoryPipelineChannel.java | 4 ++--
.../memory/SimpleMemoryPipelineChannelTest.java | 25 +++++++++++++++++++---
4 files changed, 27 insertions(+), 7 deletions(-)
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/PipelineChannel.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/PipelineChannel.java
index 333bd92267f..0f5548ac15c 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/PipelineChannel.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/PipelineChannel.java
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
/**
* Pipeline channel.
+ * <p>It supports multiple push threads and one fetch thread.</p>
*/
public interface PipelineChannel extends Closeable {
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MemoryPipelineChannelCreator.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MemoryPipelineChannelCreator.java
index 8a0e1704a49..91d2e78231b 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MemoryPipelineChannelCreator.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MemoryPipelineChannelCreator.java
@@ -41,8 +41,8 @@ public final class MemoryPipelineChannelCreator implements
PipelineChannelCreato
@Override
public PipelineChannel createPipelineChannel(final int outputConcurrency,
final int averageElementSize, final AckCallback ackCallback) {
- return 1 == outputConcurrency ? new SimpleMemoryPipelineChannel((int)
Math.ceil((double) blockQueueSize / averageElementSize), ackCallback)
- : new MultiplexMemoryPipelineChannel(outputConcurrency,
blockQueueSize, ackCallback);
+ return 1 == outputConcurrency ? new
SimpleMemoryPipelineChannel(blockQueueSize / averageElementSize, ackCallback)
+ : new MultiplexMemoryPipelineChannel(outputConcurrency,
blockQueueSize < 1 ? 5 : blockQueueSize, ackCallback);
}
@Override
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannel.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannel.java
index 2389ef4baa3..4c56ef2b68c 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannel.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannel.java
@@ -27,6 +27,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
/**
@@ -39,7 +40,7 @@ public final class SimpleMemoryPipelineChannel implements
PipelineChannel {
private final AckCallback ackCallback;
public SimpleMemoryPipelineChannel(final int blockQueueSize, final
AckCallback ackCallback) {
- this.queue = new ArrayBlockingQueue<>(blockQueueSize);
+ this.queue = blockQueueSize < 1 ? new SynchronousQueue<>(true) : new
ArrayBlockingQueue<>(blockQueueSize, true);
this.ackCallback = ackCallback;
}
@@ -50,7 +51,6 @@ public final class SimpleMemoryPipelineChannel implements
PipelineChannel {
}
@SneakyThrows(InterruptedException.class)
- // TODO thread-safe?
@Override
public List<Record> fetchRecords(final int batchSize, final long timeout,
final TimeUnit timeUnit) {
List<Record> result = new LinkedList<>();
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannelTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannelTest.java
index 57bc8eb56e8..506f3f188d8 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannelTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannelTest.java
@@ -17,24 +17,43 @@
package org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory;
+import lombok.SneakyThrows;
import
org.apache.shardingsphere.data.pipeline.core.ingest.channel.EmptyAckCallback;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.FinishedPosition;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.record.PlaceholderRecord;
+import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
import org.junit.jupiter.api.Test;
+import java.util.Collections;
+import java.util.List;
import java.util.concurrent.TimeUnit;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
class SimpleMemoryPipelineChannelTest {
+ @SneakyThrows(InterruptedException.class)
+ @Test
+ void assertZeroQueueSizeWorks() {
+ SimpleMemoryPipelineChannel channel = new
SimpleMemoryPipelineChannel(0, new EmptyAckCallback());
+ List<Record> records = Collections.singletonList(new
PlaceholderRecord(new FinishedPosition()));
+ Thread thread = new Thread(() -> channel.pushRecords(records));
+ thread.start();
+ assertThat(channel.fetchRecords(1, 500, TimeUnit.MILLISECONDS),
is(records));
+ thread.join();
+ }
+
@Test
void assertFetchRecordsTimeoutCorrectly() {
- SimpleMemoryPipelineChannel simpleMemoryPipelineChannel = new
SimpleMemoryPipelineChannel(10, new EmptyAckCallback());
+ SimpleMemoryPipelineChannel channel = new
SimpleMemoryPipelineChannel(10, new EmptyAckCallback());
long startMills = System.currentTimeMillis();
- simpleMemoryPipelineChannel.fetchRecords(1, 1, TimeUnit.MILLISECONDS);
+ channel.fetchRecords(1, 1, TimeUnit.MILLISECONDS);
long delta = System.currentTimeMillis() - startMills;
assertTrue(delta >= 1 && delta < 50, "Delta is not in [1,50) : " +
delta);
startMills = System.currentTimeMillis();
- simpleMemoryPipelineChannel.fetchRecords(1, 500,
TimeUnit.MILLISECONDS);
+ channel.fetchRecords(1, 500, TimeUnit.MILLISECONDS);
delta = System.currentTimeMillis() - startMills;
assertTrue(delta >= 500 && delta < 650, "Delta is not in [500,650) : "
+ delta);
}