This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 89d96ade469 Fix potential deadlock when freeing memory in MemoryPool
89d96ade469 is described below
commit 89d96ade469da99b6ca15c740aae1a91087a2b54
Author: Liao Lanyu <[email protected]>
AuthorDate: Fri May 26 12:58:16 2023 +0800
Fix potential deadlock when freeing memory in MemoryPool
---
.../src/assembly/resources/conf/iotdb-common.properties | 5 +++--
.../main/java/org/apache/iotdb/db/conf/IoTDBConfig.java | 5 +++--
.../mpp/execution/exchange/MPPDataExchangeManager.java | 10 +++++++---
.../db/mpp/execution/exchange/SharedTsBlockQueue.java | 16 +++++++++++++---
.../db/mpp/execution/exchange/LocalSinkChannelTest.java | 14 ++++++++++++--
.../db/mpp/execution/exchange/LocalSourceHandleTest.java | 14 ++++++++++++--
.../mpp/execution/exchange/SharedTsBlockQueueTest.java | 7 ++++++-
7 files changed, 56 insertions(+), 15 deletions(-)
diff --git a/node-commons/src/assembly/resources/conf/iotdb-common.properties
b/node-commons/src/assembly/resources/conf/iotdb-common.properties
index 6e1087984ee..4a22248cbd1 100644
--- a/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ b/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -431,9 +431,10 @@ cluster_name=defaultCluster
# Datatype: int
# query_thread_count=0
-# How many pipeline drivers will be created for one fragment instance. When <=
0, use CPU core number / 2.
+# How many pipeline drivers will be created for one fragment instance. Default
dop = 1 means FI will not be further split.
+# CPU core number / 2 could be a choice.
# Datatype: int
-# degree_of_query_parallelism=0
+# degree_of_query_parallelism=1
# The threshold of count map size when calculating the MODE aggregation
function
# Datatype: int
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 838e8e3e232..6654a38e6db 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -335,7 +335,8 @@ public class IoTDBConfig {
/** How many threads can concurrently execute query statement. When <= 0,
use CPU core number. */
private int queryThreadCount = Runtime.getRuntime().availableProcessors();
- private int degreeOfParallelism = Math.max(1,
Runtime.getRuntime().availableProcessors() / 2);
+ /** default dop = 1 for now */
+ private int degreeOfParallelism = 1;
private int modeMapSizeThreshold = 10000;
@@ -1604,7 +1605,7 @@ public class IoTDBConfig {
}
public void setDegreeOfParallelism(int degreeOfParallelism) {
- this.degreeOfParallelism = degreeOfParallelism;
+ this.degreeOfParallelism = Math.max(1, degreeOfParallelism);
}
public int getDegreeOfParallelism() {
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
index c1273ba2a48..119b8178ae9 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
@@ -591,7 +591,9 @@ public class MPPDataExchangeManager implements
IMPPDataExchangeManager {
queue = localSourceHandle.getSharedTsBlockQueue();
} else {
LOGGER.debug("Create SharedTsBlockQueue");
- queue = new SharedTsBlockQueue(localFragmentInstanceId, localPlanNodeId,
localMemoryManager);
+ queue =
+ new SharedTsBlockQueue(
+ localFragmentInstanceId, localPlanNodeId, localMemoryManager,
executorService);
}
return new LocalSinkChannel(
@@ -614,7 +616,8 @@ public class MPPDataExchangeManager implements
IMPPDataExchangeManager {
new SharedTsBlockQueue(
driverContext.getDriverTaskID().getFragmentInstanceId().toThrift(),
planNodeId,
- localMemoryManager);
+ localMemoryManager,
+ executorService);
queue.allowAddingTsBlock();
return new LocalSinkChannel(
queue,
@@ -770,7 +773,8 @@ public class MPPDataExchangeManager implements
IMPPDataExchangeManager {
} else {
LOGGER.debug("Create SharedTsBlockQueue");
queue =
- new SharedTsBlockQueue(remoteFragmentInstanceId, remotePlanNodeId,
localMemoryManager);
+ new SharedTsBlockQueue(
+ remoteFragmentInstanceId, remotePlanNodeId, localMemoryManager,
executorService);
}
LocalSourceHandle localSourceHandle =
new LocalSourceHandle(
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
index 6e484262d50..905199f76e6 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
@@ -38,9 +38,9 @@ import javax.annotation.concurrent.NotThreadSafe;
import java.util.LinkedList;
import java.util.Queue;
+import java.util.concurrent.ExecutorService;
import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
-import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
/** This is not thread safe class, the caller should ensure multi-threads
safety. */
@NotThreadSafe
@@ -81,10 +81,14 @@ public class SharedTsBlockQueue {
private long maxBytesCanReserve =
IoTDBDescriptor.getInstance().getConfig().getMaxBytesPerFragmentInstance();
+ // used for SharedTsBlockQueue listener
+ private final ExecutorService executorService;
+
public SharedTsBlockQueue(
TFragmentInstanceId fragmentInstanceId,
String planNodeId,
- LocalMemoryManager localMemoryManager) {
+ LocalMemoryManager localMemoryManager,
+ ExecutorService executorService) {
this.localFragmentInstanceId =
Validate.notNull(fragmentInstanceId, "fragment instance ID cannot be
null");
this.fullFragmentInstanceId =
@@ -92,6 +96,7 @@ public class SharedTsBlockQueue {
this.localPlanNodeId = Validate.notNull(planNodeId, "PlanNode ID cannot be
null");
this.localMemoryManager =
Validate.notNull(localMemoryManager, "local memory manager cannot be
null");
+ this.executorService = Validate.notNull(executorService, "ExecutorService
can not be null.");
}
public boolean hasNoMoreTsBlocks() {
@@ -235,7 +240,12 @@ public class SharedTsBlockQueue {
}
}
},
- directExecutor());
+ // Use directExecutor() here could lead to deadlock. Thread A holds
lock of
+ // SharedTsBlockQueueA and tries to invoke the listener of
+ // SharedTsBlockQueueB(when freeing memory to complete
MemoryReservationFuture) while
+ // Thread B holds lock of SharedTsBlockQueueB and tries to invoke
the listener of
+ // SharedTsBlockQueueA
+ executorService);
} else { // reserve memory succeeded, add the TsBlock directly
queue.add(tsBlock);
if (!blocked.isDone()) {
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkChannelTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkChannelTest.java
index dac0ec75d85..694edfc6b6e 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkChannelTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkChannelTest.java
@@ -32,6 +32,8 @@ import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
+import static
org.testcontainers.shaded.com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
+
public class LocalSinkChannelTest {
@Test
public void testSend() {
@@ -50,7 +52,11 @@ public class LocalSinkChannelTest {
SinkListener mockSinkListener = Mockito.mock(SinkListener.class);
// Construct a shared TsBlock queue.
SharedTsBlockQueue queue =
- new SharedTsBlockQueue(remoteFragmentInstanceId, remotePlanNodeId,
mockLocalMemoryManager);
+ new SharedTsBlockQueue(
+ remoteFragmentInstanceId,
+ remotePlanNodeId,
+ mockLocalMemoryManager,
+ newDirectExecutorService());
// Construct Sink.
LocalSinkChannel localSinkChannel =
@@ -137,7 +143,11 @@ public class LocalSinkChannelTest {
SinkListener mockSinkListener = Mockito.mock(SinkListener.class);
// Construct a shared tsblock queue.
SharedTsBlockQueue queue =
- new SharedTsBlockQueue(remoteFragmentInstanceId, remotePlanNodeId,
mockLocalMemoryManager);
+ new SharedTsBlockQueue(
+ remoteFragmentInstanceId,
+ remotePlanNodeId,
+ mockLocalMemoryManager,
+ newDirectExecutorService());
// Construct SinkChannel.
LocalSinkChannel localSinkChannel =
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandleTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandleTest.java
index aa15f199676..db57cd0c329 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandleTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandleTest.java
@@ -30,6 +30,8 @@ import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
+import static
org.testcontainers.shaded.com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
+
public class LocalSourceHandleTest {
@Test
public void testReceive() {
@@ -47,7 +49,11 @@ public class LocalSourceHandleTest {
SourceHandleListener mockSourceHandleListener =
Mockito.mock(SourceHandleListener.class);
// Construct a shared TsBlock queue.
SharedTsBlockQueue queue =
- new SharedTsBlockQueue(localFragmentInstanceId, localPlanNodeId,
mockLocalMemoryManager);
+ new SharedTsBlockQueue(
+ localFragmentInstanceId,
+ localPlanNodeId,
+ mockLocalMemoryManager,
+ newDirectExecutorService());
LocalSourceHandle localSourceHandle =
new LocalSourceHandle(
@@ -91,7 +97,11 @@ public class LocalSourceHandleTest {
SourceHandleListener mockSourceHandleListener =
Mockito.mock(SourceHandleListener.class);
// Construct a shared tsblock queue.
SharedTsBlockQueue queue =
- new SharedTsBlockQueue(localFragmentInstanceId, localPlanNodeId,
mockLocalMemoryManager);
+ new SharedTsBlockQueue(
+ localFragmentInstanceId,
+ localPlanNodeId,
+ mockLocalMemoryManager,
+ newDirectExecutorService());
LocalSourceHandle localSourceHandle =
new LocalSourceHandle(
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueueTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueueTest.java
index e5336d2ac2a..9c3399a6e17 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueueTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueueTest.java
@@ -33,6 +33,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
+import static
org.testcontainers.shaded.com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
+
public class SharedTsBlockQueueTest {
@Test(timeout = 5000L)
public void concurrencyTest() {
@@ -46,7 +48,10 @@ public class SharedTsBlockQueueTest {
Mockito.when(mockLocalMemoryManager.getQueryPool()).thenReturn(spyMemoryPool);
SharedTsBlockQueue queue =
new SharedTsBlockQueue(
- new TFragmentInstanceId(queryId, 0, "0"), "test",
mockLocalMemoryManager);
+ new TFragmentInstanceId(queryId, 0, "0"),
+ "test",
+ mockLocalMemoryManager,
+ newDirectExecutorService());
queue.getCanAddTsBlock().set(null);
queue.setMaxBytesCanReserve(Long.MAX_VALUE);