This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 11d827771fe Fixed multiple bugs of insertion (#17570)
11d827771fe is described below
commit 11d827771fe87e408637256e0119839195af55d3
Author: Caideyipi <[email protected]>
AuthorDate: Thu Apr 30 12:26:42 2026 +0800
Fixed multiple bugs of insertion (#17570)
* insert-fix
* source/sink
* source/sink-2
* sptls
* fix
* sink
* compile
---
.../pipe/sink/protocol/IoTDBConfigRegionSink.java | 9 +-
.../dataregion/PipeDataRegionPluginAgent.java | 8 +-
.../iotdb/db/pipe/agent/task/PipeDataNodeTask.java | 28 +--
.../agent/task/builder/PipeDataNodeBuilder.java | 8 +-
.../task/execution/PipeSinkSubtaskExecutor.java | 2 +-
.../agent/task/stage/PipeTaskProcessorStage.java | 16 +-
.../pipe/agent/task/stage/PipeTaskSinkStage.java | 13 +-
.../pipe/agent/task/stage/PipeTaskSourceStage.java | 19 +-
.../airgap/IoTDBSchemaRegionAirGapSink.java | 7 +-
.../PipeTransferTabletBatchEventHandler.java | 10 +-
.../PipeTransferTabletInsertNodeEventHandler.java | 2 +-
.../PipeTransferTabletInsertionEventHandler.java | 7 +-
.../handler/PipeTransferTabletRawEventHandler.java | 6 +-
.../handler/PipeTransferTrackableHandler.java | 24 +-
.../async/handler/PipeTransferTsFileHandler.java | 27 +-
.../realtime/assigner/DisruptorQueue.java | 4 +-
.../listener/PipeInsertionDataNodeListener.java | 36 +--
.../planner/plan/node/write/InsertTabletNode.java | 60 ++++-
.../node/write/RelationalInsertTabletNode.java | 6 +-
.../db/storageengine/dataregion/DataRegion.java | 275 +++++++++++++--------
.../task/stage/SubscriptionTaskSinkStage.java | 11 +-
.../storageengine/dataregion/DataRegionTest.java | 227 +++++++++++++++++
.../iotdb/commons/client/ClientPoolFactory.java | 4 +-
.../iotdb/commons/concurrent/ThreadName.java | 12 +-
.../apache/iotdb/commons/conf/CommonConfig.java | 93 ++++---
.../iotdb/commons/pipe/config/PipeDescriptor.java | 8 +-
26 files changed, 627 insertions(+), 295 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java
index 36bd1cb1a5e..f06a47d4541 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java
@@ -97,7 +97,7 @@ public class IoTDBConfigRegionSink extends IoTDBSslSyncSink {
protected PipeTransferFilePieceReq getTransferSingleFilePieceReq(
final String fileName, final long position, final byte[] payLoad) {
throw new UnsupportedOperationException(
- "The config region connector does not support transferring single file
piece req.");
+ "The config region sink does not support transferring single file
piece req.");
}
@Override
@@ -114,13 +114,13 @@ public class IoTDBConfigRegionSink extends
IoTDBSslSyncSink {
@Override
public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws
Exception {
throw new UnsupportedOperationException(
- "IoTDBConfigRegionConnector can't transfer TabletInsertionEvent.");
+ "IoTDBConfigRegionSink can't transfer TabletInsertionEvent.");
}
@Override
public void transfer(final TsFileInsertionEvent tsFileInsertionEvent) throws
Exception {
throw new UnsupportedOperationException(
- "IoTDBConfigRegionConnector can't transfer TsFileInsertionEvent.");
+ "IoTDBConfigRegionSink can't transfer TsFileInsertionEvent.");
}
@Override
@@ -130,8 +130,7 @@ public class IoTDBConfigRegionSink extends IoTDBSslSyncSink
{
} else if (event instanceof PipeConfigRegionSnapshotEvent) {
doTransferWrapper((PipeConfigRegionSnapshotEvent) event);
} else if (!(event instanceof PipeHeartbeatEvent)) {
- LOGGER.warn(
- "IoTDBConfigRegionConnector does not support transferring generic
event: {}.", event);
+ LOGGER.warn("IoTDBConfigRegionSink does not support transferring generic
event: {}.", event);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionPluginAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionPluginAgent.java
index 67a7d6549d9..60103e078fb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionPluginAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionPluginAgent.java
@@ -75,22 +75,22 @@ public class PipeDataRegionPluginAgent extends
PipePluginAgent {
// TODO: validate visibility for schema region and config region
final Visibility pipeVisibility =
VisibilityUtils.calculateFromExtractorParameters(new
PipeParameters(sourceAttributes));
- final Visibility extractorVisibility =
+ final Visibility sourceVisibility =
VisibilityUtils.calculateFromPluginClass(temporaryExtractor.getClass());
final Visibility processorVisibility =
VisibilityUtils.calculateFromPluginClass(temporaryProcessor.getClass());
final Visibility connectorVisibility =
VisibilityUtils.calculateFromPluginClass(temporaryConnector.getClass());
if (!VisibilityUtils.isCompatible(
- pipeVisibility, extractorVisibility, processorVisibility,
connectorVisibility)) {
+ pipeVisibility, sourceVisibility, processorVisibility,
connectorVisibility)) {
throw new PipeParameterNotValidException(
String.format(
- "The visibility of the pipe (%s, %s) is not compatible with the
visibility of the extractor (%s, %s, %s), processor (%s, %s, %s), and connector
(%s, %s, %s).",
+ "The visibility of the pipe (%s, %s) is not compatible with the
visibility of the source (%s, %s, %s), processor (%s, %s, %s), and connector
(%s, %s, %s).",
pipeName,
pipeVisibility,
sourceAttributes,
temporaryExtractor.getClass().getName(),
- extractorVisibility,
+ sourceVisibility,
processorAttributes,
temporaryProcessor.getClass().getName(),
processorVisibility,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTask.java
index d33ec44a86e..0d0b955c210 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTask.java
@@ -32,32 +32,32 @@ public class PipeDataNodeTask implements PipeTask {
private final String pipeName;
private final int regionId;
- private final PipeTaskStage extractorStage;
+ private final PipeTaskStage sourceStage;
private final PipeTaskStage processorStage;
- private final PipeTaskStage connectorStage;
+ private final PipeTaskStage sinkStage;
private volatile boolean isCompleted = false;
public PipeDataNodeTask(
final String pipeName,
final int regionId,
- final PipeTaskStage extractorStage,
+ final PipeTaskStage sourceStage,
final PipeTaskStage processorStage,
- final PipeTaskStage connectorStage) {
+ final PipeTaskStage sinkStage) {
this.pipeName = pipeName;
this.regionId = regionId;
- this.extractorStage = extractorStage;
+ this.sourceStage = sourceStage;
this.processorStage = processorStage;
- this.connectorStage = connectorStage;
+ this.sinkStage = sinkStage;
}
@Override
public void create() {
final long startTime = System.currentTimeMillis();
- extractorStage.create();
+ sourceStage.create();
processorStage.create();
- connectorStage.create();
+ sinkStage.create();
LOGGER.info(
"Create pipe DN task {} successfully within {} ms",
this,
@@ -67,9 +67,9 @@ public class PipeDataNodeTask implements PipeTask {
@Override
public void drop() {
final long startTime = System.currentTimeMillis();
- extractorStage.drop();
+ sourceStage.drop();
processorStage.drop();
- connectorStage.drop();
+ sinkStage.drop();
LOGGER.info(
"Drop pipe DN task {} successfully within {} ms",
this,
@@ -79,9 +79,9 @@ public class PipeDataNodeTask implements PipeTask {
@Override
public void start() {
final long startTime = System.currentTimeMillis();
- extractorStage.start();
+ sourceStage.start();
processorStage.start();
- connectorStage.start();
+ sinkStage.start();
LOGGER.info(
"Start pipe DN task {} successfully within {} ms",
this,
@@ -91,9 +91,9 @@ public class PipeDataNodeTask implements PipeTask {
@Override
public void stop() {
final long startTime = System.currentTimeMillis();
- extractorStage.stop();
+ sourceStage.stop();
processorStage.stop();
- connectorStage.stop();
+ sinkStage.stop();
LOGGER.info(
"Stop pipe DN task {} successfully within {} ms",
this,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeBuilder.java
index b55704a3da4..46a10135d88 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeBuilder.java
@@ -63,18 +63,18 @@ public class PipeDataNodeBuilder {
final PipeTaskMeta pipeTaskMeta =
consensusGroupIdToPipeTaskMeta.getValue();
if (pipeTaskMeta.getLeaderNodeId() == CONFIG.getDataNodeId()) {
- final PipeParameters extractorParameters =
pipeStaticMeta.getSourceParameters();
+ final PipeParameters sourceParameters =
pipeStaticMeta.getSourceParameters();
final DataRegionId dataRegionId = new DataRegionId(consensusGroupId);
final boolean needConstructDataRegionTask =
dataRegionIds.contains(dataRegionId)
&& DataRegionListeningFilter.shouldDataRegionBeListened(
- extractorParameters, dataRegionId);
+ sourceParameters, dataRegionId);
final boolean needConstructSchemaRegionTask =
schemaRegionIds.contains(new SchemaRegionId(consensusGroupId))
&& SchemaRegionListeningFilter.shouldSchemaRegionBeListened(
- consensusGroupId, extractorParameters);
+ consensusGroupId, sourceParameters);
- // Advance the extractor parameters parsing logic to avoid creating
un-relevant pipeTasks
+ // Advance the source parameters parsing logic to avoid creating
un-relevant pipeTasks
if (needConstructDataRegionTask || needConstructSchemaRegionTask) {
consensusGroupIdToPipeTaskMap.put(
consensusGroupId,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/execution/PipeSinkSubtaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/execution/PipeSinkSubtaskExecutor.java
index 9a88ad74d7c..4abc8daed30 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/execution/PipeSinkSubtaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/execution/PipeSinkSubtaskExecutor.java
@@ -31,7 +31,7 @@ public class PipeSinkSubtaskExecutor extends
PipeSubtaskExecutor {
public PipeSinkSubtaskExecutor() {
super(
PipeConfig.getInstance().getPipeSubtaskExecutorMaxThreadNum(),
- ThreadName.PIPE_CONNECTOR_EXECUTOR_POOL.getName() + "-" + id.get(),
+ ThreadName.PIPE_SINK_EXECUTOR_POOL.getName() + "-" + id.get(),
ThreadName.PIPE_SUBTASK_CALLBACK_EXECUTOR_POOL.getName() + "-" +
id.getAndIncrement(),
true);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java
index 2e887f74bf0..2373495c8eb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java
@@ -56,8 +56,8 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
* @param creationTime pipe creation time
* @param pipeProcessorParameters used to create {@link PipeProcessor}
* @param regionId {@link DataRegion} id
- * @param pipeExtractorInputEventSupplier used to input {@link Event}s from
{@link PipeExtractor}
- * @param pipeConnectorOutputPendingQueue used to output {@link Event}s to
{@link PipeConnector}
+ * @param pipeSourceInputEventSupplier used to input {@link Event}s from
{@link PipeExtractor}
+ * @param pipeSinkOutputPendingQueue used to output {@link Event}s to {@link
PipeConnector}
* @throws PipeException if failed to {@link
PipeProcessor#validate(PipeParameterValidator)} or
* {@link PipeProcessor#customize(PipeParameters,
PipeProcessorRuntimeConfiguration)}}
*/
@@ -66,8 +66,8 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
final long creationTime,
final PipeParameters pipeProcessorParameters,
final int regionId,
- final EventSupplier pipeExtractorInputEventSupplier,
- final UnboundedBlockingPendingQueue<Event>
pipeConnectorOutputPendingQueue,
+ final EventSupplier pipeSourceInputEventSupplier,
+ final UnboundedBlockingPendingQueue<Event> pipeSinkOutputPendingQueue,
final PipeProcessorSubtaskExecutor executor,
final PipeTaskMeta pipeTaskMeta,
final boolean forceTabletFormat,
@@ -103,9 +103,9 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
// old one, so we need creationTime to make their hash code different in
the map.
final String taskId = pipeName + "_" + regionId + "_" + creationTime;
final boolean isUsedForConsensusPipe =
pipeName.contains(PipeStaticMeta.CONSENSUS_PIPE_PREFIX);
- final PipeEventCollector pipeConnectorOutputEventCollector =
+ final PipeEventCollector pipeSinkOutputEventCollector =
new PipeEventCollector(
- pipeConnectorOutputPendingQueue,
+ pipeSinkOutputPendingQueue,
creationTime,
regionId,
forceTabletFormat,
@@ -117,9 +117,9 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
pipeName,
creationTime,
regionId,
- pipeExtractorInputEventSupplier,
+ pipeSourceInputEventSupplier,
pipeProcessor,
- pipeConnectorOutputEventCollector);
+ pipeSinkOutputEventCollector);
this.executor = executor;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSinkStage.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSinkStage.java
index a22fbb536d7..88eac560cde 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSinkStage.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSinkStage.java
@@ -38,7 +38,7 @@ public class PipeTaskSinkStage extends PipeTaskStage {
protected final int regionId;
protected final Supplier<? extends PipeSinkSubtaskExecutor> executor;
- protected String connectorSubtaskId;
+ protected String sinkSubtaskId;
public PipeTaskSinkStage(
String pipeName,
@@ -56,7 +56,7 @@ public class PipeTaskSinkStage extends PipeTaskStage {
}
protected void registerSubtask() {
- this.connectorSubtaskId =
+ this.sinkSubtaskId =
PipeSinkSubtaskManager.instance()
.register(
executor,
@@ -71,21 +71,20 @@ public class PipeTaskSinkStage extends PipeTaskStage {
@Override
public void startSubtask() throws PipeException {
- PipeSinkSubtaskManager.instance().start(connectorSubtaskId);
+ PipeSinkSubtaskManager.instance().start(sinkSubtaskId);
}
@Override
public void stopSubtask() throws PipeException {
- PipeSinkSubtaskManager.instance().stop(connectorSubtaskId);
+ PipeSinkSubtaskManager.instance().stop(sinkSubtaskId);
}
@Override
public void dropSubtask() throws PipeException {
- PipeSinkSubtaskManager.instance()
- .deregister(pipeName, creationTime, regionId, connectorSubtaskId);
+ PipeSinkSubtaskManager.instance().deregister(pipeName, creationTime,
regionId, sinkSubtaskId);
}
public UnboundedBlockingPendingQueue<Event> getPipeSinkPendingQueue() {
- return
PipeSinkSubtaskManager.instance().getPipeSinkPendingQueue(connectorSubtaskId);
+ return
PipeSinkSubtaskManager.instance().getPipeSinkPendingQueue(sinkSubtaskId);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSourceStage.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSourceStage.java
index 57a804df0d3..5f774ceb379 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSourceStage.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSourceStage.java
@@ -45,33 +45,32 @@ public class PipeTaskSourceStage extends PipeTaskStage {
public PipeTaskSourceStage(
String pipeName,
long creationTime,
- PipeParameters extractorParameters,
+ PipeParameters sourceParameters,
int regionId,
PipeTaskMeta pipeTaskMeta) {
pipeExtractor =
StorageEngine.getInstance().getAllDataRegionIds().contains(new
DataRegionId(regionId))
|| PipeRuntimeMeta.isSourceExternal(regionId)
- ?
PipeDataNodeAgent.plugin().dataRegion().reflectSource(extractorParameters)
- :
PipeDataNodeAgent.plugin().schemaRegion().reflectSource(extractorParameters);
+ ?
PipeDataNodeAgent.plugin().dataRegion().reflectSource(sourceParameters)
+ :
PipeDataNodeAgent.plugin().schemaRegion().reflectSource(sourceParameters);
- // Validate and customize should be called before createSubtask. this
allows extractor exposing
+ // Validate and customize should be called before createSubtask. this
allows source exposing
// exceptions in advance.
try {
- // 1. Validate extractor parameters
- pipeExtractor.validate(new PipeParameterValidator(extractorParameters));
+ // 1. Validate source parameters
+ pipeExtractor.validate(new PipeParameterValidator(sourceParameters));
- // 2. Customize extractor
+ // 2. Customize source
final PipeTaskRuntimeConfiguration runtimeConfiguration =
new PipeTaskRuntimeConfiguration(
new PipeTaskSourceRuntimeEnvironment(pipeName, creationTime,
regionId, pipeTaskMeta));
- pipeExtractor.customize(extractorParameters, runtimeConfiguration);
+ pipeExtractor.customize(sourceParameters, runtimeConfiguration);
} catch (Exception e) {
try {
pipeExtractor.close();
} catch (Exception closeException) {
LOGGER.warn(
- "Failed to close extractor after failed to initialize extractor. "
- + "Ignore this exception.",
+ "Failed to close source after failed to initialize source. " +
"Ignore this exception.",
closeException);
}
throw new PipeException(e.getMessage(), e);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBSchemaRegionAirGapSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBSchemaRegionAirGapSink.java
index e4d39b49523..bc056857c17 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBSchemaRegionAirGapSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBSchemaRegionAirGapSink.java
@@ -52,13 +52,13 @@ public class IoTDBSchemaRegionAirGapSink extends
IoTDBDataNodeAirGapSink {
@Override
public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws
Exception {
throw new UnsupportedOperationException(
- "IoTDBSchemaRegionAirGapConnector can't transfer
TabletInsertionEvent.");
+ "IoTDBSchemaRegionAirGapSink can't transfer TabletInsertionEvent.");
}
@Override
public void transfer(final TsFileInsertionEvent tsFileInsertionEvent) throws
Exception {
throw new UnsupportedOperationException(
- "IoTDBSchemaRegionAirGapConnector can't transfer
TsFileInsertionEvent.");
+ "IoTDBSchemaRegionAirGapSink can't transfer TsFileInsertionEvent.");
}
@Override
@@ -73,8 +73,7 @@ public class IoTDBSchemaRegionAirGapSink extends
IoTDBDataNodeAirGapSink {
doTransferWrapper(socket, (PipeSchemaRegionSnapshotEvent) event);
} else if (!(event instanceof PipeHeartbeatEvent)) {
LOGGER.warn(
- "IoTDBSchemaRegionAirGapConnector does not support transferring
generic event: {}.",
- event);
+ "IoTDBSchemaRegionAirGapSink does not support transferring generic
event: {}.", event);
}
} catch (final IOException e) {
isSocketAlive.set(socketIndex, false);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
index d95513a2228..52c52b1038e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
@@ -69,7 +69,7 @@ public class PipeTransferTabletBatchEventHandler extends
PipeTransferTrackableHa
public void transfer(final AsyncPipeDataTransferServiceClient client) throws
TException {
for (final Map.Entry<Pair<String, Long>, Long> entry :
pipeName2BytesAccumulated.entrySet()) {
- connector.rateLimitIfNeeded(
+ sink.rateLimitIfNeeded(
entry.getKey().getLeft(),
entry.getKey().getRight(),
client.getEndPoint(),
@@ -92,13 +92,11 @@ public class PipeTransferTabletBatchEventHandler extends
PipeTransferTrackableHa
// Only handle the failed statuses to avoid string format performance
overhead
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& status.getCode() !=
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
- connector
- .statusHandler()
- .handle(status, response.getStatus().getMessage(),
events.toString());
+ sink.statusHandler().handle(status, response.getStatus().getMessage(),
events.toString());
}
for (final Pair<String, TEndPoint> redirectPair :
LeaderCacheUtils.parseRecommendedRedirections(status)) {
- connector.updateLeaderCache(redirectPair.getLeft(),
redirectPair.getRight());
+ sink.updateLeaderCache(redirectPair.getLeft(),
redirectPair.getRight());
}
events.forEach(
@@ -123,7 +121,7 @@ public class PipeTransferTabletBatchEventHandler extends
PipeTransferTrackableHa
events.size(),
events.stream().map(EnrichedEvent::getPipeName).collect(Collectors.toSet()));
} finally {
- connector.addFailureEventsToRetryQueue(events, exception);
+ sink.addFailureEventsToRetryQueue(events, exception);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
index 70ba7f4cfc5..912a1e724f7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
@@ -46,7 +46,7 @@ public class PipeTransferTabletInsertNodeEventHandler
@Override
protected void updateLeaderCache(final TSStatus status) {
- connector.updateLeaderCache(
+ sink.updateLeaderCache(
((PipeInsertNodeTabletInsertionEvent) event).getDeviceId(),
status.getRedirectNode());
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
index ca92af92b19..ac252818c14 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
@@ -52,7 +52,7 @@ public abstract class PipeTransferTabletInsertionEventHandler
extends PipeTransf
}
public void transfer(final AsyncPipeDataTransferServiceClient client) throws
TException {
- connector.rateLimitIfNeeded(
+ sink.rateLimitIfNeeded(
event.getPipeName(), event.getCreationTime(), client.getEndPoint(),
req.getBody().length);
tryTransfer(client, req);
@@ -71,8 +71,7 @@ public abstract class PipeTransferTabletInsertionEventHandler
extends PipeTransf
// Only handle the failed statuses to avoid string format performance
overhead
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& status.getCode() !=
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
- connector
- .statusHandler()
+ sink.statusHandler()
.handle(response.getStatus(), response.getStatus().getMessage(),
event.toString());
}
event.decreaseReferenceCount(PipeTransferTabletInsertionEventHandler.class.getName(),
true);
@@ -98,7 +97,7 @@ public abstract class PipeTransferTabletInsertionEventHandler
extends PipeTransf
event.getCommitterKey(),
event.getCommitId());
} finally {
- connector.addFailureEventToRetryQueue(event, exception);
+ sink.addFailureEventToRetryQueue(event, exception);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletRawEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletRawEventHandler.java
index ff1daa05c28..b64e446827a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletRawEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletRawEventHandler.java
@@ -32,8 +32,8 @@ public class PipeTransferTabletRawEventHandler extends
PipeTransferTabletInserti
public PipeTransferTabletRawEventHandler(
final PipeRawTabletInsertionEvent event,
final TPipeTransferReq req,
- final IoTDBDataRegionAsyncSink connector) {
- super(event, req, connector);
+ final IoTDBDataRegionAsyncSink sink) {
+ super(event, req, sink);
}
@Override
@@ -45,7 +45,7 @@ public class PipeTransferTabletRawEventHandler extends
PipeTransferTabletInserti
@Override
protected void updateLeaderCache(final TSStatus status) {
- connector.updateLeaderCache(
+ sink.updateLeaderCache(
((PipeRawTabletInsertionEvent) event).getDeviceId(),
status.getRedirectNode());
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
index 21f7c144bed..a8b4a3b7a79 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
@@ -36,18 +36,18 @@ public abstract class PipeTransferTrackableHandler
implements AsyncMethodCallback<TPipeTransferResp>, AutoCloseable {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTransferTsFileHandler.class);
- protected final IoTDBDataRegionAsyncSink connector;
+ protected final IoTDBDataRegionAsyncSink sink;
protected volatile AsyncPipeDataTransferServiceClient client;
- public PipeTransferTrackableHandler(final IoTDBDataRegionAsyncSink
connector) {
- this.connector = connector;
+ public PipeTransferTrackableHandler(final IoTDBDataRegionAsyncSink sink) {
+ this.sink = sink;
}
@Override
public void onComplete(final TPipeTransferResp response) {
- if (connector.isClosed()) {
+ if (sink.isClosed()) {
clearEventsReferenceCount();
- connector.eliminateHandler(this, true);
+ sink.eliminateHandler(this, true);
return;
}
@@ -56,7 +56,7 @@ public abstract class PipeTransferTrackableHandler
// completed
// NOTE: We should not clear the reference count of events, as this
would cause the
//
`org.apache.iotdb.pipe.it.dual.tablemodel.manual.basic.IoTDBPipeDataSinkIT#testSinkTsFileFormat3`
test to fail.
- connector.eliminateHandler(this, false);
+ sink.eliminateHandler(this, false);
}
}
@@ -67,14 +67,14 @@ public abstract class PipeTransferTrackableHandler
client.setPrintLogWhenEncounterException(false);
}
- if (connector.isClosed()) {
+ if (sink.isClosed()) {
clearEventsReferenceCount();
- connector.eliminateHandler(this, true);
+ sink.eliminateHandler(this, true);
return;
}
onErrorInternal(exception);
- connector.eliminateHandler(this, false);
+ sink.eliminateHandler(this, false);
}
/**
@@ -93,10 +93,10 @@ public abstract class PipeTransferTrackableHandler
this.client = client;
}
// track handler before checking if connector is closed
- connector.trackHandler(this);
- if (connector.isClosed()) {
+ sink.trackHandler(this);
+ if (sink.isClosed()) {
clearEventsReferenceCount();
- connector.eliminateHandler(this, true);
+ sink.eliminateHandler(this, true);
client.setShouldReturnSelf(true);
client.returnSelf(
(e) -> {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
index 3eaaa94c416..35a28d1413a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
@@ -162,7 +162,7 @@ public class PipeTransferTsFileHandler extends
PipeTransferTrackableHandler {
if (client == null) {
LOGGER.warn(
"Client has been returned to the pool. Current handler status is {}.
Will not transfer {}.",
- connector.isClosed() ? "CLOSED" : "NOT CLOSED",
+ sink.isClosed() ? "CLOSED" : "NOT CLOSED",
tsFile);
return;
}
@@ -171,7 +171,7 @@ public class PipeTransferTsFileHandler extends
PipeTransferTrackableHandler {
client.setTimeoutDynamically(clientManager.getConnectionTimeout());
PipeResourceMetrics.getInstance().recordDiskIO(readFileBufferSize);
- if (connector.isEnableSendTsFileLimit()) {
+ if (sink.isEnableSendTsFileLimit()) {
TsFileSendRateLimiter.getInstance().acquire(readFileBufferSize);
}
final int readLength = reader.read(readBuffer);
@@ -200,11 +200,11 @@ public class PipeTransferTsFileHandler extends
PipeTransferTrackableHandler {
dataBaseName)
: PipeTransferTsFileSealWithModReq.toTPipeTransferReq(
tsFile.getName(), tsFile.length(), dataBaseName);
- final TPipeTransferReq req =
connector.compressIfNeeded(uncompressedReq);
+ final TPipeTransferReq req = sink.compressIfNeeded(uncompressedReq);
pipeName2WeightMap.forEach(
(pipePair, weight) ->
- connector.rateLimitIfNeeded(
+ sink.rateLimitIfNeeded(
pipePair.getLeft(),
pipePair.getRight(),
client.getEndPoint(),
@@ -227,11 +227,11 @@ public class PipeTransferTsFileHandler extends
PipeTransferTrackableHandler {
currentFile.getName(), position, payload)
: PipeTransferTsFilePieceReq.toTPipeTransferReq(
currentFile.getName(), position, payload);
- final TPipeTransferReq req = connector.compressIfNeeded(uncompressedReq);
+ final TPipeTransferReq req = sink.compressIfNeeded(uncompressedReq);
pipeName2WeightMap.forEach(
(pipePair, weight) ->
- connector.rateLimitIfNeeded(
+ sink.rateLimitIfNeeded(
pipePair.getLeft(),
pipePair.getRight(),
client.getEndPoint(),
@@ -249,7 +249,7 @@ public class PipeTransferTsFileHandler extends
PipeTransferTrackableHandler {
try {
super.onComplete(response);
} finally {
- if (connector.isClosed()) {
+ if (sink.isClosed()) {
returnClientIfNecessary();
}
}
@@ -263,8 +263,7 @@ public class PipeTransferTsFileHandler extends
PipeTransferTrackableHandler {
// Only handle the failed statuses to avoid string format performance
overhead
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& status.getCode() !=
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
- connector
- .statusHandler()
+ sink.statusHandler()
.handle(
status,
String.format(
@@ -338,9 +337,7 @@ public class PipeTransferTsFileHandler extends
PipeTransferTrackableHandler {
// Only handle the failed statuses to avoid string format performance
overhead
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& status.getCode() !=
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
- connector
- .statusHandler()
- .handle(status, response.getStatus().getMessage(),
tsFile.getName());
+ sink.statusHandler().handle(status,
response.getStatus().getMessage(), tsFile.getName());
}
}
@@ -412,7 +409,7 @@ public class PipeTransferTsFileHandler extends
PipeTransferTrackableHandler {
returnClientIfNecessary();
} finally {
if (eventsHadBeenAddedToRetryQueue.compareAndSet(false, true)) {
- connector.addFailureEventsToRetryQueue(events, exception);
+ sink.addFailureEventsToRetryQueue(events, exception);
}
}
}
@@ -423,7 +420,7 @@ public class PipeTransferTsFileHandler extends
PipeTransferTrackableHandler {
return;
}
- if (connector.isClosed()) {
+ if (sink.isClosed()) {
closeClient();
}
@@ -447,7 +444,7 @@ public class PipeTransferTsFileHandler extends
PipeTransferTrackableHandler {
if (client == null) {
LOGGER.warn(
"Client has been returned to the pool. Current handler status is {}.
Will not transfer {}.",
- connector.isClosed() ? "CLOSED" : "NOT CLOSED",
+ sink.isClosed() ? "CLOSED" : "NOT CLOSED",
tsFile);
return;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
index 52ac137ae4e..2019eba8560 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
@@ -35,13 +35,13 @@ import org.slf4j.LoggerFactory;
import java.util.function.Consumer;
-import static
org.apache.iotdb.commons.concurrent.ThreadName.PIPE_EXTRACTOR_DISRUPTOR;
+import static
org.apache.iotdb.commons.concurrent.ThreadName.PIPE_SOURCE_DISRUPTOR;
public class DisruptorQueue {
private static final Logger LOGGER =
LoggerFactory.getLogger(DisruptorQueue.class);
private static final IoTDBDaemonThreadFactory THREAD_FACTORY =
- new IoTDBDaemonThreadFactory(PIPE_EXTRACTOR_DISRUPTOR.getName());
+ new IoTDBDaemonThreadFactory(PIPE_SOURCE_DISRUPTOR.getName());
private final PipeMemoryBlock allocatedMemoryBlock;
private final Disruptor<EventContainer> disruptor;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
index fded546d87d..157fb0078e3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
@@ -45,33 +45,33 @@ import java.util.concurrent.atomic.AtomicInteger;
*
* <p>All events extracted by this listener will be first published to
different
* PipeEventDataRegionAssigners (identified by data region id), and then
PipeEventDataRegionAssigner
- * will filter events and assign them to different
PipeRealtimeEventDataRegionExtractors.
+ * will filter events and assign them to different
PipeRealtimeEventDataRegionSources.
*/
public class PipeInsertionDataNodeListener {
private final ConcurrentMap<Integer, PipeDataRegionAssigner>
dataRegionId2Assigner =
new ConcurrentHashMap<>();
- private final AtomicInteger listenToTsFileExtractorCount = new
AtomicInteger(0);
- private final AtomicInteger listenToInsertNodeExtractorCount = new
AtomicInteger(0);
+ private final AtomicInteger listenToTsFileSourceCount = new AtomicInteger(0);
+ private final AtomicInteger listenToInsertNodeSourceCount = new
AtomicInteger(0);
//////////////////////////// start & stop ////////////////////////////
public synchronized void startListenAndAssign(
- final int dataRegionId, final PipeRealtimeDataRegionSource extractor) {
+ final int dataRegionId, final PipeRealtimeDataRegionSource source) {
dataRegionId2Assigner
.computeIfAbsent(dataRegionId, o -> new
PipeDataRegionAssigner(dataRegionId))
- .startAssignTo(extractor);
+ .startAssignTo(source);
- if (extractor.isNeedListenToTsFile()) {
- listenToTsFileExtractorCount.incrementAndGet();
+ if (source.isNeedListenToTsFile()) {
+ listenToTsFileSourceCount.incrementAndGet();
}
- if (extractor.isNeedListenToInsertNode()) {
- listenToInsertNodeExtractorCount.incrementAndGet();
+ if (source.isNeedListenToInsertNode()) {
+ listenToInsertNodeSourceCount.incrementAndGet();
}
}
public synchronized void stopListenAndAssign(
- final int dataRegionId, final PipeRealtimeDataRegionSource extractor) {
+ final int dataRegionId, final PipeRealtimeDataRegionSource source) {
PipeDataRegionAssigner assignerToClose = null;
synchronized (this) {
@@ -80,13 +80,13 @@ public class PipeInsertionDataNodeListener {
return;
}
- assigner.stopAssignTo(extractor);
+ assigner.stopAssignTo(source);
- if (extractor.isNeedListenToTsFile()) {
- listenToTsFileExtractorCount.decrementAndGet();
+ if (source.isNeedListenToTsFile()) {
+ listenToTsFileSourceCount.decrementAndGet();
}
- if (extractor.isNeedListenToInsertNode()) {
- listenToInsertNodeExtractorCount.decrementAndGet();
+ if (source.isNeedListenToInsertNode()) {
+ listenToInsertNodeSourceCount.decrementAndGet();
}
if (assigner.notMoreSourceNeededToBeAssigned()) {
@@ -110,8 +110,8 @@ public class PipeInsertionDataNodeListener {
final String databaseName,
final TsFileResource tsFileResource,
final boolean isLoaded) {
- // We don't judge whether listenToTsFileExtractorCount.get() == 0 here on
purpose
- // because extractors may use tsfile events when some exceptions occur in
the
+ // We don't judge whether listenToTsFileSourceCount.get() == 0 here on
purpose
+ // because spirces may use tsfile events when some exceptions occur in the
// insert nodes listening process.
final PipeDataRegionAssigner assigner =
dataRegionId2Assigner.get(dataRegionId);
@@ -131,7 +131,7 @@ public class PipeInsertionDataNodeListener {
final String databaseName,
final InsertNode insertNode,
final TsFileResource tsFileResource) {
- if (listenToInsertNodeExtractorCount.get() == 0) {
+ if (listenToInsertNodeSourceCount.get() == 0) {
return;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
index 490f16ca5f0..995e8a95e3f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
@@ -1193,36 +1193,72 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
if (lastIdx < startOffset) {
return null;
}
+ return composeTimeValuePair(measurementIndex, lastIdx);
+ }
+
+ public TimeValuePair composeLastTimeValuePair(int measurementIndex) {
+ return composeLastTimeValuePair(measurementIndex, 0, rowCount);
+ }
+
+ protected TimeValuePair composeLastTimeValuePair(
+ final int measurementIndex,
+ final TSStatus[] results,
+ final int startOffset,
+ final int endOffset) {
+ if (results == null) {
+ return composeLastTimeValuePair(measurementIndex, startOffset,
endOffset);
+ }
+ if (measurementIndex >= columns.length ||
Objects.isNull(dataTypes[measurementIndex])) {
+ return null;
+ }
+
+ final BitMap bitMap = bitMaps == null ? null : bitMaps[measurementIndex];
+ int lastIdx = Math.min(endOffset - 1, rowCount - 1);
+ while (lastIdx >= startOffset) {
+ if (results[lastIdx] != null
+ && results[lastIdx].getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ lastIdx--;
+ continue;
+ }
+ if (bitMap != null && bitMap.isMarked(lastIdx)) {
+ lastIdx--;
+ continue;
+ }
+ break;
+ }
+ return lastIdx < startOffset ? null :
composeTimeValuePair(measurementIndex, lastIdx);
+ }
+ private TimeValuePair composeTimeValuePair(final int measurementIndex, final
int rowIndex) {
TsPrimitiveType value;
switch (dataTypes[measurementIndex]) {
case INT32:
case DATE:
int[] intValues = (int[]) columns[measurementIndex];
- value = new TsPrimitiveType.TsInt(intValues[lastIdx]);
+ value = new TsPrimitiveType.TsInt(intValues[rowIndex]);
break;
case INT64:
case TIMESTAMP:
long[] longValues = (long[]) columns[measurementIndex];
- value = new TsPrimitiveType.TsLong(longValues[lastIdx]);
+ value = new TsPrimitiveType.TsLong(longValues[rowIndex]);
break;
case FLOAT:
float[] floatValues = (float[]) columns[measurementIndex];
- value = new TsPrimitiveType.TsFloat(floatValues[lastIdx]);
+ value = new TsPrimitiveType.TsFloat(floatValues[rowIndex]);
break;
case DOUBLE:
double[] doubleValues = (double[]) columns[measurementIndex];
- value = new TsPrimitiveType.TsDouble(doubleValues[lastIdx]);
+ value = new TsPrimitiveType.TsDouble(doubleValues[rowIndex]);
break;
case BOOLEAN:
boolean[] boolValues = (boolean[]) columns[measurementIndex];
- value = new TsPrimitiveType.TsBoolean(boolValues[lastIdx]);
+ value = new TsPrimitiveType.TsBoolean(boolValues[rowIndex]);
break;
case TEXT:
case BLOB:
case STRING:
Binary[] binaryValues = (Binary[]) columns[measurementIndex];
- value = new TsPrimitiveType.TsBinary(binaryValues[lastIdx]);
+ value = new TsPrimitiveType.TsBinary(binaryValues[rowIndex]);
break;
case OBJECT:
return null;
@@ -1230,11 +1266,7 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
throw new UnSupportedDataTypeException(
String.format(DATATYPE_UNSUPPORTED, dataTypes[measurementIndex]));
}
- return new TimeValuePair(times[lastIdx], value);
- }
-
- public TimeValuePair composeLastTimeValuePair(int measurementIndex) {
- return composeLastTimeValuePair(measurementIndex, 0, rowCount);
+ return new TimeValuePair(times[rowIndex], value);
}
public IDeviceID getDeviceID(int rowIdx) {
@@ -1313,10 +1345,14 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
}
public void updateLastCache(final String databaseName) {
+ updateLastCache(databaseName, null);
+ }
+
+ public void updateLastCache(final String databaseName, final TSStatus[]
results) {
final String[] rawMeasurements = getRawMeasurements();
final TimeValuePair[] timeValuePairs = new
TimeValuePair[rawMeasurements.length];
for (int i = 0; i < rawMeasurements.length; i++) {
- timeValuePairs[i] = composeLastTimeValuePair(i);
+ timeValuePairs[i] = composeLastTimeValuePair(i, results, 0, rowCount);
}
TreeDeviceSchemaCacheManager.getInstance()
.updateLastCacheIfExists(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
index b9f7edbe87c..8d24ad77364 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
@@ -371,6 +371,10 @@ public class RelationalInsertTabletNode extends
InsertTabletNode {
@Override
public void updateLastCache(final String databaseName) {
+ updateLastCache(databaseName, null);
+ }
+
+ public void updateLastCache(final String databaseName, final TSStatus[]
results) {
final String[] rawMeasurements = getRawMeasurements();
final List<Pair<IDeviceID, Integer>> deviceEndOffsetPairs =
splitByDevice(0, rowCount);
@@ -381,7 +385,7 @@ public class RelationalInsertTabletNode extends
InsertTabletNode {
final TimeValuePair[] timeValuePairs = new
TimeValuePair[rawMeasurements.length];
for (int i = 0; i < rawMeasurements.length; i++) {
- timeValuePairs[i] = composeLastTimeValuePair(i, startOffset,
endOffset);
+ timeValuePairs[i] = composeLastTimeValuePair(i, results, startOffset,
endOffset);
}
TableDeviceSchemaCache.getInstance()
.updateLastCacheIfExists(databaseName, deviceID, rawMeasurements,
timeValuePairs);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 933efbae84c..e5d75b4a210 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -1455,7 +1455,7 @@ public class DataRegion implements IDataRegionForQuery {
&& !insertTabletNode.isGeneratedByRemoteConsensusLeader()) {
// disable updating last cache on follower
long startTime = System.nanoTime();
- tryToUpdateInsertTabletLastCache(insertTabletNode);
+ tryToUpdateInsertTabletLastCache(insertTabletNode, results);
PERFORMANCE_OVERVIEW_METRICS.recordScheduleUpdateLastCacheCost(System.nanoTime()
- startTime);
}
return noFailure;
@@ -1506,18 +1506,12 @@ public class DataRegion implements IDataRegionForQuery {
return true;
}
- TsFileProcessor tsFileProcessor =
getOrCreateTsFileProcessor(timePartitionId, sequence);
- if (tsFileProcessor == null) {
- for (int[] rangePair : rangeList) {
- int start = rangePair[0];
- int end = rangePair[1];
- for (int i = start; i < end; i++) {
- results[i] =
- RpcUtils.getStatus(
- TSStatusCode.INTERNAL_SERVER_ERROR,
- "can not create TsFileProcessor, timePartitionId: " +
timePartitionId);
- }
- }
+ final TsFileProcessor tsFileProcessor;
+ try {
+ tsFileProcessor = getOrCreateTsFileProcessor(timePartitionId, sequence);
+ } catch (WriteProcessException e) {
+ markInsertTabletRangesFailed(
+ rangeList, results, RpcUtils.getStatus(e.getErrorCode(),
e.getMessage()));
return false;
}
@@ -1546,6 +1540,15 @@ public class DataRegion implements IDataRegionForQuery {
return true;
}
+ private void markInsertTabletRangesFailed(
+ final List<int[]> rangeList, final TSStatus[] results, final TSStatus
failureStatus) {
+ for (int[] rangePair : rangeList) {
+ for (int i = rangePair[0]; i < rangePair[1]; i++) {
+ results[i] = failureStatus;
+ }
+ }
+ }
+
private TableSchema getTableSchemaFromCache(
final String database, final String tableName, final Pair<Long, Long>
currentVersion) {
final TableSchemaCacheKey key = new TableSchemaCacheKey(database,
tableName);
@@ -1679,6 +1682,11 @@ public class DataRegion implements IDataRegionForQuery {
node.updateLastCache(getDatabaseName());
}
+ private void tryToUpdateInsertTabletLastCache(
+ final InsertTabletNode node, final TSStatus[] results) {
+ node.updateLastCache(getDatabaseName(), results);
+ }
+
private TsFileProcessor insertToTsFileProcessor(
InsertRowNode insertRowNode, boolean sequence, long timePartitionId)
throws WriteProcessException {
@@ -1686,19 +1694,16 @@ public class DataRegion implements IDataRegionForQuery {
return null;
}
TsFileProcessor tsFileProcessor =
getOrCreateTsFileProcessor(timePartitionId, sequence);
- if (tsFileProcessor == null) {
- return null;
- }
long[] infoForMetrics = new long[5];
// infoForMetrics[0]: CreateMemtableBlockTimeCost
// infoForMetrics[1]: ScheduleMemoryBlockTimeCost
// infoForMetrics[2]: ScheduleWalTimeCost
// infoForMetrics[3]: ScheduleMemTableTimeCost
// infoForMetrics[4]: InsertedPointsNumber
- tsFileProcessor.insert(insertRowNode, infoForMetrics);
- updateTsFileProcessorMetric(insertRowNode, infoForMetrics);
// register TableSchema (and maybe more) for table insertion
registerToTsFile(insertRowNode, tsFileProcessor);
+ tsFileProcessor.insert(insertRowNode, infoForMetrics);
+ updateTsFileProcessorMetric(insertRowNode, infoForMetrics);
return tsFileProcessor;
}
@@ -1717,9 +1722,11 @@ public class DataRegion implements IDataRegionForQuery {
if (insertRowNode.allMeasurementFailed()) {
continue;
}
- TsFileProcessor tsFileProcessor =
- getOrCreateTsFileProcessor(timePartitionIds[i], areSequence[i]);
- if (tsFileProcessor == null) {
+ final TsFileProcessor tsFileProcessor;
+ try {
+ tsFileProcessor = getOrCreateTsFileProcessor(timePartitionIds[i],
areSequence[i]);
+ } catch (WriteProcessException e) {
+ insertRowsNode.getResults().put(i,
RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
continue;
}
int finalI = i;
@@ -1727,78 +1734,156 @@ public class DataRegion implements IDataRegionForQuery
{
tsFileProcessor,
(k, v) -> {
if (v == null) {
- v = insertRowsNode.emptyClone();
- v.setSearchIndex(insertRowNode.getSearchIndex());
- v.setAligned(insertRowNode.isAligned());
- if (insertRowNode.isGeneratedByPipe()) {
- v.markAsGeneratedByPipe();
- }
- if (insertRowNode.isGeneratedByRemoteConsensusLeader()) {
- v.markAsGeneratedByRemoteConsensusLeader();
- }
+ v = createGroupedInsertRowsNode(insertRowsNode, insertRowNode);
}
- if (v.isAligned() != insertRowNode.isAligned()) {
- v.setMixingAlignment(true);
- }
- v.addOneInsertRowNode(insertRowNode, finalI);
- v.updateProgressIndex(insertRowNode.getProgressIndex());
+ appendInsertRowNode(v, insertRowNode, finalI);
return v;
});
}
List<InsertRowNode> executedInsertRowNodeList = new ArrayList<>();
for (Map.Entry<TsFileProcessor, InsertRowsNode> entry :
tsFileProcessorMap.entrySet()) {
- TsFileProcessor tsFileProcessor = entry.getKey();
InsertRowsNode subInsertRowsNode = entry.getValue();
try {
- tsFileProcessor =
- insertRowsWithTypeConsistencyCheck(tsFileProcessor,
subInsertRowsNode, infoForMetrics);
+ List<TsFileProcessor> insertedProcessors =
+ insertRowsWithTypeConsistencyCheck(entry.getKey(),
subInsertRowsNode, infoForMetrics);
+
executedInsertRowNodeList.addAll(subInsertRowsNode.getInsertRowNodeList());
+ for (TsFileProcessor tsFileProcessor : insertedProcessors) {
+ // check memtable size and may asyncTryToFlush the work memtable
+ if (tsFileProcessor.shouldFlush()) {
+ fileFlushPolicy.apply(this, tsFileProcessor,
tsFileProcessor.isSequence());
+ }
+ }
} catch (WriteProcessException e) {
- insertRowsNode
- .getResults()
- .put(
- subInsertRowsNode.getInsertRowNodeIndexList().get(0),
- RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
- }
-
executedInsertRowNodeList.addAll(subInsertRowsNode.getInsertRowNodeList());
-
- // check memtable size and may asyncTryToFlush the work memtable
- if (entry.getKey().shouldFlush()) {
- fileFlushPolicy.apply(this, tsFileProcessor,
tsFileProcessor.isSequence());
+ recordInsertRowsFailure(insertRowsNode, subInsertRowsNode, e);
}
}
return executedInsertRowNodeList;
}
- private TsFileProcessor insertRowsWithTypeConsistencyCheck(
+ private List<TsFileProcessor> insertRowsWithTypeConsistencyCheck(
TsFileProcessor tsFileProcessor, InsertRowsNode subInsertRowsNode,
long[] infoForMetrics)
throws WriteProcessException {
try {
// register TableSchema (and maybe more) for table insertion
registerToTsFile(subInsertRowsNode, tsFileProcessor);
tsFileProcessor.insertRows(subInsertRowsNode, infoForMetrics);
+ return Collections.singletonList(tsFileProcessor);
} catch (DataTypeInconsistentException e) {
InsertRowNode firstRow = subInsertRowsNode.getInsertRowNodeList().get(0);
long timePartitionId =
TimePartitionUtils.getTimePartitionId(firstRow.getTime());
// flush both MemTables so that the new type can be inserted into a new
MemTable
- TsFileProcessor workSequenceProcessor =
workSequenceTsFileProcessors.get(timePartitionId);
- if (workSequenceProcessor != null) {
- fileFlushPolicy.apply(this, workSequenceProcessor,
workSequenceProcessor.isSequence());
- }
- TsFileProcessor workUnsequenceProcessor =
workUnsequenceTsFileProcessors.get(timePartitionId);
- if (workUnsequenceProcessor != null) {
- fileFlushPolicy.apply(this, workUnsequenceProcessor,
workUnsequenceProcessor.isSequence());
- }
+ flushWorkingProcessorsForTimePartition(timePartitionId);
+ return retryInsertRowsAfterFlush(subInsertRowsNode, timePartitionId,
infoForMetrics);
+ }
+ }
- boolean isSequence =
+ private InsertRowsNode createGroupedInsertRowsNode(
+ final InsertRowsNode sourceInsertRowsNode, final InsertRowNode
firstInsertRowNode) {
+ final InsertRowsNode groupedInsertRowsNode =
sourceInsertRowsNode.emptyClone();
+ initializeGroupedInsertRowsNode(groupedInsertRowsNode, firstInsertRowNode);
+ return groupedInsertRowsNode;
+ }
+
+ private InsertRowsNode createGroupedInsertRowsNode(
+ final InsertRowsOfOneDeviceNode sourceInsertRowsNode,
+ final InsertRowNode firstInsertRowNode) {
+ final InsertRowsNode groupedInsertRowsNode =
+ new InsertRowsNode(sourceInsertRowsNode.getPlanNodeId());
+ initializeGroupedInsertRowsNode(groupedInsertRowsNode, firstInsertRowNode);
+ return groupedInsertRowsNode;
+ }
+
+ private void initializeGroupedInsertRowsNode(
+ final InsertRowsNode groupedInsertRowsNode, final InsertRowNode
firstInsertRowNode) {
+ groupedInsertRowsNode.setSearchIndex(firstInsertRowNode.getSearchIndex());
+ groupedInsertRowsNode.setAligned(firstInsertRowNode.isAligned());
+ if (firstInsertRowNode.isGeneratedByPipe()) {
+ groupedInsertRowsNode.markAsGeneratedByPipe();
+ }
+ if (firstInsertRowNode.isGeneratedByRemoteConsensusLeader()) {
+ groupedInsertRowsNode.markAsGeneratedByRemoteConsensusLeader();
+ }
+ }
+
+ private void appendInsertRowNode(
+ final InsertRowsNode groupedInsertRowsNode,
+ final InsertRowNode insertRowNode,
+ final int insertRowNodeIndex) {
+ if (groupedInsertRowsNode.isAligned() != insertRowNode.isAligned()) {
+ groupedInsertRowsNode.setMixingAlignment(true);
+ }
+ groupedInsertRowsNode.addOneInsertRowNode(insertRowNode,
insertRowNodeIndex);
+
groupedInsertRowsNode.updateProgressIndex(insertRowNode.getProgressIndex());
+ }
+
+ private void flushWorkingProcessorsForTimePartition(final long
timePartitionId) {
+ TsFileProcessor workSequenceProcessor =
workSequenceTsFileProcessors.get(timePartitionId);
+ if (workSequenceProcessor != null) {
+ fileFlushPolicy.apply(this, workSequenceProcessor,
workSequenceProcessor.isSequence());
+ }
+ TsFileProcessor workUnsequenceProcessor =
workUnsequenceTsFileProcessors.get(timePartitionId);
+ if (workUnsequenceProcessor != null) {
+ fileFlushPolicy.apply(this, workUnsequenceProcessor,
workUnsequenceProcessor.isSequence());
+ }
+ }
+
+ private List<TsFileProcessor> retryInsertRowsAfterFlush(
+ final InsertRowsNode subInsertRowsNode,
+ final long timePartitionId,
+ final long[] infoForMetrics)
+ throws WriteProcessException {
+ final Map<TsFileProcessor, InsertRowsNode> retriedProcessorMap = new
HashMap<>();
+ for (int i = 0; i < subInsertRowsNode.getInsertRowNodeList().size(); i++) {
+ final InsertRowNode insertRowNode =
subInsertRowsNode.getInsertRowNodeList().get(i);
+ final boolean isSequence =
config.isEnableSeparateData()
- && firstRow.getTime()
- > lastFlushTimeMap.getFlushedTime(timePartitionId,
firstRow.getDeviceID());
- tsFileProcessor = getOrCreateTsFileProcessor(timePartitionId,
isSequence);
- registerToTsFile(subInsertRowsNode, tsFileProcessor);
- tsFileProcessor.insertRows(subInsertRowsNode, infoForMetrics);
+ && insertRowNode.getTime()
+ > lastFlushTimeMap.getFlushedTime(timePartitionId,
insertRowNode.getDeviceID());
+ final TsFileProcessor retriedProcessor =
+ getOrCreateTsFileProcessor(timePartitionId, isSequence);
+ final int insertRowNodeIndex =
subInsertRowsNode.getInsertRowNodeIndexList().get(i);
+ retriedProcessorMap.compute(
+ retriedProcessor,
+ (k, v) -> {
+ if (v == null) {
+ v = createGroupedInsertRowsNode(subInsertRowsNode,
insertRowNode);
+ }
+ appendInsertRowNode(v, insertRowNode, insertRowNodeIndex);
+ return v;
+ });
+ }
+
+ final List<TsFileProcessor> insertedProcessors = new
ArrayList<>(retriedProcessorMap.size());
+ for (Entry<TsFileProcessor, InsertRowsNode> retriedEntry :
retriedProcessorMap.entrySet()) {
+ final TsFileProcessor retriedProcessor = retriedEntry.getKey();
+ registerToTsFile(retriedEntry.getValue(), retriedProcessor);
+ retriedProcessor.insertRows(retriedEntry.getValue(), infoForMetrics);
+ insertedProcessors.add(retriedProcessor);
+ }
+ return insertedProcessors;
+ }
+
+ private void recordInsertRowsFailure(
+ final InsertRowsNode sourceInsertRowsNode,
+ final InsertRowsNode failedInsertRowsNode,
+ final WriteProcessException exception) {
+ final TSStatus failureStatus =
+ RpcUtils.getStatus(exception.getErrorCode(), exception.getMessage());
+ for (Integer failedIndex :
failedInsertRowsNode.getInsertRowNodeIndexList()) {
+ sourceInsertRowsNode.getResults().put(failedIndex, failureStatus);
+ }
+ }
+
+ private void recordInsertRowsFailure(
+ final InsertRowsOfOneDeviceNode sourceInsertRowsNode,
+ final InsertRowsNode failedInsertRowsNode,
+ final WriteProcessException exception) {
+ final TSStatus failureStatus =
+ RpcUtils.getStatus(exception.getErrorCode(), exception.getMessage());
+ for (Integer failedIndex :
failedInsertRowsNode.getInsertRowNodeIndexList()) {
+ sourceInsertRowsNode.getResults().put(failedIndex, failureStatus);
}
- return tsFileProcessor;
}
private void tryToUpdateInsertRowsLastCache(List<InsertRowNode> nodeList) {
@@ -1859,7 +1944,8 @@ public class DataRegion implements IDataRegionForQuery {
}
}
- private TsFileProcessor getOrCreateTsFileProcessor(long timeRangeId, boolean
sequence) {
+ protected TsFileProcessor getOrCreateTsFileProcessor(long timeRangeId,
boolean sequence)
+ throws WriteProcessException {
TsFileProcessor tsFileProcessor = null;
int retryCnt = 0;
do {
@@ -1885,7 +1971,7 @@ public class DataRegion implements IDataRegionForQuery {
"disk space is insufficient when creating TsFile processor, change
system mode to read-only",
e);
CommonDescriptor.getInstance().getConfig().setNodeStatus(NodeStatus.ReadOnly);
- break;
+ throw new WriteProcessException(e.getMessage(), e.getErrorCode(),
true);
} catch (IOException e) {
if (retryCnt < 3) {
logger.warn("meet IOException when creating TsFileProcessor, retry
it again", e);
@@ -1894,11 +1980,15 @@ public class DataRegion implements IDataRegionForQuery {
logger.error(
"meet IOException when creating TsFileProcessor, change system
mode to error", e);
CommonDescriptor.getInstance().getConfig().handleUnrecoverableError();
- break;
+ throw new WriteProcessException(
+ String.format(
+ "Failed to create TsFileProcessor for database %s,
timePartitionId %s",
+ databaseName, timeRangeId),
+ e);
}
} catch (ExceedQuotaException e) {
logger.error(e.getMessage());
- break;
+ throw new WriteProcessException(e.getMessage(), e.getErrorCode(),
true);
}
} while (tsFileProcessor == null);
return tsFileProcessor;
@@ -4490,8 +4580,13 @@ public class DataRegion implements IDataRegionForQuery {
config.isEnableSeparateData()
&& insertRowNode.getTime()
> lastFlushTimeMap.getFlushedTime(timePartitionId,
insertRowNode.getDeviceID());
- TsFileProcessor tsFileProcessor =
getOrCreateTsFileProcessor(timePartitionId, isSequence);
- if (tsFileProcessor == null) {
+ final TsFileProcessor tsFileProcessor;
+ try {
+ tsFileProcessor = getOrCreateTsFileProcessor(timePartitionId,
isSequence);
+ } catch (WriteProcessException e) {
+ insertRowsOfOneDeviceNode
+ .getResults()
+ .put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
continue;
}
int finalI = i;
@@ -4499,18 +4594,9 @@ public class DataRegion implements IDataRegionForQuery {
tsFileProcessor,
(k, v) -> {
if (v == null) {
- v = new
InsertRowsNode(insertRowsOfOneDeviceNode.getPlanNodeId());
- v.setSearchIndex(insertRowNode.getSearchIndex());
- v.setAligned(insertRowNode.isAligned());
- if (insertRowNode.isGeneratedByPipe()) {
- v.markAsGeneratedByPipe();
- }
- if (insertRowNode.isGeneratedByRemoteConsensusLeader()) {
- v.markAsGeneratedByRemoteConsensusLeader();
- }
+ v = createGroupedInsertRowsNode(insertRowsOfOneDeviceNode,
insertRowNode);
}
- v.addOneInsertRowNode(insertRowNode, finalI);
- v.updateProgressIndex(insertRowNode.getProgressIndex());
+ appendInsertRowNode(v, insertRowNode, finalI);
return v;
});
}
@@ -4522,24 +4608,19 @@ public class DataRegion implements IDataRegionForQuery {
// infoForMetrics[3]: ScheduleMemTableTimeCost
// infoForMetrics[4]: InsertedPointsNumber
for (Map.Entry<TsFileProcessor, InsertRowsNode> entry :
tsFileProcessorMap.entrySet()) {
- TsFileProcessor tsFileProcessor = entry.getKey();
InsertRowsNode subInsertRowsNode = entry.getValue();
try {
- tsFileProcessor =
- insertRowsWithTypeConsistencyCheck(
- tsFileProcessor, subInsertRowsNode, infoForMetrics);
+ List<TsFileProcessor> insertedProcessors =
+ insertRowsWithTypeConsistencyCheck(entry.getKey(),
subInsertRowsNode, infoForMetrics);
+
executedInsertRowNodeList.addAll(subInsertRowsNode.getInsertRowNodeList());
+ for (TsFileProcessor tsFileProcessor : insertedProcessors) {
+ // check memtable size and may asyncTryToFlush the work memtable
+ if (tsFileProcessor.shouldFlush()) {
+ fileFlushPolicy.apply(this, tsFileProcessor,
tsFileProcessor.isSequence());
+ }
+ }
} catch (WriteProcessException e) {
- insertRowsOfOneDeviceNode
- .getResults()
- .put(
- subInsertRowsNode.getInsertRowNodeIndexList().get(0),
- RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
- }
-
executedInsertRowNodeList.addAll(subInsertRowsNode.getInsertRowNodeList());
-
- // check memtable size and may asyncTryToFlush the work memtable
- if (tsFileProcessor.shouldFlush()) {
- fileFlushPolicy.apply(this, tsFileProcessor,
tsFileProcessor.isSequence());
+ recordInsertRowsFailure(insertRowsOfOneDeviceNode,
subInsertRowsNode, e);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/stage/SubscriptionTaskSinkStage.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/stage/SubscriptionTaskSinkStage.java
index 73fca57a1fd..45c6d6f86cf 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/stage/SubscriptionTaskSinkStage.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/stage/SubscriptionTaskSinkStage.java
@@ -41,7 +41,7 @@ public class SubscriptionTaskSinkStage extends
PipeTaskSinkStage {
@Override
protected void registerSubtask() {
- this.connectorSubtaskId =
+ this.sinkSubtaskId =
SubscriptionSinkSubtaskManager.instance()
.register(
executor.get(),
@@ -56,22 +56,21 @@ public class SubscriptionTaskSinkStage extends
PipeTaskSinkStage {
@Override
public void startSubtask() throws PipeException {
- SubscriptionSinkSubtaskManager.instance().start(connectorSubtaskId);
+ SubscriptionSinkSubtaskManager.instance().start(sinkSubtaskId);
}
@Override
public void stopSubtask() throws PipeException {
- SubscriptionSinkSubtaskManager.instance().stop(connectorSubtaskId);
+ SubscriptionSinkSubtaskManager.instance().stop(sinkSubtaskId);
}
@Override
public void dropSubtask() throws PipeException {
SubscriptionSinkSubtaskManager.instance()
- .deregister(pipeName, creationTime, regionId, connectorSubtaskId);
+ .deregister(pipeName, creationTime, regionId, sinkSubtaskId);
}
public UnboundedBlockingPendingQueue<Event> getPipeSinkPendingQueue() {
- return SubscriptionSinkSubtaskManager.instance()
- .getPipeConnectorPendingQueue(connectorSubtaskId);
+ return
SubscriptionSinkSubtaskManager.instance().getPipeConnectorPendingQueue(sinkSubtaskId);
}
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
index 68d76764920..19ac712882b 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.storageengine.dataregion;
import org.apache.iotdb.calc.exception.QueryProcessException;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.CommonConfig;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.consensus.DataRegionId;
@@ -32,8 +33,10 @@ import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.NonAlignedFullPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.commons.utils.TimePartitionUtils;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.BatchProcessException;
import org.apache.iotdb.db.exception.DataRegionException;
import org.apache.iotdb.db.exception.TsFileProcessorException;
import org.apache.iotdb.db.exception.WriteProcessException;
@@ -46,6 +49,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNo
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertRowNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
+import
org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TreeDeviceSchemaCacheManager;
import org.apache.iotdb.db.queryengine.plan.statement.StatementTestUtils;
import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache;
import org.apache.iotdb.db.storageengine.StorageEngine;
@@ -68,6 +72,8 @@ import
org.apache.iotdb.db.storageengine.rescon.memory.MemTableManager;
import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.db.utils.constant.TestConstant;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.IDeviceID;
@@ -84,6 +90,7 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
+import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -100,6 +107,9 @@ import java.util.concurrent.Future;
import static
org.apache.iotdb.db.queryengine.plan.statement.StatementTestUtils.genInsertRowNode;
import static
org.apache.iotdb.db.queryengine.plan.statement.StatementTestUtils.genInsertTabletNode;
import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyList;
public class DataRegionTest {
private static final IoTDBConfig config =
IoTDBDescriptor.getInstance().getConfig();
@@ -149,6 +159,7 @@ public class DataRegionTest {
dataRegion.syncDeleteDataFiles();
StorageEngine.getInstance().deleteDataRegion(new DataRegionId(0));
}
+ TreeDeviceSchemaCacheManager.getInstance().cleanUp();
EnvironmentUtils.cleanDir(TestConstant.OUTPUT_DATA_DIR);
CompactionTaskManager.getInstance().stop();
EnvironmentUtils.cleanEnv();
@@ -1064,6 +1075,196 @@ public class DataRegionTest {
dataRegion1.syncDeleteDataFiles();
}
+ @Test
+ public void testInsertRowPropagatesTsFileProcessorCreationFailure()
+ throws IllegalPathException, DataRegionException,
TsFileProcessorException {
+ final HookedDataRegion dataRegion1 = new HookedDataRegion(systemDir,
"root.fail_row");
+ dataRegion1.setTsFileProcessorSupplier(
+ (timePartitionId, sequence) -> {
+ throw new WriteProcessRejectException("mock creation failure");
+ });
+
+ final TSRecord record = new TSRecord("root.fail_row", 1);
+ record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId,
String.valueOf(1)));
+ final InsertRowNode insertRowNode = buildInsertRowNodeByTSRecord(record);
+
+ try {
+ dataRegion1.insert(insertRowNode);
+ Assert.fail("Expected WriteProcessRejectException");
+ } catch (WriteProcessRejectException e) {
+ Assert.assertTrue(e.getMessage().contains("mock creation failure"));
+ } catch (WriteProcessException e) {
+ Assert.fail("Expected WriteProcessRejectException but got " +
e.getClass().getSimpleName());
+ } finally {
+ dataRegion1.syncDeleteDataFiles();
+ }
+ }
+
+ @Test
+ public void testInsertRowsMarkAllFailedRowsForSameProcessor() throws
Exception {
+ final HookedDataRegion dataRegion1 = new HookedDataRegion(systemDir,
"root.fail_rows");
+ final TsFileProcessor processor = Mockito.mock(TsFileProcessor.class);
+ Mockito.doThrow(new WriteProcessException("mock insert rows failure"))
+ .when(processor)
+ .insertRows(any(InsertRowsNode.class), any(long[].class));
+ Mockito.when(processor.shouldFlush()).thenReturn(false);
+ Mockito.when(processor.isSequence()).thenReturn(true);
+ dataRegion1.setTsFileProcessorSupplier((timePartitionId, sequence) ->
processor);
+
+ final List<Integer> indexList = Arrays.asList(0, 1);
+ final List<InsertRowNode> nodes = new ArrayList<>();
+ for (long time : new long[] {1, 2}) {
+ final TSRecord record = new TSRecord("root.fail_rows", time);
+ record.addTuple(
+ DataPoint.getDataPoint(TSDataType.INT32, measurementId,
String.valueOf(time)));
+ nodes.add(buildInsertRowNodeByTSRecord(record));
+ }
+ final InsertRowsNode insertRowsNode = new InsertRowsNode(new
PlanNodeId(""), indexList, nodes);
+
+ try {
+ dataRegion1.insert(insertRowsNode);
+ Assert.fail("Expected BatchProcessException");
+ } catch (BatchProcessException e) {
+ Assert.assertEquals(2, insertRowsNode.getResults().size());
+ Assert.assertEquals(
+ TSStatusCode.WRITE_PROCESS_ERROR.getStatusCode(),
+ insertRowsNode.getResults().get(0).getCode());
+ Assert.assertEquals(
+ TSStatusCode.WRITE_PROCESS_ERROR.getStatusCode(),
+ insertRowsNode.getResults().get(1).getCode());
+ } finally {
+ dataRegion1.syncDeleteDataFiles();
+ }
+ }
+
+ @Test
+ public void testInsertRowsLastCacheSkipsFailedRows() throws Exception {
+ final boolean originalLastCacheEnable = COMMON_CONFIG.isLastCacheEnable();
+ COMMON_CONFIG.setLastCacheEnable(true);
+
+ final HookedDataRegion dataRegion1 = new HookedDataRegion(systemDir,
"root.cache_rows");
+ final TsFileProcessor successProcessor =
Mockito.mock(TsFileProcessor.class);
+ Mockito.when(successProcessor.shouldFlush()).thenReturn(false);
+ Mockito.when(successProcessor.isSequence()).thenReturn(true);
+ final long failingTime = TimePartitionUtils.getTimePartitionInterval() + 1;
+ final long failingPartitionId =
TimePartitionUtils.getTimePartitionId(failingTime);
+ dataRegion1.setTsFileProcessorSupplier(
+ (timePartitionId, sequence) -> {
+ if (timePartitionId == failingPartitionId) {
+ throw new WriteProcessException("mock row failure");
+ }
+ return successProcessor;
+ });
+
+ final MeasurementPath lastCachePath =
+ new MeasurementPath(
+ "root.cache_rows",
+ measurementId,
+ new MeasurementSchema(
+ measurementId, TSDataType.INT32, TSEncoding.PLAIN,
CompressionType.UNCOMPRESSED));
+ TreeDeviceSchemaCacheManager.getInstance()
+ .declareLastCache(dataRegion1.getDatabaseName(), lastCachePath);
+
+ final List<Integer> indexList = Arrays.asList(0, 1);
+ final List<InsertRowNode> nodes = new ArrayList<>();
+ final long[] times = new long[] {1, failingTime};
+ final int[] values = new int[] {10, 20};
+ for (int i = 0; i < times.length; i++) {
+ final long time = times[i];
+ final TSRecord record = new TSRecord("root.cache_rows", time);
+ record.addTuple(
+ DataPoint.getDataPoint(TSDataType.INT32, measurementId,
String.valueOf(values[i])));
+ nodes.add(buildInsertRowNodeByTSRecord(record));
+ }
+ final InsertRowsNode insertRowsNode = new InsertRowsNode(new
PlanNodeId(""), indexList, nodes);
+
+ try {
+ dataRegion1.insert(insertRowsNode);
+ Assert.fail("Expected BatchProcessException");
+ } catch (BatchProcessException e) {
+ final TimeValuePair lastCache =
+
TreeDeviceSchemaCacheManager.getInstance().getLastCache(lastCachePath);
+ Assert.assertNotNull(lastCache);
+ Assert.assertEquals(1, lastCache.getTimestamp());
+ Assert.assertEquals(10, lastCache.getValue().getInt());
+ } finally {
+ dataRegion1.syncDeleteDataFiles();
+ COMMON_CONFIG.setLastCacheEnable(originalLastCacheEnable);
+ }
+ }
+
+ @Test
+ public void testInsertTabletLastCacheSkipsFailedRows() throws Exception {
+ final boolean originalLastCacheEnable = COMMON_CONFIG.isLastCacheEnable();
+ COMMON_CONFIG.setLastCacheEnable(true);
+
+ final HookedDataRegion dataRegion1 = new HookedDataRegion(systemDir,
"root.cache_tablet");
+ final TsFileProcessor processor = Mockito.mock(TsFileProcessor.class);
+ Mockito.doAnswer(
+ invocation -> {
+ TSStatus[] results = invocation.getArgument(2);
+ results[0] = RpcUtils.SUCCESS_STATUS;
+ results[1] =
+ RpcUtils.getStatus(
+ TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode(),
"mock row failure");
+ throw new WriteProcessException("mock tablet failure");
+ })
+ .when(processor)
+ .insertTablet(
+ any(InsertTabletNode.class),
+ anyList(),
+ any(TSStatus[].class),
+ anyBoolean(),
+ any(long[].class));
+ Mockito.when(processor.shouldFlush()).thenReturn(false);
+ Mockito.when(processor.isSequence()).thenReturn(true);
+ dataRegion1.setTsFileProcessorSupplier((timePartitionId, sequence) ->
processor);
+
+ final MeasurementPath lastCachePath =
+ new MeasurementPath(
+ "root.cache_tablet",
+ measurementId,
+ new MeasurementSchema(
+ measurementId, TSDataType.INT32, TSEncoding.PLAIN,
CompressionType.UNCOMPRESSED));
+ TreeDeviceSchemaCacheManager.getInstance()
+ .declareLastCache(dataRegion1.getDatabaseName(), lastCachePath);
+
+ final String[] measurements = new String[] {measurementId};
+ final TSDataType[] dataTypes = new TSDataType[] {TSDataType.INT32};
+ final MeasurementSchema[] measurementSchemas =
+ new MeasurementSchema[] {
+ new MeasurementSchema(measurementId, TSDataType.INT32,
TSEncoding.PLAIN)
+ };
+ final long[] times = new long[] {1, 2};
+ final Object[] columns = new Object[] {new int[] {10, 20}};
+ final InsertTabletNode insertTabletNode =
+ new InsertTabletNode(
+ new QueryId("test_write").genPlanNodeId(),
+ new PartialPath("root.cache_tablet"),
+ false,
+ measurements,
+ dataTypes,
+ measurementSchemas,
+ times,
+ null,
+ columns,
+ times.length);
+
+ try {
+ dataRegion1.insertTablet(insertTabletNode);
+ Assert.fail("Expected BatchProcessException");
+ } catch (BatchProcessException e) {
+ final TimeValuePair lastCache =
+
TreeDeviceSchemaCacheManager.getInstance().getLastCache(lastCachePath);
+ Assert.assertNotNull(lastCache);
+ Assert.assertEquals(1, lastCache.getTimestamp());
+ Assert.assertEquals(10, lastCache.getValue().getInt());
+ } finally {
+ dataRegion1.syncDeleteDataFiles();
+ COMMON_CONFIG.setLastCacheEnable(originalLastCacheEnable);
+ }
+ }
+
@Test
public void testSmallReportProportionInsertRow()
throws WriteProcessException,
@@ -1667,6 +1868,32 @@ public class DataRegionTest {
}
}
+ private interface TsFileProcessorSupplier {
+ TsFileProcessor get(long timePartitionId, boolean sequence) throws
WriteProcessException;
+ }
+
+ private static class HookedDataRegion extends DummyDataRegion {
+ private TsFileProcessorSupplier tsFileProcessorSupplier;
+
+ private HookedDataRegion(String systemInfoDir, String storageGroupName)
+ throws DataRegionException {
+ super(systemInfoDir, storageGroupName);
+ }
+
+ private void setTsFileProcessorSupplier(TsFileProcessorSupplier
tsFileProcessorSupplier) {
+ this.tsFileProcessorSupplier = tsFileProcessorSupplier;
+ }
+
+ @Override
+ protected TsFileProcessor getOrCreateTsFileProcessor(long timeRangeId,
boolean sequence)
+ throws WriteProcessException {
+ if (tsFileProcessorSupplier != null) {
+ return tsFileProcessorSupplier.get(timeRangeId, sequence);
+ }
+ return super.getOrCreateTsFileProcessor(timeRangeId, sequence);
+ }
+ }
+
// -- test for deleting data directly
// -- delete data and file only when:
// 1. tsfile is closed
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
index a5a6dd3808e..ea20f9c76cc 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
@@ -285,7 +285,7 @@ public class ClientPoolFactory {
.setRpcThriftCompressionEnabled(conf.isPipeSinkRPCThriftCompressionEnabled())
.setSelectorNumOfAsyncClientManager(conf.getPipeAsyncSinkSelectorNumber())
.build(),
- ThreadName.PIPE_ASYNC_CONNECTOR_CLIENT_POOL.getName()),
+ ThreadName.PIPE_ASYNC_SINK_CLIENT_POOL.getName()),
new
ClientPoolProperty.Builder<AsyncPipeDataTransferServiceClient>()
.setMaxClientNumForEachNode(conf.getPipeAsyncSinkMaxClientNumber())
.build()
@@ -311,7 +311,7 @@ public class ClientPoolFactory {
.setSelectorNumOfAsyncClientManager(conf.getPipeAsyncSinkSelectorNumber())
.setPrintLogWhenEncounterException(conf.isPrintLogWhenEncounterException())
.build(),
- ThreadName.PIPE_ASYNC_CONNECTOR_CLIENT_POOL.getName()),
+ ThreadName.PIPE_ASYNC_SINK_CLIENT_POOL.getName()),
new
ClientPoolProperty.Builder<AsyncPipeDataTransferServiceClient>()
.setMaxClientNumForEachNode(conf.getPipeAsyncSinkMaxTsFileClientNumber())
.build()
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index 81f2aa7156c..20fa9d78d59 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -136,9 +136,9 @@ public enum ThreadName {
GPRC_DEFAULT_WORKER_ELG("grpc-default-worker-ELG"),
GROUP_MANAGEMENT("groupManagement"),
// -------------------------- Compute --------------------------
- PIPE_EXTRACTOR_DISRUPTOR("Pipe-Extractor-Disruptor"),
+ PIPE_SOURCE_DISRUPTOR("Pipe-Source-Disruptor"),
PIPE_PROCESSOR_EXECUTOR_POOL("Pipe-Processor-Executor-Pool"),
- PIPE_CONNECTOR_EXECUTOR_POOL("Pipe-Connector-Executor-Pool"),
+ PIPE_SINK_EXECUTOR_POOL("Pipe-Sink-Executor-Pool"),
IOT_CONSENSUS_V2_EXECUTOR_POOL("Pipe-Consensus-Executor-Pool"),
PIPE_CONFIGNODE_EXECUTOR_POOL("Pipe-ConfigNode-Executor-Pool"),
PIPE_SUBTASK_CALLBACK_EXECUTOR_POOL("Pipe-SubTask-Callback-Executor-Pool"),
@@ -149,7 +149,7 @@ public enum ThreadName {
PIPE_RUNTIME_PERIODICAL_JOB_EXECUTOR("Pipe-Runtime-Periodical-Job-Executor"),
PIPE_RUNTIME_PERIODICAL_PHANTOM_REFERENCE_CLEANER(
"Pipe-Runtime-Periodical-Phantom-Reference-Cleaner"),
- PIPE_ASYNC_CONNECTOR_CLIENT_POOL("Pipe-Async-Connector-Client-Pool"),
+ PIPE_ASYNC_SINK_CLIENT_POOL("Pipe-Async-Sink-Client-Pool"),
PIPE_RECEIVER_AIR_GAP_AGENT("Pipe-Receiver-Air-Gap-Agent"),
PIPE_AIR_GAP_RECEIVER("Pipe-Air-Gap-Receiver"),
PIPE_PARALLEL_EXECUTION_POOL("Pipe-Parallel-Execution-Pool"),
@@ -302,9 +302,9 @@ public enum ThreadName {
private static final Set<ThreadName> computeThreadNames =
new HashSet<>(
Arrays.asList(
- PIPE_EXTRACTOR_DISRUPTOR,
+ PIPE_SOURCE_DISRUPTOR,
PIPE_PROCESSOR_EXECUTOR_POOL,
- PIPE_CONNECTOR_EXECUTOR_POOL,
+ PIPE_SINK_EXECUTOR_POOL,
IOT_CONSENSUS_V2_EXECUTOR_POOL,
PIPE_CONFIGNODE_EXECUTOR_POOL,
PIPE_SUBTASK_CALLBACK_EXECUTOR_POOL,
@@ -313,7 +313,7 @@ public enum ThreadName {
PIPE_RUNTIME_PROCEDURE_SUBMITTER,
PIPE_RUNTIME_PERIODICAL_JOB_EXECUTOR,
PIPE_RUNTIME_PERIODICAL_PHANTOM_REFERENCE_CLEANER,
- PIPE_ASYNC_CONNECTOR_CLIENT_POOL,
+ PIPE_ASYNC_SINK_CLIENT_POOL,
PIPE_RECEIVER_AIR_GAP_AGENT,
PIPE_AIR_GAP_RECEIVER,
PIPE_PARALLEL_EXECUTION_POOL,
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index df100b3874d..cbed4db547e 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -281,11 +281,11 @@ public class CommonConfig {
private int pipeAsyncSinkForcedRetryTabletEventQueueSize = 20;
private int pipeAsyncSinkForcedRetryTotalEventQueueSize = 30;
private long pipeAsyncSinkMaxRetryExecutionTimeMsPerCall = 500;
- private int pipeAsyncConnectorSelectorNumber =
+ private int pipeAsyncSinkSelectorNumber =
Math.max(4, Runtime.getRuntime().availableProcessors() / 2);
- private int pipeAsyncConnectorMaxClientNumber =
+ private int pipeAsyncSinkMaxClientNumber =
Math.max(32, Runtime.getRuntime().availableProcessors() * 2);
- private int pipeAsyncConnectorMaxTsFileClientNumber =
+ private int pipeAsyncSinkMaxTsFileClientNumber =
Math.max(16, Runtime.getRuntime().availableProcessors());
private boolean printLogWhenEncounterException = false;
@@ -293,8 +293,7 @@ public class CommonConfig {
private double pipeAllSinksRateLimitBytesPerSecond = -1;
private int rateLimiterHotReloadCheckIntervalMs = 1000;
- private int pipeConnectorRequestSliceThresholdBytes =
- (int) (RpcUtils.THRIFT_FRAME_MAX_SIZE * 0.8);
+ private int pipeSinkRequestSliceThresholdBytes = (int)
(RpcUtils.THRIFT_FRAME_MAX_SIZE * 0.8);
private boolean isSeperatedPipeHeartbeatEnabled = true;
private int pipeHeartbeatIntervalSecondsForCollectingPipeMeta = 3;
@@ -1078,7 +1077,7 @@ public class CommonConfig {
}
public void setPipeSinkHandshakeTimeoutMs(long pipeSinkHandshakeTimeoutMs) {
- final int fPipeConnectorHandshakeTimeoutMs =
this.pipeSinkHandshakeTimeoutMs;
+ final int fPipeSinkHandshakeTimeoutMs = this.pipeSinkHandshakeTimeoutMs;
try {
this.pipeSinkHandshakeTimeoutMs =
Math.toIntExact(pipeSinkHandshakeTimeoutMs);
} catch (ArithmeticException e) {
@@ -1086,7 +1085,7 @@ public class CommonConfig {
logger.warn(
"Given pipe connector handshake timeout is too large, set to {}
ms.", Integer.MAX_VALUE);
} finally {
- if (fPipeConnectorHandshakeTimeoutMs != this.pipeSinkHandshakeTimeoutMs)
{
+ if (fPipeSinkHandshakeTimeoutMs != this.pipeSinkHandshakeTimeoutMs) {
logger.info(
"pipeConnectorHandshakeTimeoutMs is set to {}.",
this.pipeSinkHandshakeTimeoutMs);
}
@@ -1118,16 +1117,16 @@ public class CommonConfig {
}
public void setPipeSinkTransferTimeoutMs(long pipeSinkTransferTimeoutMs) {
- final int fPipeConnectorTransferTimeoutMs = this.pipeSinkTransferTimeoutMs;
+ final int fPipeSinkTransferTimeoutMs = this.pipeSinkTransferTimeoutMs;
try {
this.pipeSinkTransferTimeoutMs =
Math.toIntExact(pipeSinkTransferTimeoutMs);
} catch (ArithmeticException e) {
this.pipeSinkTransferTimeoutMs = Integer.MAX_VALUE;
logger.warn(
- "Given pipe connector transfer timeout is too large, set to {} ms.",
Integer.MAX_VALUE);
+ "Given pipe sink transfer timeout is too large, set to {} ms.",
Integer.MAX_VALUE);
} finally {
- if (fPipeConnectorTransferTimeoutMs != this.pipeSinkTransferTimeoutMs) {
- logger.info("pipeConnectorTransferTimeoutMs is set to {}.",
pipeSinkTransferTimeoutMs);
+ if (fPipeSinkTransferTimeoutMs != this.pipeSinkTransferTimeoutMs) {
+ logger.info("pipeSinkTransferTimeoutMs is set to {}.",
pipeSinkTransferTimeoutMs);
}
}
}
@@ -1141,7 +1140,7 @@ public class CommonConfig {
return;
}
this.pipeSinkReadFileBufferSize = pipeSinkReadFileBufferSize;
- logger.info("pipeConnectorReadFileBufferSize is set to {}.",
pipeSinkReadFileBufferSize);
+ logger.info("pipeSinkReadFileBufferSize is set to {}.",
pipeSinkReadFileBufferSize);
}
public boolean isPipeSinkReadFileBufferMemoryControlEnabled() {
@@ -1241,60 +1240,58 @@ public class CommonConfig {
}
public int getPipeAsyncSinkSelectorNumber() {
- return pipeAsyncConnectorSelectorNumber;
+ return pipeAsyncSinkSelectorNumber;
}
- public void setPipeAsyncConnectorSelectorNumber(int
pipeAsyncConnectorSelectorNumber) {
- if (pipeAsyncConnectorSelectorNumber <= 0) {
+ public void setPipeAsyncSinkSelectorNumber(int pipeAsyncSinkSelectorNumber) {
+ if (pipeAsyncSinkSelectorNumber <= 0) {
logger.info(
- "pipeAsyncConnectorSelectorNumber should be greater than 0,
configuring it not to change.");
+ "pipeAsyncSinkSelectorNumber should be greater than 0, configuring
it not to change.");
return;
}
- pipeAsyncConnectorSelectorNumber = Math.max(4,
pipeAsyncConnectorSelectorNumber);
- if (this.pipeAsyncConnectorSelectorNumber ==
pipeAsyncConnectorSelectorNumber) {
+ pipeAsyncSinkSelectorNumber = Math.max(4, pipeAsyncSinkSelectorNumber);
+ if (this.pipeAsyncSinkSelectorNumber == pipeAsyncSinkSelectorNumber) {
return;
}
- this.pipeAsyncConnectorSelectorNumber = pipeAsyncConnectorSelectorNumber;
- logger.info("pipeAsyncConnectorSelectorNumber is set to {}.",
pipeAsyncConnectorSelectorNumber);
+ this.pipeAsyncSinkSelectorNumber = pipeAsyncSinkSelectorNumber;
+ logger.info("pipeAsyncSinkSelectorNumber is set to {}.",
pipeAsyncSinkSelectorNumber);
}
public int getPipeAsyncSinkMaxClientNumber() {
- return pipeAsyncConnectorMaxClientNumber;
+ return pipeAsyncSinkMaxClientNumber;
}
- public void setPipeAsyncConnectorMaxClientNumber(int
pipeAsyncConnectorMaxClientNumber) {
- if (pipeAsyncConnectorMaxClientNumber <= 0) {
+ public void setPipeAsyncSinkMaxClientNumber(int
pipeAsyncSinkMaxClientNumber) {
+ if (pipeAsyncSinkMaxClientNumber <= 0) {
logger.info(
- " pipeAsyncConnectorMaxClientNumber should be greater than 0,
configuring it not to change.");
+ " pipeAsyncSinkMaxClientNumber should be greater than 0, configuring
it not to change.");
return;
}
- pipeAsyncConnectorMaxClientNumber = Math.max(32,
pipeAsyncConnectorMaxClientNumber);
- if (this.pipeAsyncConnectorMaxClientNumber ==
pipeAsyncConnectorMaxClientNumber) {
+ pipeAsyncSinkMaxClientNumber = Math.max(32, pipeAsyncSinkMaxClientNumber);
+ if (this.pipeAsyncSinkMaxClientNumber == pipeAsyncSinkMaxClientNumber) {
return;
}
- this.pipeAsyncConnectorMaxClientNumber = pipeAsyncConnectorMaxClientNumber;
- logger.info(
- "pipeAsyncConnectorMaxClientNumber is set to {}.",
pipeAsyncConnectorMaxClientNumber);
+ this.pipeAsyncSinkMaxClientNumber = pipeAsyncSinkMaxClientNumber;
+ logger.info("pipeAsyncSinkMaxClientNumber is set to {}.",
pipeAsyncSinkMaxClientNumber);
}
public int getPipeAsyncSinkMaxTsFileClientNumber() {
- return pipeAsyncConnectorMaxTsFileClientNumber;
+ return pipeAsyncSinkMaxTsFileClientNumber;
}
- public void setPipeAsyncConnectorMaxTsFileClientNumber(
- int pipeAsyncConnectorMaxTsFileClientNumber) {
- if (pipeAsyncConnectorMaxTsFileClientNumber <= 0) {
+ public void setPipeAsyncSinkMaxTsFileClientNumber(int
pipeAsyncSinkMaxTsFileClientNumber) {
+ if (pipeAsyncSinkMaxTsFileClientNumber <= 0) {
logger.info(
- "pipeAsyncConnectorMaxTsFileClientNumber should be greater than 0,
configuring it not to change.");
+ "pipeAsyncSinkMaxTsFileClientNumber should be greater than 0,
configuring it not to change.");
return;
}
- pipeAsyncConnectorMaxTsFileClientNumber = Math.max(16,
pipeAsyncConnectorMaxTsFileClientNumber);
- if (this.pipeAsyncConnectorMaxTsFileClientNumber ==
pipeAsyncConnectorMaxTsFileClientNumber) {
+ pipeAsyncSinkMaxTsFileClientNumber = Math.max(16,
pipeAsyncSinkMaxTsFileClientNumber);
+ if (this.pipeAsyncSinkMaxTsFileClientNumber ==
pipeAsyncSinkMaxTsFileClientNumber) {
return;
}
- this.pipeAsyncConnectorMaxTsFileClientNumber =
pipeAsyncConnectorMaxTsFileClientNumber;
+ this.pipeAsyncSinkMaxTsFileClientNumber =
pipeAsyncSinkMaxTsFileClientNumber;
logger.info(
- "pipeAsyncConnectorMaxClientNumber is set to {}.",
pipeAsyncConnectorMaxTsFileClientNumber);
+ "pipeAsyncSinkMaxTsFileClientNumber is set to {}.",
pipeAsyncSinkMaxTsFileClientNumber);
}
public boolean isPrintLogWhenEncounterException() {
@@ -1398,12 +1395,12 @@ public class CommonConfig {
return pipeSinkRetryIntervalMs;
}
- public void setPipeSinkRetryIntervalMs(long pipeConnectorRetryIntervalMs) {
- if (this.pipeSinkRetryIntervalMs == pipeConnectorRetryIntervalMs) {
+ public void setPipeSinkRetryIntervalMs(long pipeSinkRetryIntervalMs) {
+ if (this.pipeSinkRetryIntervalMs == pipeSinkRetryIntervalMs) {
return;
}
- this.pipeSinkRetryIntervalMs = pipeConnectorRetryIntervalMs;
- logger.info("pipeSinkRetryIntervalMs is set to {}",
pipeConnectorRetryIntervalMs);
+ this.pipeSinkRetryIntervalMs = pipeSinkRetryIntervalMs;
+ logger.info("pipeSinkRetryIntervalMs is set to {}",
pipeSinkRetryIntervalMs);
}
public boolean isPipeSinkRetryLocallyForConnectionError() {
@@ -2204,18 +2201,16 @@ public class CommonConfig {
}
public int getPipeSinkRequestSliceThresholdBytes() {
- return pipeConnectorRequestSliceThresholdBytes;
+ return pipeSinkRequestSliceThresholdBytes;
}
- public void setPipeConnectorRequestSliceThresholdBytes(
- int pipeConnectorRequestSliceThresholdBytes) {
- if (this.pipeConnectorRequestSliceThresholdBytes ==
pipeConnectorRequestSliceThresholdBytes) {
+ public void setPipeSinkRequestSliceThresholdBytes(int
pipeSinkRequestSliceThresholdBytes) {
+ if (this.pipeSinkRequestSliceThresholdBytes ==
pipeSinkRequestSliceThresholdBytes) {
return;
}
- this.pipeConnectorRequestSliceThresholdBytes =
pipeConnectorRequestSliceThresholdBytes;
+ this.pipeSinkRequestSliceThresholdBytes =
pipeSinkRequestSliceThresholdBytes;
logger.info(
- "pipeConnectorRequestSliceThresholdBytes is set to {}",
- pipeConnectorRequestSliceThresholdBytes);
+ "pipeConnectorRequestSliceThresholdBytes is set to {}",
pipeSinkRequestSliceThresholdBytes);
}
public long getTwoStageAggregateMaxCombinerLiveTimeInMs() {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 8dcbce35094..d0c37a50367 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -443,7 +443,7 @@ public class PipeDescriptor {
"rate_limiter_hot_reload_check_interval_ms",
String.valueOf(config.getRateLimiterHotReloadCheckIntervalMs()))));
- config.setPipeConnectorRequestSliceThresholdBytes(
+ config.setPipeSinkRequestSliceThresholdBytes(
Integer.parseInt(
properties.getProperty(
"pipe_connector_request_slice_threshold_bytes",
@@ -612,7 +612,7 @@ public class PipeDescriptor {
"pipe_async_connector_selector_number",
isHotModify);
if (value != null) {
- config.setPipeAsyncConnectorSelectorNumber(Integer.parseInt(value));
+ config.setPipeAsyncSinkSelectorNumber(Integer.parseInt(value));
}
value =
@@ -622,7 +622,7 @@ public class PipeDescriptor {
"pipe_async_connector_max_client_number",
isHotModify);
if (value != null) {
- config.setPipeAsyncConnectorMaxClientNumber(Integer.parseInt(value));
+ config.setPipeAsyncSinkMaxClientNumber(Integer.parseInt(value));
}
value =
@@ -632,7 +632,7 @@ public class PipeDescriptor {
"pipe_async_connector_max_tsfile_client_number",
isHotModify);
if (value != null) {
-
config.setPipeAsyncConnectorMaxTsFileClientNumber(Integer.parseInt(value));
+ config.setPipeAsyncSinkMaxTsFileClientNumber(Integer.parseInt(value));
}
value =