This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 0ff85cdf899 Pipe: Fix pipe executor stuck by unlimited file event
memory allocation retries & Pipe: Fix IO triggered in disruptor & Pipe: Fix
forceAllocateIfSufficient & Load: Remove RM detect logic during the second
phase (#15085)
0ff85cdf899 is described below
commit 0ff85cdf8993d4b4a4ab89d73866aaf9073196ab
Author: Steve Yurong Su <[email protected]>
AuthorDate: Fri Mar 14 09:53:11 2025 +0800
Pipe: Fix pipe executor stuck by unlimited file event memory allocation
retries & Pipe: Fix IO triggered in disruptor & Pipe: Fix
forceAllocateIfSufficient & Load: Remove RM detect logic during the second
phase (#15085)
Co-authored-by: VGalaxies <[email protected]>
---
.../IoTDBDefaultPullConsumerDataSetIT.java | 28 ++++++++---
.../async/handler/PipeTransferTsFileHandler.java | 15 +++++-
.../common/tsfile/PipeTsFileInsertionEvent.java | 4 +-
.../db/pipe/resource/memory/PipeMemoryManager.java | 29 ++++-------
.../pipe/resource/tsfile/PipeTsFileResource.java | 6 +--
.../impl/DataNodeInternalRPCServiceImpl.java | 58 +++-------------------
.../plan/analyze/ClusterPartitionFetcher.java | 5 --
.../plan/analyze/IPartitionFetcher.java | 4 --
.../scheduler/load/LoadTsFileDispatcherImpl.java | 27 +++++-----
.../plan/scheduler/load/LoadTsFileScheduler.java | 24 +--------
.../dataregion/wal/utils/WALEntryHandler.java | 4 +-
.../dataregion/wal/utils/WALInsertNodeCache.java | 8 +--
.../plan/analyze/FakePartitionFetcherImpl.java | 5 --
.../plan/planner/distribution/Util.java | 5 --
.../plan/planner/distribution/Util2.java | 5 --
.../src/main/thrift/datanode.thrift | 1 -
16 files changed, 79 insertions(+), 149 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBDefaultPullConsumerDataSetIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBDefaultPullConsumerDataSetIT.java
index 9b2994696cc..0b36deed5eb 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBDefaultPullConsumerDataSetIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBDefaultPullConsumerDataSetIT.java
@@ -41,6 +41,8 @@ import java.util.ArrayList;
import java.util.Date;
import java.util.List;
+import static
org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.AWAIT;
+
@RunWith(IoTDBTestRunner.class)
@Category({MultiClusterIT2SubscriptionRegressionMisc.class})
public class IoTDBDefaultPullConsumerDataSetIT extends
AbstractSubscriptionRegressionIT {
@@ -117,10 +119,15 @@ public class IoTDBDefaultPullConsumerDataSetIT extends
AbstractSubscriptionRegre
String sql = "select count(s_0) from " + databasePrefix + "0.d_0";
System.out.println(FORMAT.format(new Date()) + " src: " +
getCount(session_src, sql));
// Consumption data
- consume_data(consumer, session_dest);
- for (int i = 0; i < deviceCount; i++) {
- check_count(10, "select count(s_0) from " + devices.get(i), i +
":Consumption Data:s_0");
- }
+ AWAIT.untilAsserted(
+ () -> {
+ session_src.executeNonQueryStatement("flush");
+ consume_data(consumer, session_dest);
+ for (int i = 0; i < deviceCount; i++) {
+ check_count(
+ 10, "select count(s_0) from " + devices.get(i), i +
":Consumption Data:s_0");
+ }
+ });
// Unsubscribe
consumer.unsubscribe(topicName);
// Unsubscribe and then write data
@@ -133,9 +140,14 @@ public class IoTDBDefaultPullConsumerDataSetIT extends
AbstractSubscriptionRegre
System.out.println(FORMAT.format(new Date()) + " src: " +
getCount(session_src, sql));
// Consumption data: Progress is not retained when re-subscribing after
cancellation. Full
// synchronization.
- consume_data(consumer, session_dest);
- for (int i = 0; i < deviceCount; i++) {
- check_count(15, "select count(s_0) from " + devices.get(i), i +
":consume data again:s_0");
- }
+ AWAIT.untilAsserted(
+ () -> {
+ session_src.executeNonQueryStatement("flush");
+ consume_data(consumer, session_dest);
+ for (int i = 0; i < deviceCount; i++) {
+ check_count(
+ 15, "select count(s_0) from " + devices.get(i), i + ":consume
data again:s_0");
+ }
+ });
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
index 3423d980079..5de2ef38948 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
@@ -113,7 +113,13 @@ public class PipeTransferTsFileHandler extends
PipeTransferTrackableHandler {
this.transferMod = transferMod;
currentFile = transferMod ? modFile : tsFile;
- waitForResourceEnough4Slicing(Integer.MAX_VALUE);
+ // NOTE: Waiting for resource enough for slicing here may cause deadlock!
+ // TsFile events are producing and consuming at the same time, and the
memory of a TsFile
+ // event is not released until the TsFile is sealed. So if the memory is
not enough for slicing,
+ // the TsFile event will be blocked and waiting for the memory to be
released. At the same time,
+ // the memory of the TsFile event is not released, so the memory is not
enough for slicing. This
+ // will cause a deadlock.
+ waitForResourceEnough4Slicing((long) ((1 + Math.random()) * 20 * 1000));
// 20 - 40 seconds
readFileBufferSize =
(int)
Math.min(
@@ -395,7 +401,14 @@ public class PipeTransferTsFileHandler extends
PipeTransferTrackableHandler {
memoryBlock.close();
}
+ /**
+ * @param timeoutMs CAN NOT BE UNLIMITED, otherwise it may cause deadlock.
+ */
private void waitForResourceEnough4Slicing(final long timeoutMs) throws
InterruptedException {
+ if
(!PipeConfig.getInstance().isPipeConnectorReadFileBufferMemoryControlEnabled())
{
+ return;
+ }
+
final PipeMemoryManager memoryManager =
PipeDataNodeResourceManager.memory();
if (memoryManager.isEnough4TsFileSlicing()) {
return;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 15378fc6482..4f9823709a3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -402,7 +402,9 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
@Override
public Iterable<TabletInsertionEvent> toTabletInsertionEvents() throws
PipeException {
- return toTabletInsertionEvents(Long.MAX_VALUE);
+ // 20 - 40 seconds for waiting
+ // Can not be unlimited or will cause deadlock
+ return toTabletInsertionEvents((long) ((1 + Math.random()) * 20 * 1000));
}
public Iterable<TabletInsertionEvent> toTabletInsertionEvents(final long
timeoutMs)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
index 6a2f63858f6..c2ac5cb70c4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
@@ -242,7 +242,7 @@ public class PipeMemoryManager {
}
try {
- tryShrink4Allocate(sizeInBytes);
+ tryShrinkUntilFreeMemorySatisfy(sizeInBytes);
this.wait(MEMORY_ALLOCATE_RETRY_INTERVAL_IN_MS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -312,7 +312,7 @@ public class PipeMemoryManager {
}
try {
- tryShrink4Allocate(sizeInBytes);
+ tryShrinkUntilFreeMemorySatisfy(sizeInBytes);
this.wait(MEMORY_ALLOCATE_RETRY_INTERVAL_IN_MS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -332,13 +332,13 @@ public class PipeMemoryManager {
}
/**
- * Allocate a {@link PipeMemoryBlock} for pipe only if memory already used
is less than the
- * specified threshold.
+ * Allocate a {@link PipeMemoryBlock} for pipe only if memory used after
allocation is less than
+ * the specified threshold.
*
* @param sizeInBytes size of memory needed to allocate
* @param usedThreshold proportion of memory used, ranged from 0.0 to 1.0
- * @return {@code null} if the proportion of memory already used exceeds
{@code usedThreshold}.
- * Will return a memory block otherwise.
+ * @return {@code null} if the proportion of memory used after allocation
exceeds {@code
+ * usedThreshold}. Will return a memory block otherwise.
*/
public synchronized PipeMemoryBlock forceAllocateIfSufficient(
long sizeInBytes, float usedThreshold) {
@@ -354,18 +354,11 @@ public class PipeMemoryManager {
return registerMemoryBlock(0);
}
- if (TOTAL_MEMORY_SIZE_IN_BYTES - usedMemorySizeInBytes >= sizeInBytes
- && (float) usedMemorySizeInBytes / TOTAL_MEMORY_SIZE_IN_BYTES <
usedThreshold) {
+ if ((float) (usedMemorySizeInBytes + sizeInBytes)
+ <= TOTAL_MEMORY_SIZE_IN_BYTES * usedThreshold) {
return forceAllocate(sizeInBytes);
- } else {
- long memoryToShrink =
- Math.max(
- usedMemorySizeInBytes - (long) (TOTAL_MEMORY_SIZE_IN_BYTES *
usedThreshold),
- sizeInBytes);
- if (tryShrink4Allocate(memoryToShrink)) {
- return forceAllocate(sizeInBytes);
- }
}
+
return null;
}
@@ -404,7 +397,7 @@ public class PipeMemoryManager {
MEMORY_ALLOCATE_MIN_SIZE_IN_BYTES);
}
- if (tryShrink4Allocate(sizeToAllocateInBytes)) {
+ if (tryShrinkUntilFreeMemorySatisfy(sizeToAllocateInBytes)) {
LOGGER.info(
"tryAllocate: allocated memory, "
+ "total memory size {} bytes, used memory size {} bytes, "
@@ -477,7 +470,7 @@ public class PipeMemoryManager {
return returnedMemoryBlock;
}
- private boolean tryShrink4Allocate(long sizeInBytes) {
+ private boolean tryShrinkUntilFreeMemorySatisfy(long sizeInBytes) {
final List<PipeMemoryBlock> shuffledBlocks = new
ArrayList<>(allocatedBlocks);
Collections.shuffle(shuffledBlocks);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
index 1c2a46e9b59..2cb3a59c399 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
@@ -49,7 +49,7 @@ public class PipeTsFileResource implements AutoCloseable {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTsFileResource.class);
public static final long TSFILE_MIN_TIME_TO_LIVE_IN_MS = 1000L * 20;
- private static final float MEMORY_SUFFICIENT_THRESHOLD = 0.5f;
+ private static final float MEMORY_SUFFICIENT_THRESHOLD = 0.7f;
private final File hardlinkOrCopiedFile;
private final boolean isTsFile;
@@ -207,7 +207,7 @@ public class PipeTsFileResource implements AutoCloseable {
MEMORY_SUFFICIENT_THRESHOLD);
if (allocatedMemoryBlock == null) {
LOGGER.info(
- "PipeTsFileResource: Failed to create TsFileSequenceReader for
tsfile {} in cache, because memory usage is high",
+ "Failed to cacheDeviceIsAlignedMapIfAbsent for tsfile {}, because
memory usage is high",
hardlinkOrCopiedFile.getPath());
return false;
}
@@ -271,7 +271,7 @@ public class PipeTsFileResource implements AutoCloseable {
MEMORY_SUFFICIENT_THRESHOLD);
if (allocatedMemoryBlock == null) {
LOGGER.info(
- "PipeTsFileResource: Failed to create TsFileSequenceReader for
tsfile {} in cache, because memory usage is high",
+ "Failed to cacheObjectsIfAbsent for tsfile {}, because memory usage
is high",
hardlinkOrCopiedFile.getPath());
return false;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 5deebe5b3ab..e40e0996e23 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -21,13 +21,11 @@ package org.apache.iotdb.db.protocol.thrift.impl;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
import org.apache.iotdb.common.rpc.thrift.TLoadSample;
import org.apache.iotdb.common.rpc.thrift.TNodeLocations;
-import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSender;
import org.apache.iotdb.common.rpc.thrift.TServiceType;
@@ -273,7 +271,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -460,20 +457,7 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
}
@Override
- public TLoadResp sendLoadCommand(final TLoadCommandReq req) {
- final List<Integer> regionIds = req.getRegionIds();
- final Map<Integer, TRegionReplicaSet> id2replicaSetBeforeExecution =
- req.isSetRegionIds()
- && req.getCommandType() ==
LoadTsFileScheduler.LoadCommand.EXECUTE.ordinal()
- ? regionIds.stream()
- .collect(
- Collectors.toMap(
- regionId -> regionId,
- regionId ->
- partitionFetcher.getRegionReplicaSet(
- new
TConsensusGroupId(TConsensusGroupType.DataRegion, regionId))))
- : Collections.emptyMap();
-
+ public TLoadResp sendLoadCommand(TLoadCommandReq req) {
final ProgressIndex progressIndex;
if (req.isSetProgressIndex()) {
progressIndex =
ProgressIndexType.deserializeFrom(ByteBuffer.wrap(req.getProgressIndex()));
@@ -484,39 +468,13 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
"Use local generated load progress index {} for uuid {}.",
progressIndex, req.uuid);
}
- final TLoadResp resp =
- createTLoadResp(
- StorageEngine.getInstance()
- .executeLoadCommand(
- LoadTsFileScheduler.LoadCommand.values()[req.commandType],
- req.uuid,
- req.isSetIsGeneratedByPipe() && req.isGeneratedByPipe,
- progressIndex));
-
- if (!id2replicaSetBeforeExecution.isEmpty()) {
- for (Map.Entry<Integer, TRegionReplicaSet> entryBefore :
- id2replicaSetBeforeExecution.entrySet()) {
- final TRegionReplicaSet replicaSetAfterExecution =
- partitionFetcher.getRegionReplicaSet(
- new TConsensusGroupId(TConsensusGroupType.DataRegion,
entryBefore.getKey()));
- LOGGER.warn(
- "Load request {} for region {} executed with replica set changed
from {} to {}",
- req.uuid,
- entryBefore.getKey(),
- entryBefore.getValue(),
- replicaSetAfterExecution);
- if (!Objects.equals(entryBefore.getValue(), replicaSetAfterExecution))
{
- return createTLoadResp(
- RpcUtils.getStatus(
- TSStatusCode.LOAD_FILE_ERROR,
- String.format(
- "Region %d replica set changed from %s to %s",
- entryBefore.getKey(), entryBefore.getValue(),
replicaSetAfterExecution)));
- }
- }
- }
-
- return resp;
+ return createTLoadResp(
+ StorageEngine.getInstance()
+ .executeLoadCommand(
+ LoadTsFileScheduler.LoadCommand.values()[req.commandType],
+ req.uuid,
+ req.isSetIsGeneratedByPipe() && req.isGeneratedByPipe,
+ progressIndex));
}
private TLoadResp createTLoadResp(TSStatus resultStatus) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
index d9c5e0dbac6..e76973c516b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
@@ -287,11 +287,6 @@ public class ClusterPartitionFetcher implements
IPartitionFetcher {
return partitionCache.updateGroupIdToReplicaSetMap(req.getTimestamp(),
req.getRegionRouteMap());
}
- @Override
- public TRegionReplicaSet getRegionReplicaSet(TConsensusGroupId id) {
- return partitionCache.getRegionReplicaSet(id);
- }
-
@Override
public void invalidAllCache() {
partitionCache.invalidAllCache();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/IPartitionFetcher.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/IPartitionFetcher.java
index 227c5df1e4e..ecd88a9c647 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/IPartitionFetcher.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/IPartitionFetcher.java
@@ -19,8 +19,6 @@
package org.apache.iotdb.db.queryengine.plan.analyze;
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
import org.apache.iotdb.commons.partition.SchemaNodeManagementPartition;
@@ -90,8 +88,6 @@ public interface IPartitionFetcher {
/** Update region cache in partition cache when receive request from config
node */
boolean updateRegionCache(TRegionRouteReq req);
- TRegionReplicaSet getRegionReplicaSet(TConsensusGroupId id);
-
/** Invalid all partition cache */
void invalidAllCache();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
index bab730c8af2..5227f230007 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
@@ -56,10 +56,8 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
-import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
@@ -107,7 +105,7 @@ public class LoadTsFileDispatcherImpl implements
IFragInstanceDispatcher {
for (FragmentInstance instance : instances) {
try (SetThreadName threadName =
new SetThreadName(
- LoadTsFileScheduler.class.getName() +
instance.getId().getFullId())) {
+ "load-dispatcher" + "-" + instance.getId().getFullId() +
"-" + uuid)) {
dispatchOneInstance(instance);
} catch (FragmentInstanceDispatchException e) {
return new FragInstanceDispatchResult(e.getFailureStatus());
@@ -222,24 +220,25 @@ public class LoadTsFileDispatcherImpl implements
IFragInstanceDispatcher {
public Future<FragInstanceDispatchResult> dispatchCommand(
TLoadCommandReq loadCommandReq, Set<TRegionReplicaSet> replicaSets) {
- Map<TEndPoint, List<Integer>> endPoint2RegionIdsMap = new HashMap<>();
+ Set<TEndPoint> allEndPoint = new HashSet<>();
for (TRegionReplicaSet replicaSet : replicaSets) {
for (TDataNodeLocation dataNodeLocation :
replicaSet.getDataNodeLocations()) {
- endPoint2RegionIdsMap
- .computeIfAbsent(dataNodeLocation.getInternalEndPoint(), o -> new
ArrayList<>())
- .add(replicaSet.getRegionId().getId());
+ allEndPoint.add(dataNodeLocation.getInternalEndPoint());
}
}
- for (final Map.Entry<TEndPoint, List<Integer>> entry :
endPoint2RegionIdsMap.entrySet()) {
- try (final SetThreadName threadName =
+ for (TEndPoint endPoint : allEndPoint) {
+ try (SetThreadName threadName =
new SetThreadName(
- LoadTsFileScheduler.class.getName() + "-" +
loadCommandReq.commandType)) {
- loadCommandReq.setRegionIds(entry.getValue());
- if (isDispatchedToLocal(entry.getKey())) {
+ "load-dispatcher"
+ + "-"
+ +
LoadTsFileScheduler.LoadCommand.values()[loadCommandReq.commandType]
+ + "-"
+ + loadCommandReq.uuid)) {
+ if (isDispatchedToLocal(endPoint)) {
dispatchLocally(loadCommandReq);
} else {
- dispatchRemote(loadCommandReq, entry.getKey());
+ dispatchRemote(loadCommandReq, endPoint);
}
} catch (FragmentInstanceDispatchException e) {
LOGGER.warn("Cannot dispatch LoadCommand for load operation {}",
loadCommandReq, e);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index e7d6a719ef0..04d38f25f89 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -407,8 +407,6 @@ public class LoadTsFileScheduler implements IScheduler {
stateMachine.transitionToFailed(status);
return false;
}
-
- checkAllReplicaSetsConsistency();
} catch (IOException e) {
LOGGER.warn(
"Serialize Progress Index error, isFirstPhaseSuccess: {}, uuid: {},
tsFile: {}",
@@ -425,8 +423,7 @@ public class LoadTsFileScheduler implements IScheduler {
stateMachine.transitionToFailed(e);
return false;
} catch (Exception e) {
- LOGGER.warn(
- String.format("Exception occurred during second phase of loading
TsFile %s.", tsFile), e);
+ LOGGER.warn("Exception occurred during second phase of loading TsFile
{}.", tsFile, e);
stateMachine.transitionToFailed(e);
return false;
}
@@ -443,25 +440,6 @@ public class LoadTsFileScheduler implements IScheduler {
}
}
- public void checkAllReplicaSetsConsistency() throws
RegionReplicaSetChangedException {
- for (final TRegionReplicaSet replicaSet : allReplicaSets) {
- final TConsensusGroupId regionId = replicaSet.getRegionId();
- if (regionId == null) {
- LOGGER.info(
- "region id is null during region consistency check, will skip this
region: {}",
- replicaSet);
- continue;
- }
-
- final TRegionReplicaSet currentReplicaSet =
- partitionFetcher.fetcher.getRegionReplicaSet(regionId);
- if (!Objects.equals(replicaSet, currentReplicaSet)) {
- LOGGER.warn("Region replica set changed from {} to {}", replicaSet,
currentReplicaSet);
- throw new RegionReplicaSetChangedException(replicaSet,
currentReplicaSet);
- }
- }
- }
-
private boolean loadLocally(LoadSingleTsFileNode node) throws IoTDBException
{
LOGGER.info("Start load TsFile {} locally.",
node.getTsFileResource().getTsFile().getPath());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryHandler.java
index de2379c380e..333842e38ef 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryHandler.java
@@ -90,9 +90,7 @@ public class WALEntryHandler {
public InsertNode getInsertNodeViaCacheIfPossible() {
try {
final WALEntryValue finalValue = value;
- return finalValue instanceof InsertNode
- ? (InsertNode) finalValue
- :
walEntryPosition.readByteBufferOrInsertNodeViaCacheDirectly().getRight();
+ return finalValue instanceof InsertNode ? (InsertNode) finalValue : null;
} catch (Exception e) {
logger.warn("Fail to get insert node via cache. {}", this, e);
throw e;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
index 0541ff96188..efba8703072 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
@@ -77,9 +77,11 @@ public class WALInsertNodeCache {
final long requestedAllocateSize =
(long)
Math.min(
- (double) PIPE_CONFIG.getPipeMaxAllowedPinnedMemTableCount()
- * CONFIG.getWalFileSizeThresholdInByte(),
- CONFIG.getAllocateMemoryForPipe() * 0.45);
+ 1.0
+ * PIPE_CONFIG.getPipeMaxAllowedPinnedMemTableCount()
+ * CONFIG.getWalFileSizeThresholdInByte()
+ / CONFIG.getDataRegionNum(),
+ 0.5 * CONFIG.getAllocateMemoryForPipe() /
CONFIG.getDataRegionNum());
allocatedMemoryBlock =
PipeDataNodeResourceManager.memory()
.tryAllocate(requestedAllocateSize)
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/FakePartitionFetcherImpl.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/FakePartitionFetcherImpl.java
index 1afde0bfd1e..e33cc2d322c 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/FakePartitionFetcherImpl.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/FakePartitionFetcherImpl.java
@@ -296,11 +296,6 @@ public class FakePartitionFetcherImpl implements
IPartitionFetcher {
return true;
}
- @Override
- public TRegionReplicaSet getRegionReplicaSet(TConsensusGroupId id) {
- return null;
- }
-
@Override
public void invalidAllCache() {}
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/Util.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/Util.java
index 93bfb6c7170..e08bbb074ea 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/Util.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/Util.java
@@ -416,11 +416,6 @@ public class Util {
return false;
}
- @Override
- public TRegionReplicaSet getRegionReplicaSet(TConsensusGroupId id) {
- return null;
- }
-
@Override
public void invalidAllCache() {}
};
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/Util2.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/Util2.java
index fee4910628f..3d5e66aea49 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/Util2.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/Util2.java
@@ -310,11 +310,6 @@ public class Util2 {
return false;
}
- @Override
- public TRegionReplicaSet getRegionReplicaSet(TConsensusGroupId id) {
- return null;
- }
-
@Override
public void invalidAllCache() {}
};
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
index bbc9057bff9..4b25e08d1f6 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
@@ -336,7 +336,6 @@ struct TLoadCommandReq {
2: required string uuid
3: optional bool isGeneratedByPipe
4: optional binary progressIndex
- 5: optional list<i32> regionIds
}
struct TLoadResp {