This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new a8940b691ab Pipe: Fix infinite loop when thread is interrupted in
invoking PipeMemoryBlock#close & Avoid throwing new InterruptedException in
conditions that can be self-restoring (#14471) (#14486)
a8940b691ab is described below
commit a8940b691ab4ed4b8f3a2e19898caf1a72ac803d
Author: Zhenyu Luo <[email protected]>
AuthorDate: Wed Dec 18 18:57:55 2024 +0800
Pipe: Fix infinite loop when thread is interrupted in invoking
PipeMemoryBlock#close & Avoid throwing new InterruptedException in conditions
that can be self-restoring (#14471) (#14486)
---
.../event/common/tsfile/PipeTsFileInsertionEvent.java | 17 ++++++++++++-----
.../iotdb/db/pipe/resource/memory/PipeMemoryBlock.java | 14 +++++++++++++-
.../event/response/SubscriptionEventTsFileResponse.java | 2 +-
3 files changed, 26 insertions(+), 7 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index d29afdbee83..71a21eb5f05 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -414,13 +414,20 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
}
waitForResourceEnough4Parsing(timeoutMs);
return initDataContainer().toTabletInsertionEvents();
- } catch (final InterruptedException e) {
- Thread.currentThread().interrupt();
+ } catch (final Exception e) {
close();
+ // close() should be called before re-interrupting the thread
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
+
final String errorMsg =
- String.format(
- "Interrupted when waiting for closing TsFile %s.", resource.getTsFilePath());
+ e instanceof InterruptedException
+ ? String.format(
+ "Interrupted when waiting for closing TsFile %s.", resource.getTsFilePath())
+ : String.format(
+ "Parse TsFile %s error. Because: %s", resource.getTsFilePath(), e.getMessage());
LOGGER.warn(errorMsg, e);
throw new PipeException(errorMsg);
}
@@ -458,7 +465,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
if (waitTimeSeconds * 1000 > timeoutMs) {
// should contain 'TimeoutException' in exception message
- throw new InterruptedException(
+ throw new PipeException(
String.format("TimeoutException: Waited %s seconds", waitTimeSeconds));
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java
index 8ebe90b388c..07f5b904523 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java
@@ -170,20 +170,32 @@ public class PipeMemoryBlock implements AutoCloseable {
@Override
public void close() {
+ boolean isInterrupted = false;
+
while (true) {
try {
if (lock.tryLock(50, TimeUnit.MICROSECONDS)) {
try {
pipeMemoryManager.release(this);
+ if (isInterrupted) {
+ LOGGER.warn("{} is released after thread interruption.", this);
+ }
break;
} finally {
lock.unlock();
}
}
} catch (final InterruptedException e) {
- Thread.currentThread().interrupt();
+ // Each time the close task is run, it means that the interrupt status left by the previous
+ // tryLock does not need to be retained. Otherwise, it will lead to an infinite loop.
+ isInterrupted = true;
LOGGER.warn("Interrupted while waiting for the lock.", e);
}
}
+
+ // Restore the interrupt status of the current thread
+ if (isInterrupted) {
+ Thread.currentThread().interrupt();
+ }
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTsFileResponse.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTsFileResponse.java
index 6c102c21912..2397ec51c60 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTsFileResponse.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTsFileResponse.java
@@ -234,7 +234,7 @@ public class SubscriptionEventTsFileResponse extends SubscriptionEventExtendable
if (waitTimeSeconds * 1000 > timeoutMs) {
// should contain 'TimeoutException' in exception message
// see org.apache.iotdb.rpc.subscription.exception.SubscriptionTimeoutException.KEYWORD
- throw new InterruptedException(
+ throw new SubscriptionException(
String.format("TimeoutException: Waited %s seconds", waitTimeSeconds));
}
}