This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
new bba1af2efb2 Pipe: Fix startup failure of
PipeHistoricalDataRegionTsFileExtractor due to unprepared StorageEngine
(#13526) (#13540)
bba1af2efb2 is described below
commit bba1af2efb270750c2f8f8bdec4613abe0cfe666
Author: Steve Yurong Su <[email protected]>
AuthorDate: Wed Sep 18 20:06:52 2024 +0800
Pipe: Fix startup failure of PipeHistoricalDataRegionTsFileExtractor due to
unprepared StorageEngine (#13526) (#13540)
Co-authored-by: Steve Yurong Su <[email protected]>
(cherry picked from commit 07f14753418def64d38228f0a16499a7d7408c3b)
Co-authored-by: Zhenyu Luo <[email protected]>
---
.../PipeHistoricalDataRegionTsFileExtractor.java | 23 +++++++++++++++++-----
1 file changed, 18 insertions(+), 5 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
index 310ed2640d3..98280f8b9ec 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -120,6 +120,8 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
private boolean shouldTerminatePipeOnAllHistoricalEventsConsumed;
private boolean isTerminateSignalSent = false;
+ private volatile boolean hasBeenStarted = false;
+
private Queue<TsFileResource> pendingQueue;
@Override
@@ -369,12 +371,18 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
@Override
public synchronized void start() {
- if (!StorageEngine.getInstance().isReadyForNonReadWriteFunctions()) {
+ if (!shouldExtractInsertion) {
+ hasBeenStarted = true;
return;
}
- if (!shouldExtractInsertion) {
+ if (!StorageEngine.getInstance().isReadyForNonReadWriteFunctions()) {
+ LOGGER.info(
+ "Pipe {}@{}: failed to start to extract historical TsFile, storage
engine is not ready. Will retry later.",
+ pipeName,
+ dataRegionId);
return;
}
+ hasBeenStarted = true;
final DataRegion dataRegion =
StorageEngine.getInstance().getDataRegion(new
DataRegionId(dataRegionId));
@@ -570,6 +578,10 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
@Override
public synchronized Event supply() {
+ if (!hasBeenStarted &&
StorageEngine.getInstance().isReadyForNonReadWriteFunctions()) {
+ start();
+ }
+
if (Objects.isNull(pendingQueue)) {
return null;
}
@@ -639,9 +651,10 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
public synchronized boolean hasConsumedAll() {
// If the pendingQueue is null when the function is called, it implies
that the extractor only
// extracts deletion thus the historical event has nothing to consume.
- return Objects.isNull(pendingQueue)
- || pendingQueue.isEmpty()
- && (!shouldTerminatePipeOnAllHistoricalEventsConsumed ||
isTerminateSignalSent);
+ return hasBeenStarted
+ && (Objects.isNull(pendingQueue)
+ || pendingQueue.isEmpty()
+ && (!shouldTerminatePipeOnAllHistoricalEventsConsumed ||
isTerminateSignalSent));
}
@Override