This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 9579c9a1da9 [IOTDB-5942] Pipe: Fix bugs in PipeWALResourceManager,
EnrichedEvent, IoTDBThriftReceiverV1 (#9993)
9579c9a1da9 is described below
commit 9579c9a1da9ff48163654e1519117dffdcee5ba7
Author: yschengzi <[email protected]>
AuthorDate: Wed May 31 10:00:30 2023 +0800
[IOTDB-5942] Pipe: Fix bugs in PipeWALResourceManager, EnrichedEvent,
IoTDBThriftReceiverV1 (#9993)
* fix bugs in PipeWALResourceManager: Concurrent modification exception
when checking WAL TTL
* fix bugs in EnrichedEvent: PipeRealtimeCollectEvent reference count error
* fix bugs in IoTDBThriftReceiverV1: Writer stream closed because of tsfile
with the same name
* add log when reporting PipeException
---
.../iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java | 5 +++++
.../PipeRealtimeDataRegionHybridCollector.java | 2 ++
.../realtime/PipeRealtimeDataRegionLogCollector.java | 1 +
.../PipeRealtimeDataRegionTsFileCollector.java | 1 +
.../connector/impl/iotdb/v1/IoTDBThriftReceiverV1.java | 12 +++++++++++-
.../apache/iotdb/db/pipe/core/event/EnrichedEvent.java | 5 ++---
.../core/event/realtime/PipeRealtimeCollectEvent.java | 18 ++++++++++++++++++
.../db/pipe/resource/wal/PipeWALResourceManager.java | 12 ++++++++----
.../db/pipe/task/subtask/PipeConnectorSubtask.java | 2 +-
.../apache/iotdb/db/pipe/task/subtask/PipeSubtask.java | 2 +-
10 files changed, 50 insertions(+), 10 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
index 155653dc877..884cecf4a74 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
@@ -93,6 +93,11 @@ public class PipeRuntimeAgent implements IService {
//////////////////////////// Runtime Exception Handlers
////////////////////////////
public void report(PipeTaskMeta pipeTaskMeta, PipeRuntimeException
pipeRuntimeException) {
+ LOGGER.warn(
+ String.format(
+ "PipeRuntimeException: pipe task meta %s, exception %s",
+ pipeTaskMeta, pipeRuntimeException),
+ pipeRuntimeException);
pipeTaskMeta.trackExceptionMessage(pipeRuntimeException);
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
index 7ca784dd3ce..872b9c56f07 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
@@ -162,6 +162,7 @@ public class PipeRealtimeDataRegionHybridCollector extends
PipeRealtimeDataRegio
// this event is not reliable anymore. but the data represented by
this event
// has been carried by the following tsfile event, so we can just
discard this event.
event.getTsFileEpoch().migrateState(this, state ->
TsFileEpoch.State.USING_TSFILE);
+ LOGGER.warn(String.format("Increase reference count for event %s
error.", event));
return null;
}
}
@@ -196,6 +197,7 @@ public class PipeRealtimeDataRegionHybridCollector extends
PipeRealtimeDataRegio
"TsFile Event %s can not be supplied because the reference
count can not be increased, "
+ "the data represented by this event is lost",
event.getEvent());
+ LOGGER.warn(errorMessage);
PipeAgent.runtime().report(pipeTaskMeta, new
PipeRuntimeNonCriticalException(errorMessage));
return null;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionLogCollector.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionLogCollector.java
index 2d5150bcd80..c7c96b650e4 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionLogCollector.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionLogCollector.java
@@ -94,6 +94,7 @@ public class PipeRealtimeDataRegionLogCollector extends
PipeRealtimeDataRegionCo
"Tablet Event %s can not be supplied because the reference
count can not be increased, "
+ "the data represented by this event is lost",
collectEvent.getEvent());
+ LOGGER.warn(errorMessage);
PipeAgent.runtime().report(pipeTaskMeta, new
PipeRuntimeNonCriticalException(errorMessage));
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java
index 4b2849f0243..42bec421eed 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java
@@ -95,6 +95,7 @@ public class PipeRealtimeDataRegionTsFileCollector extends
PipeRealtimeDataRegio
"TsFile Event %s can not be supplied because the reference
count can not be increased, "
+ "the data represented by this event is lost",
collectEvent.getEvent());
+ LOGGER.warn(errorMessage);
PipeAgent.runtime().report(pipeTaskMeta, new
PipeRuntimeNonCriticalException(errorMessage));
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftReceiverV1.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftReceiverV1.java
index 025025e1c03..ea1f221d035 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftReceiverV1.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftReceiverV1.java
@@ -228,9 +228,19 @@ public class IoTDBThriftReceiverV1 implements
IoTDBThriftReceiver {
req.getFileLength(), writingFileWriter.length())));
}
+ final LoadTsFileStatement statement = new
LoadTsFileStatement(writingFile.getAbsolutePath());
+
+ // 1. The writing file writer must be closed, otherwise it may cause
concurrent errors during
+ // the process of loading tsfile when parsing tsfile.
+ //
+ // 2. The writing file must be set to null, otherwise if the next passed
tsfile has the same
+ // name as the current tsfile, it will bypass the judgment logic of
+ // updateWritingFileIfNeeded#isFileExistedAndNameCorrect, and continue
to write to the already
+ // loaded file. Since the writing file writer has already been closed,
it will throw a stream
+ // closed exception.
writingFileWriter.close();
+ writingFile = null;
- final LoadTsFileStatement statement = new
LoadTsFileStatement(writingFile.getAbsolutePath());
statement.setDeleteAfterLoad(true);
statement.setVerifySchema(true);
statement.setAutoCreateDatabase(false);
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/EnrichedEvent.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/EnrichedEvent.java
index fc15200b603..28e611bd301 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/EnrichedEvent.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/EnrichedEvent.java
@@ -30,7 +30,6 @@ import java.util.concurrent.atomic.AtomicInteger;
* additional information mainly includes the reference count of the event.
*/
public abstract class EnrichedEvent implements Event {
-
private final AtomicInteger referenceCount;
private final PipeTaskMeta pipeTaskMeta;
@@ -47,7 +46,7 @@ public abstract class EnrichedEvent implements Event {
* @param holderMessage the message of the invoker
* @return true if the reference count is increased successfully, false if
the event is not
*/
- public final boolean increaseReferenceCount(String holderMessage) {
+ public boolean increaseReferenceCount(String holderMessage) {
boolean isSuccessful = true;
synchronized (this) {
if (referenceCount.get() == 0) {
@@ -75,7 +74,7 @@ public abstract class EnrichedEvent implements Event {
* @param holderMessage the message of the invoker
* @return true if the reference count is decreased successfully, false
otherwise
*/
- public final boolean decreaseReferenceCount(String holderMessage) {
+ public boolean decreaseReferenceCount(String holderMessage) {
boolean isSuccessful = true;
synchronized (this) {
if (referenceCount.get() == 1) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java
index e9887cc88bf..15479bc3788 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java
@@ -65,11 +65,29 @@ public class PipeRealtimeCollectEvent extends EnrichedEvent
{
device2Measurements = null;
}
+ @Override
+ public boolean increaseReferenceCount(String holderMessage) {
+ // This method must be overridden, otherwise during the real-time data
collection stage, the
+ // current PipeRealtimeCollectEvent rather than the member variable
EnrichedEvent will increase
+ // the reference count, resulting in errors in the reference count of the
EnrichedEvent
+ // contained in this PipeRealtimeCollectEvent during the processor and
connector stages.
+ return event.increaseReferenceCount(holderMessage);
+ }
+
@Override
public boolean increaseResourceReferenceCount(String holderMessage) {
return event.increaseResourceReferenceCount(holderMessage);
}
+ @Override
+ public boolean decreaseReferenceCount(String holderMessage) {
+ // This method must be overridden, otherwise during the real-time data
collection stage, the
+ // current PipeRealtimeCollectEvent rather than the member variable
EnrichedEvent will decrease
+ // the reference count, resulting in errors in the reference count of the
EnrichedEvent
+ // contained in this PipeRealtimeCollectEvent during the processor and
connector stages.
+ return event.decreaseReferenceCount(holderMessage);
+ }
+
@Override
public boolean decreaseResourceReferenceCount(String holderMessage) {
return event.decreaseResourceReferenceCount(holderMessage);
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
index 18284de32f4..80eb6c37ffa 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
@@ -6,6 +6,7 @@ import
org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.db.wal.utils.WALEntryHandler;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
@@ -36,14 +37,17 @@ public class PipeWALResourceManager implements
AutoCloseable {
ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
PIPE_WAL_RESOURCE_TTL_CHECKER,
() -> {
- for (final long memtableId :
memtableIdToPipeWALResourceMap.keySet()) {
+ Iterator<Map.Entry<Long, PipeWALResource>> iterator =
+ memtableIdToPipeWALResourceMap.entrySet().iterator();
+ while (iterator.hasNext()) {
+ final Map.Entry<Long, PipeWALResource> entry = iterator.next();
final ReentrantLock lock =
- memtableIdSegmentLocks[(int) (memtableId %
SEGMENT_LOCK_COUNT)];
+ memtableIdSegmentLocks[(int) (entry.getKey() %
SEGMENT_LOCK_COUNT)];
lock.lock();
try {
- if
(memtableIdToPipeWALResourceMap.get(memtableId).invalidateIfPossible()) {
- memtableIdToPipeWALResourceMap.remove(memtableId);
+ if (entry.getValue().invalidateIfPossible()) {
+ iterator.remove();
}
} finally {
lock.unlock();
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
index 4d7ef849fd6..84a4817d733 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
@@ -126,7 +126,7 @@ public class PipeConnectorSubtask extends PipeSubtask {
String.format(
"Failed to reconnect to the target system after %d times,
stopping current pipe task %s...",
MAX_RETRY_TIMES, taskID);
- LOGGER.error(errorMessage);
+ LOGGER.warn(errorMessage);
lastFailedCause = throwable;
PipeAgent.runtime().report(taskMeta, new
PipeRuntimeCriticalException(errorMessage));
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
index 85fb197bbef..b1331f4df48 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
@@ -169,7 +169,7 @@ public abstract class PipeSubtask implements
FutureCallback<Void>, Callable<Void
protected void releaseLastEvent() {
if (lastEvent != null) {
if (lastEvent instanceof EnrichedEvent) {
- ((EnrichedEvent)
lastEvent).decreaseReferenceCount(PipeSubtask.class.getName());
+ ((EnrichedEvent)
lastEvent).decreaseReferenceCount(this.getClass().getName());
}
lastEvent = null;
}