This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 8b5626c8922 Pipe: filtered empty tsFiles which should not be parsed or
reported (#12216)
8b5626c8922 is described below
commit 8b5626c8922f536dc9724293628a33ce1d3144fa
Author: Caideyipi <[email protected]>
AuthorDate: Fri Mar 22 20:59:39 2024 +0800
Pipe: filtered empty tsFiles which should not be parsed or reported (#12216)
---
.../pipe/it/autocreate/IoTDBPipeProcessorIT.java | 9 ++++--
.../common/tsfile/PipeTsFileInsertionEvent.java | 34 +++++++++++++++-------
2 files changed, 29 insertions(+), 14 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeProcessorIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeProcessorIT.java
index fe12fab2bf1..693af799453 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeProcessorIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeProcessorIT.java
@@ -70,14 +70,15 @@ public class IoTDBPipeProcessorIT extends
AbstractPipeDualAutoIT {
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
- // Test the mixture of historical and realtime data
+ // Test empty tsFile parsing
+ // Assert that an empty tsFile will not be parsed by the processor then block
+ // the subsequent data processing
// Do not fail if the failure has nothing to do with pipe
// Because the failures will randomly generate due to resource limitation
if (!TestUtils.tryExecuteNonQueriesWithRetry(
senderEnv,
Arrays.asList(
- "insert into root.vehicle.d0(time, s1) values (0, 1)",
- "insert into root.vehicle.d0(time, s1) values (10000, 2)"))) {
+ "insert into root.vehicle.d0(time, s1) values (0, 1)", "delete from root.**"))) {
return;
}
@@ -110,6 +111,8 @@ public class IoTDBPipeProcessorIT extends
AbstractPipeDualAutoIT {
if (!TestUtils.tryExecuteNonQueriesWithRetry(
senderEnv,
Arrays.asList(
+ "insert into root.vehicle.d0(time, s1) values (0, 1)",
+ "insert into root.vehicle.d0(time, s1) values (10000, 2)",
"insert into root.vehicle.d0(time, s1) values (19999, 3)",
"insert into root.vehicle.d0(time, s1) values (20000, 4)",
"insert into root.vehicle.d0(time, s1) values (20001, 5)",
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 110414a64ba..5e42d7960b1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;
public class PipeTsFileInsertionEvent extends EnrichedEvent implements
TsFileInsertionEvent {
@@ -156,7 +157,12 @@ public class PipeTsFileInsertionEvent extends
EnrichedEvent implements TsFileIns
@Override
public ProgressIndex getProgressIndex() {
try {
- waitForTsFileClose();
+ if (!waitForTsFileClose()) {
+ LOGGER.warn(
+ "Skipping temporary TsFile {}'s progressIndex, will report MinimumProgressIndex",
+ tsFile);
+ return MinimumProgressIndex.INSTANCE;
+ }
return resource.getMaxProgressIndexAfterClose();
} catch (InterruptedException e) {
LOGGER.warn(
@@ -217,18 +223,13 @@ public class PipeTsFileInsertionEvent extends
EnrichedEvent implements TsFileIns
@Override
public Iterable<TabletInsertionEvent> toTabletInsertionEvents() {
- return initDataContainer().toTabletInsertionEvents();
- }
-
- private TsFileInsertionDataContainer initDataContainer() {
try {
- if (dataContainer == null) {
- waitForTsFileClose();
- dataContainer =
- new TsFileInsertionDataContainer(
- tsFile, pipePattern, startTime, endTime, pipeTaskMeta, this);
+ if (!waitForTsFileClose()) {
+ LOGGER.warn(
+ "Pipe skipping temporary TsFile's parsing which shouldn't be transferred: {}", tsFile);
+ return Collections.emptyList();
}
- return dataContainer;
+ return initDataContainer().toTabletInsertionEvents();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
close();
@@ -238,6 +239,17 @@ public class PipeTsFileInsertionEvent extends
EnrichedEvent implements TsFileIns
"Interrupted when waiting for closing TsFile %s.",
resource.getTsFilePath());
LOGGER.warn(errorMsg, e);
throw new PipeException(errorMsg);
+ }
+ }
+
+ private TsFileInsertionDataContainer initDataContainer() {
+ try {
+ if (dataContainer == null) {
+ dataContainer =
+ new TsFileInsertionDataContainer(
+ tsFile, pipePattern, startTime, endTime, pipeTaskMeta, this);
+ }
+ return dataContainer;
} catch (IOException e) {
close();