This is an automated email from the ASF dual-hosted git repository.
cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new af62ace9b0 DRILL-8490: Sender operator fake memory leak result to sql
failed and memory statistics error when ChannelClosedException (#2917)
af62ace9b0 is described below
commit af62ace9b01479e09569f74f9b36442c2e1d3e83
Author: shfshihuafeng <[email protected]>
AuthorDate: Fri Jul 5 20:16:11 2024 +0800
DRILL-8490: Sender operator fake memory leak result to sql failed and
memory statistics error when ChannelClosedException (#2917)
---
.../drill/exec/ops/AccountingDataTunnel.java | 1 +
.../drill/exec/ops/DataTunnelStatusHandler.java | 5 ++++
.../apache/drill/exec/ops/SendingAccountor.java | 33 +++++++++++++++++++++-
.../org/apache/drill/exec/rpc/data/DataTunnel.java | 7 ++++-
.../drill/exec/rpc/AbstractRemoteConnection.java | 2 +-
.../org/apache/drill/exec/rpc/RequestIdMap.java | 1 +
.../java/org/apache/drill/exec/rpc/RpcBus.java | 1 +
.../apache/drill/exec/rpc/RpcOutcomeListener.java | 6 ++++
8 files changed, 53 insertions(+), 3 deletions(-)
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingDataTunnel.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingDataTunnel.java
index 67d78e56e7..61ef523979 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingDataTunnel.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingDataTunnel.java
@@ -43,6 +43,7 @@ public class AccountingDataTunnel {
public void sendRecordBatch(FragmentWritableBatch batch) {
sendingAccountor.increment();
+ sendingAccountor.incrementComplete();
tunnel.sendRecordBatch(statusHandler, batch);
}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/DataTunnelStatusHandler.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/DataTunnelStatusHandler.java
index e78cda9612..15bb78c6a1 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/DataTunnelStatusHandler.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/DataTunnelStatusHandler.java
@@ -62,4 +62,9 @@ public class DataTunnelStatusHandler implements
RpcOutcomeListener<BitData.AckWi
sendingAccountor.decrement();
consumer.interrupt(e);
}
+
+ @Override
+ public void complete() {
+ sendingAccountor.decrementComplete();
+ }
}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SendingAccountor.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SendingAccountor.java
index d588b602ad..656b1aaad1 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SendingAccountor.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SendingAccountor.java
@@ -35,6 +35,9 @@ public class SendingAccountor {
private final AtomicInteger batchesSent = new AtomicInteger(0);
private final Semaphore wait = new Semaphore(0);
+ // batch release waits until data send is complete
+ private final AtomicInteger batchesComplete = new AtomicInteger(0);
+ private final Semaphore waitComplete = new Semaphore(0);
void increment() {
batchesSent.incrementAndGet();
@@ -44,6 +47,14 @@ public class SendingAccountor {
wait.release();
}
+ void incrementComplete() {
+ batchesComplete.incrementAndGet();
+ }
+
+ void decrementComplete() {
+ waitComplete.release();
+ }
+
public synchronized void waitForSendComplete() {
int waitForBatches = batchesSent.get();
boolean isInterrupted = false;
@@ -61,11 +72,31 @@ public class SendingAccountor {
isInterrupted = true;
}
}
-
+ isInterrupted = isInterrupted || waitForOperatorComplete();
if (isInterrupted) {
// Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
// interruption and respond to it if it wants to.
Thread.currentThread().interrupt();
}
}
+
+ public synchronized boolean waitForOperatorComplete() {
+ int waitForBatches = batchesComplete.get();
+ boolean isInterrupted = false;
+ while(waitForBatches != 0) {
+ try {
+ waitComplete.acquire(waitForBatches);
+ waitForBatches = batchesComplete.addAndGet(-1 * waitForBatches);
+ } catch (InterruptedException e) {
+ // We should always wait for send complete. If we don't, we'll leak memory or have a memory miss when we try
+ // to send. This should be safe because: network connections should get disconnected and fail a send if a
+ // node goes down, otherwise, the receiving side connection should always consume from the rpc layer
+ // (blocking is cooperative and will get triggered before this)
+ logger.warn("Interrupted while waiting for send complete. Continuing
to wait.", e);
+
+ isInterrupted = true;
+ }
+ }
+ return isInterrupted;
+ }
}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
index f60d668076..da0601fd12 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
@@ -84,7 +84,7 @@ public class DataTunnel {
}
outcomeListener.interrupted(e);
-
+ outcomeListener.complete();
// Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
// interruption and respond to it if it wants to.
Thread.currentThread().interrupt();
@@ -141,6 +141,11 @@ public class DataTunnel {
sendingSemaphore.release();
inner.interrupted(e);
}
+
+ @Override
+ public void complete() {
+ inner.complete();
+ }
}
private class SendBatchAsyncListen extends
ListeningCommand<BitData.AckWithCredit, DataClientConnection, RpcType,
MessageLite> {
diff --git
a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/AbstractRemoteConnection.java
b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/AbstractRemoteConnection.java
index 6e11236fc4..c02c1ed9ba 100644
---
a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/AbstractRemoteConnection.java
+++
b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/AbstractRemoteConnection.java
@@ -77,7 +77,7 @@ public abstract class AbstractRemoteConnection implements
RemoteConnection, Encr
return true;
} catch (final InterruptedException e) {
listener.interrupted(e);
-
+ listener.complete();
// Preserve evidence that the interruption occurred so that code higher up
// on the call stack can learn of the
// interruption and respond to it if it wants to.
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RequestIdMap.java
b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RequestIdMap.java
index 11b829ef20..cc38cc0037 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RequestIdMap.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RequestIdMap.java
@@ -115,6 +115,7 @@ class RequestIdMap {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
+ handler.complete();
if (!future.isSuccess()) {
try {
removeFromMap(coordinationId);
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
index 69c1bb3311..aa85dc44d5 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
@@ -120,6 +120,7 @@ public abstract class RpcBus<T extends EnumLite, C extends
RemoteConnection> imp
completed = true;
} catch (Exception | AssertionError e) {
listener.failed(new RpcException("Failure sending message.", e));
+ listener.complete();
} finally {
if (!completed) {
diff --git
a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java
b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java
index 4afa159846..953c56bc4f 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java
@@ -34,4 +34,10 @@ public interface RpcOutcomeListener<V> {
* is cancelled due to query cancellations or failures.
*/
void interrupted(final InterruptedException e);
+
+ /**
+ * Called once a send operation has completed, so that anything waiting to release the message can proceed.
+ */
+ default void complete() {
+ }
}