This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new f68c49e349b Pipe: Fixed the bug that lower version tablet may cause
NPE when sent to 2.x version & The temporary exception may be wrongly reported
(#16843)
f68c49e349b is described below
commit f68c49e349b2974d812e56bf5fbc22dd646e7eb6
Author: Caideyipi <[email protected]>
AuthorDate: Tue Dec 2 14:14:30 2025 +0800
Pipe: Fixed the bug that lower version tablet may cause NPE when sent to
2.x version & The temporary exception may be wrongly reported (#16843)
* older_version_compatibility
* protect
* dependency
---
.../db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java | 7 +++++++
.../db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java | 2 +-
.../iotdb/db/pipe/sink/util/sorter/PipeTabletEventSorter.java | 5 +++++
3 files changed, 13 insertions(+), 1 deletion(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
index a574712cbbd..31ffcef042f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
@@ -45,6 +45,7 @@ import
org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeException;
import com.google.common.util.concurrent.ListeningExecutorService;
+import org.apache.tsfile.external.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -219,6 +220,12 @@ public class PipeProcessorSubtask extends
PipeReportableSubtask {
e);
return false;
} catch (final Exception e) {
+ if (ExceptionUtils.getRootCause(e) instanceof
PipeRuntimeOutOfMemoryCriticalException) {
+ LOGGER.info(
+ "Temporarily out of memory in pipe event processing, will wait for
the memory to release.",
+ e);
+ return false;
+ }
if (!isClosed.get()) {
throw new PipeException(
String.format(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 2a1ab3f5a35..9db3cb57e6a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -468,7 +468,7 @@ public class PipeTsFileInsertionEvent extends
PipeInsertionEvent
}
}
}
- } catch (final AccessDeniedException e) {
+ } catch (final AccessDeniedException |
PipeRuntimeOutOfMemoryCriticalException e) {
throw e;
} catch (final Exception e) {
if (e instanceof InterruptedException) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeTabletEventSorter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeTabletEventSorter.java
index c9857c9eaba..4ad64ae278b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeTabletEventSorter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeTabletEventSorter.java
@@ -27,6 +27,7 @@ import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.schema.IMeasurementSchema;
import java.time.LocalDate;
+import java.util.Objects;
public class PipeTabletEventSorter {
@@ -84,6 +85,10 @@ public class PipeTabletEventSorter {
final TSDataType dataType,
final BitMap originalBitMap,
final BitMap deDuplicatedBitMap) {
+ // Older version's sender may contain null values, we need to cover this
case
+ if (Objects.isNull(valueList)) {
+ return null;
+ }
switch (dataType) {
case BOOLEAN:
final boolean[] boolValues = (boolean[]) valueList;