This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d445f92a1e Pipe: Degraded the lock in PipeEventCollector to avoid 
waitForTsFileClose() blocking pipe drop (#12518)
6d445f92a1e is described below

commit 6d445f92a1ea31f8aca183f3ba6c5d6cdfea48c6
Author: Caideyipi <[email protected]>
AuthorDate: Mon May 13 19:51:03 2024 +0800

    Pipe: Degraded the lock in PipeEventCollector to avoid waitForTsFileClose() 
blocking pipe drop (#12518)
---
 .../db/pipe/task/connection/PipeEventCollector.java | 21 ++++++++++++---------
 1 file changed, 12 insertions(+), 9 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
index c7e34b53aa6..38779d44bb0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
@@ -55,7 +55,9 @@ public class PipeEventCollector implements EventCollector, 
AutoCloseable {
   private final AtomicInteger collectInvocationCount = new AtomicInteger(0);
 
   public PipeEventCollector(
-      BoundedBlockingPendingQueue<Event> pendingQueue, long creationTime, int 
regionId) {
+      final BoundedBlockingPendingQueue<Event> pendingQueue,
+      final long creationTime,
+      final int regionId) {
     this.pendingQueue = pendingQueue;
     this.creationTime = creationTime;
     this.regionId = regionId;
@@ -63,7 +65,7 @@ public class PipeEventCollector implements EventCollector, 
AutoCloseable {
   }
 
   @Override
-  public synchronized void collect(Event event) {
+  public void collect(final Event event) {
     try {
       if (event instanceof PipeInsertNodeTabletInsertionEvent) {
         parseAndCollectEvent((PipeInsertNodeTabletInsertionEvent) event);
@@ -74,16 +76,17 @@ public class PipeEventCollector implements EventCollector, 
AutoCloseable {
       } else {
         collectEvent(event);
       }
-    } catch (PipeException e) {
+    } catch (final PipeException e) {
       throw e;
-    } catch (Exception e) {
+    } catch (final Exception e) {
       throw new PipeException("Error occurred when collecting events from 
processor.", e);
     }
   }
 
-  private void parseAndCollectEvent(PipeInsertNodeTabletInsertionEvent 
sourceEvent) {
+  private void parseAndCollectEvent(final PipeInsertNodeTabletInsertionEvent 
sourceEvent) {
     if (sourceEvent.shouldParseTimeOrPattern()) {
-      for (PipeRawTabletInsertionEvent parsedEvent : 
sourceEvent.toRawTabletInsertionEvents()) {
+      for (final PipeRawTabletInsertionEvent parsedEvent :
+          sourceEvent.toRawTabletInsertionEvents()) {
         collectEvent(parsedEvent);
       }
     } else {
@@ -91,7 +94,7 @@ public class PipeEventCollector implements EventCollector, 
AutoCloseable {
     }
   }
 
-  private void parseAndCollectEvent(PipeRawTabletInsertionEvent sourceEvent) {
+  private void parseAndCollectEvent(final PipeRawTabletInsertionEvent 
sourceEvent) {
     if (sourceEvent.shouldParseTimeOrPattern()) {
       final PipeRawTabletInsertionEvent parsedEvent = 
sourceEvent.parseEventWithPatternOrTime();
       if (!parsedEvent.hasNoNeedParsingAndIsEmpty()) {
@@ -102,7 +105,7 @@ public class PipeEventCollector implements EventCollector, 
AutoCloseable {
     }
   }
 
-  private void parseAndCollectEvent(PipeTsFileInsertionEvent sourceEvent) 
throws Exception {
+  private void parseAndCollectEvent(final PipeTsFileInsertionEvent 
sourceEvent) throws Exception {
     if (!sourceEvent.waitForTsFileClose()) {
       LOGGER.warn(
           "Pipe skipping temporary TsFile which shouldn't be transferred: {}",
@@ -124,7 +127,7 @@ public class PipeEventCollector implements EventCollector, 
AutoCloseable {
     }
   }
 
-  private void collectEvent(Event event) {
+  private synchronized void collectEvent(final Event event) {
     collectInvocationCount.incrementAndGet();
 
     if (event instanceof EnrichedEvent) {

Reply via email to