This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 9f9bf13  [SPARK-28160][CORE] Fix a bug that callback function may hang when unchecked exception missed
9f9bf13 is described below

commit 9f9bf13830763728f223550f1d83debcb23b83fd
Author: LantaoJin <jinlan...@gmail.com>
AuthorDate: Sun Jun 30 15:14:41 2019 -0500

    [SPARK-28160][CORE] Fix a bug that callback function may hang when unchecked exception missed
    
    This is very similar to #23590.
    
    `ByteBuffer.allocate` may throw `OutOfMemoryError` when the response is large and not enough memory is available. However, when this happens, `TransportClient.sendRpcSync` will just hang forever if the timeout is set to unlimited.
    
    This PR catches `Throwable` and uses the error to complete `SettableFuture`.
    
    I tested in my IDE by setting the value of `size` to -1 to verify the result. Without this patch, the call does not finish until the timeout expires (it may hang forever if the timeout is set to MAX_INT). With this patch, the expected `IllegalArgumentException` is caught.
    ```java
          @Override
          public void onSuccess(ByteBuffer response) {
            try {
              int size = response.remaining();
              // set size to -1 at runtime when debugging
              ByteBuffer copy = ByteBuffer.allocate(size);
              copy.put(response);
              // flip "copy" to make it readable
              copy.flip();
              result.set(copy);
            } catch (Throwable t) {
              result.setException(t);
            }
          }
    ```
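
    The behavior can also be reproduced outside Spark with the minimal sketch below. It is not the Spark code: it assumes the JDK's `CompletableFuture` as a stand-in for Guava's `SettableFuture`, and the class name `CallbackCompletionSketch` and the method `copyResponse` are hypothetical names used only for illustration.

    ```java
    import java.nio.ByteBuffer;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;

    // Hypothetical stand-alone sketch; CompletableFuture replaces Guava's SettableFuture.
    public class CallbackCompletionSketch {

      // Mimics the patched onSuccess body: the future is always completed,
      // either with the copied buffer or with whatever Throwable was raised.
      static CompletableFuture<ByteBuffer> copyResponse(ByteBuffer response, int size) {
        CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
        try {
          ByteBuffer copy = ByteBuffer.allocate(size); // throws IllegalArgumentException if size < 0
          copy.put(response);
          // flip "copy" to make it readable
          copy.flip();
          result.complete(copy);
        } catch (Throwable t) {
          // Without this branch the future would never complete and a waiter would block.
          result.completeExceptionally(t);
        }
        return result;
      }

      public static void main(String[] args) throws Exception {
        ByteBuffer response = ByteBuffer.wrap(new byte[] {1, 2, 3});
        CompletableFuture<ByteBuffer> ok = copyResponse(response, response.remaining());
        System.out.println(ok.get(1, TimeUnit.SECONDS).remaining());

        // Simulate the debugging trick from the description: force size to -1.
        CompletableFuture<ByteBuffer> failed = copyResponse(ByteBuffer.wrap(new byte[] {1}), -1);
        try {
          failed.get(1, TimeUnit.SECONDS);
        } catch (ExecutionException e) {
          // The original error is surfaced to the waiter as the cause.
          System.out.println(e.getCause().getClass().getName());
        }
      }
    }
    ```

    In this sketch both calls to `get` return promptly: the first yields the copied buffer, and the second rethrows the allocation failure wrapped in an `ExecutionException` rather than waiting for the timeout.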
    
    Closes #24964 from LantaoJin/SPARK-28160.
    
    Lead-authored-by: LantaoJin <jinlan...@gmail.com>
    Co-authored-by: lajin <la...@ebay.com>
    Signed-off-by: Sean Owen <sean.o...@databricks.com>
    (cherry picked from commit 0e421000e0ea2c090b6fab0201a6046afceec132)
    Signed-off-by: Sean Owen <sean.o...@databricks.com>
---
 .../org/apache/spark/network/client/TransportClient.java  | 15 ++++++++++-----
 1 file changed, 10 insertions(+), 5 deletions(-)

diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
index 20d840b..b018197 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
@@ -237,11 +237,16 @@ public class TransportClient implements Closeable {
     sendRpc(message, new RpcResponseCallback() {
       @Override
       public void onSuccess(ByteBuffer response) {
-        ByteBuffer copy = ByteBuffer.allocate(response.remaining());
-        copy.put(response);
-        // flip "copy" to make it readable
-        copy.flip();
-        result.set(copy);
+        try {
+          ByteBuffer copy = ByteBuffer.allocate(response.remaining());
+          copy.put(response);
+          // flip "copy" to make it readable
+          copy.flip();
+          result.set(copy);
+        } catch (Throwable t) {
+          logger.warn("Error in responding PRC callback", t);
+          result.setException(t);
+        }
       }
 
       @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
