This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 6309e693871 [To dev/1.3] Pipe: Cleaned multiple potential problems in
pipe module (#17396) (#17417)
6309e693871 is described below
commit 6309e693871e9f2524b11459c55a313887d9f483
Author: Caideyipi <[email protected]>
AuthorDate: Thu Apr 2 09:38:47 2026 +0800
[To dev/1.3] Pipe: Cleaned multiple potential problems in pipe module
(#17396) (#17417)
* Cleaned multiple potential problems in pipe module (#17396)
* fix
* fix
* fix
* fix
* gras-shop
* fix
* spls
* fix
* pipe-dn
* logger-bug
* fix
* fix
---
.../runtime/heartbeat/PipeHeartbeat.java | 2 +-
.../pipe/source/IoTDBConfigRegionSource.java | 6 ++--
.../db/pipe/agent/runtime/PipeAgentLauncher.java | 2 +-
.../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 2 +-
.../agent/task/connection/PipeEventCollector.java | 2 +-
.../sink/PipeRealtimePriorityBlockingQueue.java | 11 ++----
.../pipe/metric/overview/PipeResourceMetrics.java | 2 +-
.../receiver/PipeDataNodeReceiverMetrics.java | 4 +--
.../processor/aggregate/AggregateProcessor.java | 4 +--
.../resource/memory/PipeDynamicMemoryBlock.java | 4 +--
.../client/IoTDBDataNodeAsyncClientManager.java | 4 +--
.../batch/PipeTabletEventTsFileBatch.java | 9 ++---
.../protocol/airgap/IoTDBDataRegionAirGapSink.java | 2 +-
.../websocket/WebSocketConnectorServer.java | 23 ++++++++----
.../PipeRealtimeDataRegionHybridSource.java | 34 ++----------------
.../realtime/PipeRealtimeDataRegionLogSource.java | 42 ++++------------------
.../realtime/PipeRealtimeDataRegionSource.java | 35 ++----------------
.../PipeRealtimeDataRegionTsFileSource.java | 18 ++--------
.../TsFileDeduplicationBlockingPendingQueue.java | 2 +-
.../task/connection/BlockingPendingQueue.java | 21 +----------
.../pipe/datastructure/pattern/PipePattern.java | 13 +++++--
.../pipe/sink/client/IoTDBSyncClientManager.java | 2 +-
22 files changed, 70 insertions(+), 174 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java
index 13a3c4b83d6..e7e9d2cd97d 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java
@@ -55,7 +55,7 @@ public class PipeHeartbeat {
// the final results and namely these dataNodes are omitted in
calculation.
remainingEventCountMap.put(
pipeMeta.getStaticMeta(),
- Objects.nonNull(pipeCompletedListFromAgent)
+ Objects.nonNull(pipeRemainingEventCountListFromAgent)
? pipeRemainingEventCountListFromAgent.get(i)
: 0L);
remainingTimeMap.put(
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/IoTDBConfigRegionSource.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/IoTDBConfigRegionSource.java
index a89af14c8c0..4295aa9b16f 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/IoTDBConfigRegionSource.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/IoTDBConfigRegionSource.java
@@ -101,8 +101,10 @@ public class IoTDBConfigRegionSource extends
IoTDBNonDataRegionSource {
@Override
public synchronized EnrichedEvent supply() throws Exception {
final EnrichedEvent event = super.supply();
- PipeEventCommitManager.getInstance()
- .enrichWithCommitterKeyAndCommitId(event, creationTime, regionId);
+ if (Objects.nonNull(event)) {
+ PipeEventCommitManager.getInstance()
+ .enrichWithCommitterKeyAndCommitId(event, creationTime, regionId);
+ }
return event;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
index 1e1d3388f75..01289109371 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
@@ -78,7 +78,7 @@ class PipeAgentLauncher {
curList.add(uninstalledOrConflictedPipePluginMetaList.get(index +
offset));
offset++;
}
- index += (offset + 1);
+ index += offset;
fetchAndSavePipePluginJars(curList);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 64d88d22683..c437507f4b0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -722,7 +722,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
MESSAGE_PIPE_NOT_ENOUGH_MEMORY,
needMemory,
freeMemorySizeInBytes,
- freeMemorySizeInBytes,
+ reservedMemorySizeInBytes,
PipeMemoryManager.getTotalMemorySizeInBytes());
LOGGER.warn(message);
throw new PipeException(message);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
index e3f8203251c..4728f62c94f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
@@ -196,7 +196,7 @@ public class PipeEventCollector implements EventCollector {
((PipeHeartbeatEvent) event).recordConnectorQueueSize(pendingQueue);
}
- pendingQueue.directOffer(event);
+ pendingQueue.offer(event);
collectInvocationCount.incrementAndGet();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java
index 2baebeedc18..6d227ac31fd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java
@@ -72,7 +72,7 @@ public class PipeRealtimePriorityBlockingQueue extends
UnboundedBlockingPendingQ
}
@Override
- public boolean directOffer(final Event event) {
+ public boolean offer(final Event event) {
checkBeforeOffer(event);
if (event instanceof TsFileInsertionEvent) {
@@ -85,18 +85,13 @@ public class PipeRealtimePriorityBlockingQueue extends
UnboundedBlockingPendingQ
((EnrichedEvent)
event).decreaseReferenceCount(PipeEventCollector.class.getName(), false);
return false;
} else {
- return super.directOffer(event);
+ return super.offer(event);
}
}
- @Override
- public boolean waitedOffer(final Event event) {
- return directOffer(event);
- }
-
@Override
public boolean put(final Event event) {
- directOffer(event);
+ offer(event);
return true;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeResourceMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeResourceMetrics.java
index 19a8060f89c..48c55fba59d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeResourceMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeResourceMetrics.java
@@ -137,7 +137,7 @@ public class PipeResourceMetrics implements IMetricSet {
// phantom reference count
metricService.remove(MetricType.AUTO_GAUGE,
Metric.PIPE_PHANTOM_REFERENCE_COUNT.toString());
- metricService.remove(MetricType.RATE,
Metric.PIPE_TSFILE_SEND_DISK_IO.toString());
+ metricService.remove(MetricType.COUNTER,
Metric.PIPE_TSFILE_SEND_DISK_IO.toString());
}
public void recordDiskIO(final long bytes) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/receiver/PipeDataNodeReceiverMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/receiver/PipeDataNodeReceiverMetrics.java
index 741fa302aec..9c2ecad3cc0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/receiver/PipeDataNodeReceiverMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/receiver/PipeDataNodeReceiverMetrics.java
@@ -282,14 +282,14 @@ public class PipeDataNodeReceiverMetrics implements
IMetricSet {
Tag.NAME.toString(),
RECEIVER,
Tag.TYPE.toString(),
- "handshakeDatanodeV1");
+ "handshakeDataNodeV1");
metricService.remove(
MetricType.TIMER,
Metric.PIPE_DATANODE_RECEIVER.toString(),
Tag.NAME.toString(),
RECEIVER,
Tag.TYPE.toString(),
- "handshakeDatanodeV2");
+ "handshakeDataNodeV2");
metricService.remove(
MetricType.TIMER,
Metric.PIPE_DATANODE_RECEIVER.toString(),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
index 2b4414ed75d..191ec7ee712 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
@@ -728,7 +728,7 @@ public class AggregateProcessor implements PipeProcessor {
throw new UnsupportedOperationException(
String.format(
"The output tablet does not support column type %s",
- valueColumnTypes[rowIndex]));
+ valueColumnTypes[columnIndex]));
}
} else {
bitMaps[columnIndex].mark(rowIndex);
@@ -742,7 +742,7 @@ public class AggregateProcessor implements PipeProcessor {
int filteredCount = 0;
for (int i = 0; i < columnNameStringList.length; ++i) {
if (!bitMaps[i].isAllMarked()) {
- originColumnIndex2FilteredColumnIndexMapperList[i] = ++filteredCount;
+ originColumnIndex2FilteredColumnIndexMapperList[i] = filteredCount++;
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeDynamicMemoryBlock.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeDynamicMemoryBlock.java
index 4e33b871828..3dbac912db3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeDynamicMemoryBlock.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeDynamicMemoryBlock.java
@@ -44,7 +44,7 @@ public class PipeDynamicMemoryBlock {
PipeDynamicMemoryBlock(
final @NotNull PipeModelFixedMemoryBlock fixedMemoryBlock, final long
memoryUsageInBytes) {
- this.memoryUsageInBytes = Math.min(memoryUsageInBytes, 0);
+ this.memoryUsageInBytes = Math.max(memoryUsageInBytes, 0);
this.fixedMemoryBlock = fixedMemoryBlock;
}
@@ -116,7 +116,7 @@ public class PipeDynamicMemoryBlock {
if (Double.isNaN(historyMemoryEfficiency)
|| Double.isInfinite(historyMemoryEfficiency)
|| historyMemoryEfficiency < 0.0) {
- currentMemoryEfficiency = 0.0;
+ historyMemoryEfficiency = 0.0;
}
this.historyMemoryEfficiency = Math.min(historyMemoryEfficiency, 1.0);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
index 97a4d2621b0..435f890f3a2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
@@ -465,7 +465,7 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
while (true) {
final TEndPoint targetNodeUrl = endPointList.get((int) (Math.random()
* clientSize));
- if (isUnhealthy(targetNodeUrl) && n <= clientSize) {
+ if (isUnhealthy(targetNodeUrl) && n < clientSize) {
n++;
continue;
}
@@ -486,7 +486,7 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
long n = 0;
while (true) {
for (final TEndPoint targetNodeUrl : endPointList) {
- if (isUnhealthy(targetNodeUrl) && n <= clientSize) {
+ if (isUnhealthy(targetNodeUrl) && n < clientSize) {
n++;
continue;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
index a5af1b68ad4..36ca10daa72 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
@@ -288,18 +288,19 @@ public class PipeTabletEventTsFileBatch extends
PipeTabletEventBatch {
e.getMessage(),
e);
+ final File file = fileWriter.getIOWriter().getFile();
try {
fileWriter.close();
} catch (final Exception closeException) {
LOGGER.warn(
"Batch id = {}: Failed to close the tsfile {} after failed to
write tablets into, because {}",
currentBatchId.get(),
- fileWriter.getIOWriter().getFile().getPath(),
+ file.getPath(),
closeException.getMessage(),
closeException);
} finally {
// Add current writing file to the list and delete the file
- sealedFiles.add(fileWriter.getIOWriter().getFile());
+ sealedFiles.add(file);
}
for (final File sealedFile : sealedFiles) {
@@ -309,7 +310,7 @@ public class PipeTabletEventTsFileBatch extends
PipeTabletEventBatch {
currentBatchId.get(),
deleteSuccess ? "Successfully" : "Failed to",
sealedFile.getPath(),
- fileWriter.getIOWriter().getFile().getPath(),
+ file.getPath(),
deleteSuccess ? "" : "Maybe the tsfile needs to be deleted
manually.");
}
sealedFiles.clear();
@@ -319,8 +320,8 @@ public class PipeTabletEventTsFileBatch extends
PipeTabletEventBatch {
throw e;
}
- fileWriter.close();
final File sealedFile = fileWriter.getIOWriter().getFile();
+ fileWriter.close();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"Batch id = {}: Seal tsfile {} successfully.",
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
index 918cf24fce5..6396c710abd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
@@ -168,7 +168,7 @@ public class IoTDBDataRegionAirGapSink extends
IoTDBDataNodeAirGapSink {
throw new PipeConnectionException(
String.format(
"Network error when transfer tsfile event %s, because %s.",
- ((PipeSchemaRegionWritePlanEvent) event).coreReportMessage(),
e.getMessage()),
+ ((EnrichedEvent) event).coreReportMessage(), e.getMessage()),
e);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java
index 940d6ff63db..6da64460495 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java
@@ -35,8 +35,10 @@ import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
+import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -90,14 +92,18 @@ public class WebSocketConnectorServer extends
WebSocketServer {
final PriorityBlockingQueue<EventWaitingForTransfer> eventTransferQueue =
eventsWaitingForTransfer.remove(pipeName);
while (!eventTransferQueue.isEmpty()) {
- eventTransferQueue.forEach(
+ final List<EventWaitingForTransfer> eventWrappers;
+ synchronized (eventTransferQueue) {
+ eventWrappers = new ArrayList<>(eventTransferQueue);
+ eventTransferQueue.clear();
+ }
+ eventWrappers.forEach(
(eventWrapper) -> {
if (eventWrapper.event instanceof EnrichedEvent) {
((EnrichedEvent) eventWrapper.event)
.decreaseReferenceCount(WebSocketConnectorServer.class.getName(), false);
}
});
- eventTransferQueue.clear();
synchronized (eventTransferQueue) {
eventTransferQueue.notifyAll();
}
@@ -270,8 +276,10 @@ public class WebSocketConnectorServer extends
WebSocketServer {
LOGGER.warn(
"The tablet of commitId: {} can't be parsed by client, it will be
retried later.", eventId);
- eventTransferQueue.put(
- new EventWaitingForTransfer(eventId, eventWrapper.connector,
eventWrapper.event));
+ synchronized (eventTransferQueue) {
+ eventTransferQueue.put(
+ new EventWaitingForTransfer(eventId, eventWrapper.connector,
eventWrapper.event));
+ }
}
@Override
@@ -321,7 +329,9 @@ public class WebSocketConnectorServer extends
WebSocketServer {
}
}
- queue.put(new EventWaitingForTransfer(eventIdGenerator.incrementAndGet(),
connector, event));
+ synchronized (queue) {
+ queue.put(new
EventWaitingForTransfer(eventIdGenerator.incrementAndGet(), connector, event));
+ }
}
private class TransferThread extends Thread {
@@ -347,7 +357,8 @@ public class WebSocketConnectorServer extends
WebSocketServer {
}
try {
- final EventWaitingForTransfer queueElement = queue.take();
+ EventWaitingForTransfer queueElement;
+ queueElement = queue.take();
synchronized (queue) {
queue.notifyAll();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
index 95e00eb9b15..faabf8b68f6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
@@ -58,7 +58,7 @@ public class PipeRealtimeDataRegionHybridSource extends
PipeRealtimeDataRegionSo
} else if (eventToExtract instanceof PipeHeartbeatEvent) {
extractHeartbeat(event);
} else if (eventToExtract instanceof PipeSchemaRegionWritePlanEvent) {
- extractDirectly(event);
+ pendingQueue.offer(event);
} else {
throw new UnsupportedOperationException(
String.format(
@@ -116,21 +116,7 @@ public class PipeRealtimeDataRegionHybridSource extends
PipeRealtimeDataRegionSo
if (state == TsFileEpoch.State.USING_BOTH) {
event.skipReportOnCommit();
}
- if (!pendingQueue.waitedOffer(event)) {
- // This would not happen, but just in case.
- // pendingQueue is unbounded, so it should never reach capacity.
- final String errorMessage =
- String.format(
- "extractTabletInsertion: pending queue of
PipeRealtimeDataRegionHybridExtractor %s "
- + "has reached capacity, discard tablet event %s,
current state %s",
- this, event, event.getTsFileEpoch().getState(this));
- LOGGER.error(errorMessage);
- PipeDataNodeAgent.runtime()
- .report(pipeTaskMeta, new
PipeRuntimeNonCriticalException(errorMessage));
-
- // Ignore the tablet event.
-
event.decreaseReferenceCount(PipeRealtimeDataRegionHybridSource.class.getName(),
false);
- }
+ pendingQueue.offer(event);
break;
default:
throw new UnsupportedOperationException(
@@ -176,21 +162,7 @@ public class PipeRealtimeDataRegionHybridSource extends
PipeRealtimeDataRegionSo
case EMPTY:
case USING_TSFILE:
case USING_BOTH:
- if (!pendingQueue.waitedOffer(event)) {
- // This would not happen, but just in case.
- // pendingQueue is unbounded, so it should never reach capacity.
- final String errorMessage =
- String.format(
- "extractTsFileInsertion: pending queue of
PipeRealtimeDataRegionHybridExtractor %s "
- + "has reached capacity, discard TsFile event %s,
current state %s",
- this, event, event.getTsFileEpoch().getState(this));
- LOGGER.error(errorMessage);
- PipeDataNodeAgent.runtime()
- .report(pipeTaskMeta, new
PipeRuntimeNonCriticalException(errorMessage));
-
- // Ignore the tsfile event.
-
event.decreaseReferenceCount(PipeRealtimeDataRegionHybridSource.class.getName(),
false);
- }
+ pendingQueue.offer(event);
break;
default:
throw new UnsupportedOperationException(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionLogSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionLogSource.java
index ac7883fd2b0..d3adc4dc3b3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionLogSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionLogSource.java
@@ -40,7 +40,7 @@ public class PipeRealtimeDataRegionLogSource extends
PipeRealtimeDataRegionSourc
LoggerFactory.getLogger(PipeRealtimeDataRegionLogSource.class);
@Override
- protected void doExtract(PipeRealtimeEvent event) {
+ protected void doExtract(final PipeRealtimeEvent event) {
final Event eventToExtract = event.getEvent();
if (eventToExtract instanceof TabletInsertionEvent) {
@@ -51,7 +51,7 @@ public class PipeRealtimeDataRegionLogSource extends
PipeRealtimeDataRegionSourc
} else if (eventToExtract instanceof PipeHeartbeatEvent) {
extractHeartbeat(event);
} else if (eventToExtract instanceof PipeSchemaRegionWritePlanEvent) {
- extractDirectly(event);
+ pendingQueue.offer(event);
} else {
throw new UnsupportedOperationException(
String.format(
@@ -60,27 +60,12 @@ public class PipeRealtimeDataRegionLogSource extends
PipeRealtimeDataRegionSourc
}
}
- private void extractTabletInsertion(PipeRealtimeEvent event) {
+ private void extractTabletInsertion(final PipeRealtimeEvent event) {
event.getTsFileEpoch().migrateState(this, state ->
TsFileEpoch.State.USING_TABLET);
-
- if (!pendingQueue.waitedOffer(event)) {
- // this would not happen, but just in case.
- // pendingQueue is unbounded, so it should never reach capacity.
- final String errorMessage =
- String.format(
- "extract: pending queue of PipeRealtimeDataRegionLogExtractor %s
"
- + "has reached capacity, discard tablet event %s, current
state %s",
- this, event, event.getTsFileEpoch().getState(this));
- LOGGER.error(errorMessage);
- PipeDataNodeAgent.runtime()
- .report(pipeTaskMeta, new
PipeRuntimeNonCriticalException(errorMessage));
-
- // ignore this event.
-
event.decreaseReferenceCount(PipeRealtimeDataRegionLogSource.class.getName(),
false);
- }
+ pendingQueue.offer(event);
}
- private void extractTsFileInsertion(PipeRealtimeEvent event) {
+ private void extractTsFileInsertion(final PipeRealtimeEvent event) {
final PipeTsFileInsertionEvent tsFileInsertionEvent =
(PipeTsFileInsertionEvent) event.getEvent();
if (!(tsFileInsertionEvent.isLoaded())) {
@@ -91,22 +76,7 @@ public class PipeRealtimeDataRegionLogSource extends
PipeRealtimeDataRegionSourc
}
event.getTsFileEpoch().migrateState(this, state ->
TsFileEpoch.State.USING_TSFILE);
-
- if (!pendingQueue.waitedOffer(event)) {
- // this would not happen, but just in case.
- // pendingQueue is unbounded, so it should never reach capacity.
- final String errorMessage =
- String.format(
- "extract: pending queue of PipeRealtimeDataRegionLogExtractor %s
"
- + "has reached capacity, discard loaded tsFile event %s,
current state %s",
- this, event, event.getTsFileEpoch().getState(this));
- LOGGER.error(errorMessage);
- PipeDataNodeAgent.runtime()
- .report(pipeTaskMeta, new
PipeRuntimeNonCriticalException(errorMessage));
-
- // ignore this event.
-
event.decreaseReferenceCount(PipeRealtimeDataRegionLogSource.class.getName(),
false);
- }
+ pendingQueue.offer(event);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
index b9bc57b977a..b6dcf3f615b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
@@ -367,21 +367,7 @@ public abstract class PipeRealtimeDataRegionSource
implements PipeExtractor {
return;
}
- if (!pendingQueue.waitedOffer(event)) {
- // This would not happen, but just in case.
- // pendingQueue is unbounded, so it should never reach capacity.
- LOGGER.error(
- "extract: pending queue of PipeRealtimeDataRegionHybridExtractor {} "
- + "has reached capacity, discard heartbeat event {}",
- this,
- event);
-
- // Do not report exception since the PipeHeartbeatEvent doesn't affect
- // the correction of pipe progress.
-
- // Ignore this event.
-
event.decreaseReferenceCount(PipeRealtimeDataRegionSource.class.getName(),
false);
- }
+ pendingQueue.offer(event);
}
protected void extractProgressReportEvent(final PipeRealtimeEvent event) {
@@ -393,24 +379,7 @@ public abstract class PipeRealtimeDataRegionSource
implements PipeExtractor {
.updateToMinimumEqualOrIsAfterProgressIndex(event.getProgressIndex()));
return;
}
- extractDirectly(event);
- }
-
- protected void extractDirectly(final PipeRealtimeEvent event) {
- if (!pendingQueue.waitedOffer(event)) {
- // This would not happen, but just in case.
- // Pending is unbounded, so it should never reach capacity.
- final String errorMessage =
- String.format(
- "extract: pending queue of %s %s " + "has reached capacity,
discard event %s",
- this.getClass().getSimpleName(), this, event);
- LOGGER.error(errorMessage);
- PipeDataNodeAgent.runtime()
- .report(pipeTaskMeta, new
PipeRuntimeNonCriticalException(errorMessage));
-
- // Ignore the event.
-
event.decreaseReferenceCount(PipeRealtimeDataRegionSource.class.getName(),
false);
- }
+ pendingQueue.offer(event);
}
protected void maySkipIndex4Event(final PipeRealtimeEvent event) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
index 94e7b549c3f..dd9f8cbbe20 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
@@ -46,7 +46,7 @@ public class PipeRealtimeDataRegionTsFileSource extends
PipeRealtimeDataRegionSo
}
if (event.getEvent() instanceof PipeSchemaRegionWritePlanEvent) {
- extractDirectly(event);
+ pendingQueue.offer(event);
return;
}
@@ -59,21 +59,7 @@ public class PipeRealtimeDataRegionTsFileSource extends
PipeRealtimeDataRegionSo
return;
}
- if (!pendingQueue.waitedOffer(event)) {
- // This would not happen, but just in case.
- // Pending is unbounded, so it should never reach capacity.
- final String errorMessage =
- String.format(
- "extract: pending queue of PipeRealtimeDataRegionTsFileExtractor
%s "
- + "has reached capacity, discard TsFile event %s, current
state %s",
- this, event, event.getTsFileEpoch().getState(this));
- LOGGER.error(errorMessage);
- PipeDataNodeAgent.runtime()
- .report(pipeTaskMeta, new
PipeRuntimeNonCriticalException(errorMessage));
-
- // Ignore the event.
-
event.decreaseReferenceCount(PipeRealtimeDataRegionTsFileSource.class.getName(),
false);
- }
+ pendingQueue.offer(event);
event.getTsFileEpoch().clearState(this);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/TsFileDeduplicationBlockingPendingQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/TsFileDeduplicationBlockingPendingQueue.java
index e4a3a545a58..b1677c9ffe6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/TsFileDeduplicationBlockingPendingQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/TsFileDeduplicationBlockingPendingQueue.java
@@ -65,7 +65,7 @@ public class TsFileDeduplicationBlockingPendingQueue extends
SubscriptionBlockin
@Override
public void directOffer(final Event event) {
- inputPendingQueue.directOffer(event);
+ inputPendingQueue.offer(event);
}
private synchronized Event filter(final Event event) { // make it
synchronized
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java
index aa93b093254..8773b03f9f3 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java
@@ -50,26 +50,7 @@ public abstract class BlockingPendingQueue<E extends Event> {
this.eventCounter = eventCounter;
}
- public boolean waitedOffer(final E event) {
- checkBeforeOffer(event);
- try {
- final boolean offered =
- pendingQueue.offer(
- event,
-
PIPE_CONFIG.getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs(),
- TimeUnit.MILLISECONDS);
- if (offered) {
- eventCounter.increaseEventCount(event);
- }
- return offered;
- } catch (final InterruptedException e) {
- LOGGER.info("pending queue offer is interrupted.", e);
- Thread.currentThread().interrupt();
- return false;
- }
- }
-
- public boolean directOffer(final E event) {
+ public boolean offer(final E event) {
checkBeforeOffer(event);
final boolean offered = pendingQueue.offer(event);
if (offered) {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PipePattern.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PipePattern.java
index 8c2b4ca0409..c0118397b3c 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PipePattern.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PipePattern.java
@@ -328,8 +328,17 @@ public abstract class PipePattern {
final List<PipePattern> sortedPatterns = new ArrayList<>(patterns);
sortedPatterns.sort(
(o1, o2) -> {
- final PartialPath p1 = o1.getBaseInclusionPaths().get(0);
- final PartialPath p2 = o2.getBaseInclusionPaths().get(0);
+ final List<PartialPath> p1List = o1.getBaseInclusionPaths();
+ final List<PartialPath> p2List = o2.getBaseInclusionPaths();
+
+ if (p1List.isEmpty()) {
+ return p2List.isEmpty() ? 1 : -1;
+ }
+
+ // We can only approximate comparison here since TreePattern
represents multiple paths.
+ // We use the first inclusion path as a representative.
+ final PartialPath p1 = p1List.get(0);
+ final PartialPath p2 = p2List.get(0);
final int lenCompare = Integer.compare(p1.getNodeLength(),
p2.getNodeLength());
if (lenCompare != 0) {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java
index 969f42bfcec..eb25c7553db 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java
@@ -361,7 +361,7 @@ public abstract class IoTDBSyncClientManager extends
IoTDBClientManager implemen
final Pair<IoTDBSyncClient, Boolean> nextClientAndStatus =
endPoint2ClientAndStatus.get(endPointList.get(nextClientIndex));
if (Boolean.TRUE.equals(nextClientAndStatus.getRight())
- && clientAndStatus.getLeft() != null) {
+ && nextClientAndStatus.getLeft() != null) {
return nextClientAndStatus;
}
}