This is an automated email from the ASF dual-hosted git repository.

cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new af62ace9b0 DRILL-8490: Sender operator fake memory leak result to sql 
failed  and memory statistics error when ChannelClosedException (#2917)
af62ace9b0 is described below

commit af62ace9b01479e09569f74f9b36442c2e1d3e83
Author: shfshihuafeng <[email protected]>
AuthorDate: Fri Jul 5 20:16:11 2024 +0800

    DRILL-8490: Sender operator fake memory leak result to sql failed  and 
memory statistics error when ChannelClosedException (#2917)
---
 .../drill/exec/ops/AccountingDataTunnel.java       |  1 +
 .../drill/exec/ops/DataTunnelStatusHandler.java    |  5 ++++
 .../apache/drill/exec/ops/SendingAccountor.java    | 33 +++++++++++++++++++++-
 .../org/apache/drill/exec/rpc/data/DataTunnel.java |  7 ++++-
 .../drill/exec/rpc/AbstractRemoteConnection.java   |  2 +-
 .../org/apache/drill/exec/rpc/RequestIdMap.java    |  1 +
 .../java/org/apache/drill/exec/rpc/RpcBus.java     |  1 +
 .../apache/drill/exec/rpc/RpcOutcomeListener.java  |  6 ++++
 8 files changed, 53 insertions(+), 3 deletions(-)

diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingDataTunnel.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingDataTunnel.java
index 67d78e56e7..61ef523979 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingDataTunnel.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingDataTunnel.java
@@ -43,6 +43,7 @@ public class AccountingDataTunnel {
 
   public void sendRecordBatch(FragmentWritableBatch batch) {
     sendingAccountor.increment();
+    sendingAccountor.incrementComplete();
     tunnel.sendRecordBatch(statusHandler, batch);
   }
 
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/DataTunnelStatusHandler.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/DataTunnelStatusHandler.java
index e78cda9612..15bb78c6a1 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/DataTunnelStatusHandler.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/DataTunnelStatusHandler.java
@@ -62,4 +62,9 @@ public class DataTunnelStatusHandler implements 
RpcOutcomeListener<BitData.AckWi
     sendingAccountor.decrement();
     consumer.interrupt(e);
   }
+
+  @Override
+  public void complete() {
+    sendingAccountor.decrementComplete();
+  }
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SendingAccountor.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SendingAccountor.java
index d588b602ad..656b1aaad1 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SendingAccountor.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SendingAccountor.java
@@ -35,6 +35,9 @@ public class SendingAccountor {
 
   private final AtomicInteger batchesSent = new AtomicInteger(0);
   private final Semaphore wait = new Semaphore(0);
+  // Batches are not released until the data send completes.
+  private final AtomicInteger batchesComplete = new AtomicInteger(0);
+  private final Semaphore waitComplete = new Semaphore(0);
 
   void increment() {
     batchesSent.incrementAndGet();
@@ -44,6 +47,14 @@ public class SendingAccountor {
     wait.release();
   }
 
+  void incrementComplete() {
+    batchesComplete.incrementAndGet();
+  }
+
+  void decrementComplete() {
+    waitComplete.release();
+  }
+
   public synchronized void waitForSendComplete() {
       int waitForBatches = batchesSent.get();
       boolean isInterrupted = false;
@@ -61,11 +72,31 @@ public class SendingAccountor {
           isInterrupted = true;
         }
       }
-
+      isInterrupted = isInterrupted || waitForOperatorComplete();
       if (isInterrupted) {
         // Preserve evidence that the interruption occurred so that code 
higher up on the call stack can learn of the
         // interruption and respond to it if it wants to.
         Thread.currentThread().interrupt();
       }
   }
+
+  public synchronized boolean waitForOperatorComplete() {
+    int waitForBatches = batchesComplete.get();
+    boolean isInterrupted = false;
+    while(waitForBatches != 0) {
+      try {
+        waitComplete.acquire(waitForBatches);
+        waitForBatches = batchesComplete.addAndGet(-1 * waitForBatches);
+      } catch (InterruptedException e) {
+        // We should always wait for send complete. If we don't, we'll leak memory or have a memory miss when we try
+        // to send. This should be safe because: network connections should get disconnected and fail a send if a
+        // node goes down, otherwise, the receiving side connection should always consume from the rpc layer
+        // (blocking is cooperative and will get triggered before this)
+        logger.warn("Interrupted while waiting for send complete. Continuing 
to wait.", e);
+
+        isInterrupted = true;
+      }
+    }
+    return isInterrupted;
+  }
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
index f60d668076..da0601fd12 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
@@ -84,7 +84,7 @@ public class DataTunnel {
       }
 
       outcomeListener.interrupted(e);
-
+      outcomeListener.complete();
       // Preserve evidence that the interruption occurred so that code higher 
up on the call stack can learn of the
       // interruption and respond to it if it wants to.
       Thread.currentThread().interrupt();
@@ -141,6 +141,11 @@ public class DataTunnel {
       sendingSemaphore.release();
       inner.interrupted(e);
     }
+
+    @Override
+    public void complete() {
+      inner.complete();
+    }
   }
 
   private class SendBatchAsyncListen extends 
ListeningCommand<BitData.AckWithCredit, DataClientConnection, RpcType, 
MessageLite> {
diff --git 
a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/AbstractRemoteConnection.java
 
b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/AbstractRemoteConnection.java
index 6e11236fc4..c02c1ed9ba 100644
--- 
a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/AbstractRemoteConnection.java
+++ 
b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/AbstractRemoteConnection.java
@@ -77,7 +77,7 @@ public abstract class AbstractRemoteConnection implements 
RemoteConnection, Encr
       return true;
     } catch (final InterruptedException e) {
       listener.interrupted(e);
-
+      listener.complete();
       // Preserve evidence that the interruption occurred so that code higher 
up
       // on the call stack can learn of the
       // interruption and respond to it if it wants to.
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RequestIdMap.java 
b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RequestIdMap.java
index 11b829ef20..cc38cc0037 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RequestIdMap.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RequestIdMap.java
@@ -115,6 +115,7 @@ class RequestIdMap {
 
     @Override
     public void operationComplete(ChannelFuture future) throws Exception {
+      handler.complete();
       if (!future.isSuccess()) {
         try {
           removeFromMap(coordinationId);
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java 
b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
index 69c1bb3311..aa85dc44d5 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
@@ -120,6 +120,7 @@ public abstract class RpcBus<T extends EnumLite, C extends 
RemoteConnection> imp
       completed = true;
     } catch (Exception | AssertionError e) {
       listener.failed(new RpcException("Failure sending message.", e));
+      listener.complete();
     } finally {
 
       if (!completed) {
diff --git 
a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java 
b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java
index 4afa159846..953c56bc4f 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java
@@ -34,4 +34,10 @@ public interface RpcOutcomeListener<V> {
    * is cancelled due to query cancellations or failures.
    */
   void interrupted(final InterruptedException e);
+
+  /**
+   * Called once a send operation has finished, so that an operator waiting for the message to be released can proceed.
+   */
+  default void complete() {
+  }
 }

Reply via email to