This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:

     new 9f9bf13  [SPARK-28160][CORE] Fix a bug that callback function may hang when unchecked exception missed

9f9bf13 is described below

commit 9f9bf13830763728f223550f1d83debcb23b83fd
Author: LantaoJin <jinlan...@gmail.com>
AuthorDate: Sun Jun 30 15:14:41 2019 -0500

[SPARK-28160][CORE] Fix a bug that callback function may hang when unchecked exception missed

This is very similar to #23590.

`ByteBuffer.allocate` may throw `OutOfMemoryError` when the response is large and not enough memory is available. When this happens, the error escapes the callback before the `SettableFuture` is completed, so `TransportClient.sendRpcSync` hangs forever if its timeout is set to unlimited.

This PR catches `Throwable` and uses the error to complete the `SettableFuture`.

I tested this in my IDE by setting the value of `size` to -1 at runtime while debugging. Without this patch, the call does not return until the timeout expires (and may hang forever if the timeout is set to MAX_INT); with it, the expected `IllegalArgumentException` is caught.

```java
@Override
public void onSuccess(ByteBuffer response) {
  try {
    int size = response.remaining();
    ByteBuffer copy = ByteBuffer.allocate(size); // set size to -1 at runtime while debugging
    copy.put(response);
    // flip "copy" to make it readable
    copy.flip();
    result.set(copy);
  } catch (Throwable t) {
    result.setException(t);
  }
}
```

Closes #24964 from LantaoJin/SPARK-28160.
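The failure mode can be reproduced outside Spark. The standalone Java sketch below is not Spark code: it substitutes the JDK's `CompletableFuture` for Guava's `SettableFuture`, and `ResponseCallback`, `dispatch`, `unguarded`, `guarded` and `observe` are hypothetical names. As an assumption about how a network event loop might behave, `dispatch` swallows exceptions thrown by the callback, and the allocation size is passed in so that -1 can be injected as in the test described above. A caller waiting with a bounded timeout sees a timeout for the unguarded callback and the original `IllegalArgumentException` for the guarded one.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class CallbackCompletionDemo {

  // Hypothetical stand-in for Spark's RpcResponseCallback.
  interface ResponseCallback {
    void onSuccess(ByteBuffer response);
  }

  // Hypothetical dispatcher: assumed to swallow callback exceptions, so an
  // unguarded failure never reaches the thread waiting on the future.
  static void dispatch(ResponseCallback cb, ByteBuffer response) {
    try {
      cb.onSuccess(response);
    } catch (Throwable ignored) {
      // Error dropped here; nobody completes the caller's future.
    }
  }

  // Pre-patch shape: an exception from allocate() skips result.complete().
  static ResponseCallback unguarded(CompletableFuture<ByteBuffer> result, int size) {
    return response -> {
      ByteBuffer copy = ByteBuffer.allocate(size); // IllegalArgumentException if size < 0
      copy.put(response);
      copy.flip(); // make "copy" readable
      result.complete(copy);
    };
  }

  // Post-patch shape: any Throwable completes the future exceptionally.
  static ResponseCallback guarded(CompletableFuture<ByteBuffer> result, int size) {
    return response -> {
      try {
        ByteBuffer copy = ByteBuffer.allocate(size);
        copy.put(response);
        copy.flip();
        result.complete(copy);
      } catch (Throwable t) {
        result.completeExceptionally(t); // wakes the waiting caller with the error
      }
    };
  }

  // Waits with a bounded timeout and describes what the caller observed.
  static String observe(CompletableFuture<ByteBuffer> result) {
    try {
      result.get(200, TimeUnit.MILLISECONDS);
      return "ok";
    } catch (TimeoutException e) {
      return "timeout";
    } catch (ExecutionException e) {
      return e.getCause().getClass().getSimpleName();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return "interrupted";
    }
  }

  public static void main(String[] args) {
    ByteBuffer response = ByteBuffer.wrap(new byte[] {1, 2, 3});

    CompletableFuture<ByteBuffer> before = new CompletableFuture<>();
    dispatch(unguarded(before, -1), response.duplicate());
    System.out.println(observe(before)); // timeout

    CompletableFuture<ByteBuffer> after = new CompletableFuture<>();
    dispatch(guarded(after, -1), response.duplicate());
    System.out.println(observe(after)); // IllegalArgumentException

    CompletableFuture<ByteBuffer> normal = new CompletableFuture<>();
    dispatch(guarded(normal, response.remaining()), response.duplicate());
    System.out.println(observe(normal)); // ok
  }
}
```

The 200 ms timeout stands in for a finite RPC timeout; with an unbounded wait, the unguarded case would block forever, which is the hang the commit describes.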
Lead-authored-by: LantaoJin <jinlan...@gmail.com>
Co-authored-by: lajin <la...@ebay.com>
Signed-off-by: Sean Owen <sean.o...@databricks.com>
(cherry picked from commit 0e421000e0ea2c090b6fab0201a6046afceec132)
Signed-off-by: Sean Owen <sean.o...@databricks.com>
---
 .../org/apache/spark/network/client/TransportClient.java  | 15 ++++++++++-----
 1 file changed, 10 insertions(+), 5 deletions(-)

diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
index 20d840b..b018197 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
@@ -237,11 +237,16 @@ public class TransportClient implements Closeable {
     sendRpc(message, new RpcResponseCallback() {
       @Override
       public void onSuccess(ByteBuffer response) {
-        ByteBuffer copy = ByteBuffer.allocate(response.remaining());
-        copy.put(response);
-        // flip "copy" to make it readable
-        copy.flip();
-        result.set(copy);
+        try {
+          ByteBuffer copy = ByteBuffer.allocate(response.remaining());
+          copy.put(response);
+          // flip "copy" to make it readable
+          copy.flip();
+          result.set(copy);
+        } catch (Throwable t) {
+          logger.warn("Error in responding PRC callback", t);
+          result.setException(t);
+        }
       }
 
       @Override
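The patch catches `Throwable` rather than `Exception`. The distinction matters because `OutOfMemoryError` extends `Error`, not `Exception`, so a `catch (Exception e)` block would let it escape and leave the future incomplete. The standalone Java sketch below illustrates this; `CatchScopeDemo`, `withCatchException` and `withCatchThrowable` are hypothetical names, and the error is thrown explicitly to simulate a failed allocation rather than produced by exhausting the heap.

```java
public class CatchScopeDemo {

  // Runs the task with a handler that only covers Exception and its subclasses.
  static String withCatchException(Runnable task) {
    try {
      task.run();
      return "completed";
    } catch (Exception e) {
      return "caught by catch (Exception)";
    }
  }

  // Runs the task with a handler that covers every Throwable, including Error subclasses.
  static String withCatchThrowable(Runnable task) {
    try {
      task.run();
      return "completed";
    } catch (Throwable t) {
      return "caught by catch (Throwable)";
    }
  }

  public static void main(String[] args) {
    // Simulates the allocation failure without actually exhausting the heap.
    Runnable oom = () -> {
      throw new OutOfMemoryError("simulated");
    };

    try {
      System.out.println(withCatchException(oom));
    } catch (OutOfMemoryError e) {
      // OutOfMemoryError is an Error, so it escapes the narrower handler.
      System.out.println("escaped catch (Exception): " + e.getMessage());
    }

    System.out.println(withCatchThrowable(oom)); // caught by catch (Throwable)
  }
}
```

Catching `Throwable` is usually discouraged, but here the handler does not try to recover; it only forwards the error to the waiting caller, which seems to be the intent of `result.setException(t)` in the diff.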