This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 9579c9a1da9 [IOTDB-5942] Pipe: Fix bugs in PipeWALResourceManager, 
EnrichedEvent, IoTDBThriftReceiverV1 (#9993)
9579c9a1da9 is described below

commit 9579c9a1da9ff48163654e1519117dffdcee5ba7
Author: yschengzi <[email protected]>
AuthorDate: Wed May 31 10:00:30 2023 +0800

    [IOTDB-5942] Pipe: Fix bugs in PipeWALResourceManager, EnrichedEvent, 
IoTDBThriftReceiverV1 (#9993)
    
    * fix bugs in PipeWALResourceManager: Concurrent modification exception 
when check WAL TTL
    
    * fix bugs in EnrichedEvent: PipeRealtimeCollectEvent reference count error
    
    * fix bugs in IoTDBThriftReceiverV1: Writer stream closed because of tsfile 
with the same name
    
    * add log when reporting PipeException
---
 .../iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java  |  5 +++++
 .../PipeRealtimeDataRegionHybridCollector.java         |  2 ++
 .../realtime/PipeRealtimeDataRegionLogCollector.java   |  1 +
 .../PipeRealtimeDataRegionTsFileCollector.java         |  1 +
 .../connector/impl/iotdb/v1/IoTDBThriftReceiverV1.java | 12 +++++++++++-
 .../apache/iotdb/db/pipe/core/event/EnrichedEvent.java |  5 ++---
 .../core/event/realtime/PipeRealtimeCollectEvent.java  | 18 ++++++++++++++++++
 .../db/pipe/resource/wal/PipeWALResourceManager.java   | 12 ++++++++----
 .../db/pipe/task/subtask/PipeConnectorSubtask.java     |  2 +-
 .../apache/iotdb/db/pipe/task/subtask/PipeSubtask.java |  2 +-
 10 files changed, 50 insertions(+), 10 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
index 155653dc877..884cecf4a74 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
@@ -93,6 +93,11 @@ public class PipeRuntimeAgent implements IService {
   //////////////////////////// Runtime Exception Handlers 
////////////////////////////
 
   public void report(PipeTaskMeta pipeTaskMeta, PipeRuntimeException 
pipeRuntimeException) {
+    LOGGER.warn(
+        String.format(
+            "PipeRuntimeException: pipe task meta %s, exception %s",
+            pipeTaskMeta, pipeRuntimeException),
+        pipeRuntimeException);
     pipeTaskMeta.trackExceptionMessage(pipeRuntimeException);
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
index 7ca784dd3ce..872b9c56f07 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
@@ -162,6 +162,7 @@ public class PipeRealtimeDataRegionHybridCollector extends 
PipeRealtimeDataRegio
         // this event is not reliable anymore. but the data represented by 
this event
         // has been carried by the following tsfile event, so we can just 
discard this event.
         event.getTsFileEpoch().migrateState(this, state -> 
TsFileEpoch.State.USING_TSFILE);
+        LOGGER.warn(String.format("Increase reference count for event %s 
error.", event));
         return null;
       }
     }
@@ -196,6 +197,7 @@ public class PipeRealtimeDataRegionHybridCollector extends 
PipeRealtimeDataRegio
                 "TsFile Event %s can not be supplied because the reference 
count can not be increased, "
                     + "the data represented by this event is lost",
                 event.getEvent());
+        LOGGER.warn(errorMessage);
         PipeAgent.runtime().report(pipeTaskMeta, new 
PipeRuntimeNonCriticalException(errorMessage));
         return null;
       }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionLogCollector.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionLogCollector.java
index 2d5150bcd80..c7c96b650e4 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionLogCollector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionLogCollector.java
@@ -94,6 +94,7 @@ public class PipeRealtimeDataRegionLogCollector extends 
PipeRealtimeDataRegionCo
                 "Tablet Event %s can not be supplied because the reference 
count can not be increased, "
                     + "the data represented by this event is lost",
                 collectEvent.getEvent());
+        LOGGER.warn(errorMessage);
         PipeAgent.runtime().report(pipeTaskMeta, new 
PipeRuntimeNonCriticalException(errorMessage));
       }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java
index 4b2849f0243..42bec421eed 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java
@@ -95,6 +95,7 @@ public class PipeRealtimeDataRegionTsFileCollector extends 
PipeRealtimeDataRegio
                 "TsFile Event %s can not be supplied because the reference 
count can not be increased, "
                     + "the data represented by this event is lost",
                 collectEvent.getEvent());
+        LOGGER.warn(errorMessage);
         PipeAgent.runtime().report(pipeTaskMeta, new 
PipeRuntimeNonCriticalException(errorMessage));
       }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftReceiverV1.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftReceiverV1.java
index 025025e1c03..ea1f221d035 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftReceiverV1.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftReceiverV1.java
@@ -228,9 +228,19 @@ public class IoTDBThriftReceiverV1 implements 
IoTDBThriftReceiver {
                     req.getFileLength(), writingFileWriter.length())));
       }
 
+      final LoadTsFileStatement statement = new 
LoadTsFileStatement(writingFile.getAbsolutePath());
+
+      // 1.The writing file writer must be closed, otherwise it may cause 
concurrent errors during
+      // the process of loading tsfile when parsing tsfile.
+      //
+      // 2.The writing file must be set to null, otherwise if the next passed 
tsfile has the same
+      // name as the current tsfile, it will bypass the judgment logic of
+      // updateWritingFileIfNeeded#isFileExistedAndNameCorrect, and continue 
to write to the already
+      // loaded file. Since the writing file writer has already been closed, 
it will throw a Stream
+      // Close exception.
       writingFileWriter.close();
+      writingFile = null;
 
-      final LoadTsFileStatement statement = new 
LoadTsFileStatement(writingFile.getAbsolutePath());
       statement.setDeleteAfterLoad(true);
       statement.setVerifySchema(true);
       statement.setAutoCreateDatabase(false);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/EnrichedEvent.java 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/EnrichedEvent.java
index fc15200b603..28e611bd301 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/EnrichedEvent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/EnrichedEvent.java
@@ -30,7 +30,6 @@ import java.util.concurrent.atomic.AtomicInteger;
  * additional information mainly includes the reference count of the event.
  */
 public abstract class EnrichedEvent implements Event {
-
   private final AtomicInteger referenceCount;
 
   private final PipeTaskMeta pipeTaskMeta;
@@ -47,7 +46,7 @@ public abstract class EnrichedEvent implements Event {
    * @param holderMessage the message of the invoker
    * @return true if the reference count is increased successfully, false if 
the event is not
    */
-  public final boolean increaseReferenceCount(String holderMessage) {
+  public boolean increaseReferenceCount(String holderMessage) {
     boolean isSuccessful = true;
     synchronized (this) {
       if (referenceCount.get() == 0) {
@@ -75,7 +74,7 @@ public abstract class EnrichedEvent implements Event {
    * @param holderMessage the message of the invoker
    * @return true if the reference count is decreased successfully, false 
otherwise
    */
-  public final boolean decreaseReferenceCount(String holderMessage) {
+  public boolean decreaseReferenceCount(String holderMessage) {
     boolean isSuccessful = true;
     synchronized (this) {
       if (referenceCount.get() == 1) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java
index e9887cc88bf..15479bc3788 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java
@@ -65,11 +65,29 @@ public class PipeRealtimeCollectEvent extends EnrichedEvent 
{
     device2Measurements = null;
   }
 
+  @Override
+  public boolean increaseReferenceCount(String holderMessage) {
+    // This method must be overridden, otherwise during the real-time data 
collection stage, the
+    // current PipeRealtimeCollectEvent rather than the member variable 
EnrichedEvent will increase
+    // the reference count, resulting in errors in the reference count of the 
EnrichedEvent
+    // contained in this PipeRealtimeCollectEvent during the processor and 
connector stages.
+    return event.increaseReferenceCount(holderMessage);
+  }
+
   @Override
   public boolean increaseResourceReferenceCount(String holderMessage) {
     return event.increaseResourceReferenceCount(holderMessage);
   }
 
+  @Override
+  public boolean decreaseReferenceCount(String holderMessage) {
+    // This method must be overridden, otherwise during the real-time data 
collection stage, the
+    // current PipeRealtimeCollectEvent rather than the member variable 
EnrichedEvent will increase
+    // the reference count, resulting in errors in the reference count of the 
EnrichedEvent
+    // contained in this PipeRealtimeCollectEvent during the processor and 
connector stages.
+    return event.decreaseReferenceCount(holderMessage);
+  }
+
   @Override
   public boolean decreaseResourceReferenceCount(String holderMessage) {
     return event.decreaseResourceReferenceCount(holderMessage);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
index 18284de32f4..80eb6c37ffa 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
@@ -6,6 +6,7 @@ import 
org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 import org.apache.iotdb.db.wal.utils.WALEntryHandler;
 
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
@@ -36,14 +37,17 @@ public class PipeWALResourceManager implements 
AutoCloseable {
         ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
             PIPE_WAL_RESOURCE_TTL_CHECKER,
             () -> {
-              for (final long memtableId : 
memtableIdToPipeWALResourceMap.keySet()) {
+              Iterator<Map.Entry<Long, PipeWALResource>> iterator =
+                  memtableIdToPipeWALResourceMap.entrySet().iterator();
+              while (iterator.hasNext()) {
+                final Map.Entry<Long, PipeWALResource> entry = iterator.next();
                 final ReentrantLock lock =
-                    memtableIdSegmentLocks[(int) (memtableId % 
SEGMENT_LOCK_COUNT)];
+                    memtableIdSegmentLocks[(int) (entry.getKey() % 
SEGMENT_LOCK_COUNT)];
 
                 lock.lock();
                 try {
-                  if 
(memtableIdToPipeWALResourceMap.get(memtableId).invalidateIfPossible()) {
-                    memtableIdToPipeWALResourceMap.remove(memtableId);
+                  if (entry.getValue().invalidateIfPossible()) {
+                    iterator.remove();
                   }
                 } finally {
                   lock.unlock();
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
index 4d7ef849fd6..84a4817d733 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
@@ -126,7 +126,7 @@ public class PipeConnectorSubtask extends PipeSubtask {
             String.format(
                 "Failed to reconnect to the target system after %d times, 
stopping current pipe task %s...",
                 MAX_RETRY_TIMES, taskID);
-        LOGGER.error(errorMessage);
+        LOGGER.warn(errorMessage);
         lastFailedCause = throwable;
 
         PipeAgent.runtime().report(taskMeta, new 
PipeRuntimeCriticalException(errorMessage));
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
index 85fb197bbef..b1331f4df48 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
@@ -169,7 +169,7 @@ public abstract class PipeSubtask implements 
FutureCallback<Void>, Callable<Void
   protected void releaseLastEvent() {
     if (lastEvent != null) {
       if (lastEvent instanceof EnrichedEvent) {
-        ((EnrichedEvent) 
lastEvent).decreaseReferenceCount(PipeSubtask.class.getName());
+        ((EnrichedEvent) 
lastEvent).decreaseReferenceCount(this.getClass().getName());
       }
       lastEvent = null;
     }

Reply via email to