This is an automated email from the ASF dual-hosted git repository.

achennaka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e953e48f [subprocess] KUDU-3489: Support reading large messages through pipes
2e953e48f is described below

commit 2e953e48f91b2d099f6c6f3f18921c74d2ecfeeb
Author: Abhishek Chennaka <achenn...@cloudera.com>
AuthorDate: Mon Jul 10 11:58:48 2023 -0700

    [subprocess] KUDU-3489: Support reading large messages through pipes
    
    This patch enables the subprocess server to reliably read messages
    larger than 1MB, which was previously flaky. A single read() from the
    pipe may return only part of a message, so the input stream is now read
    repeatedly until the expected number of bytes has been received or EOF
    is encountered. The issue was observed when large requests were sent to
    the subprocess server and it failed to receive the complete messages.
    
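    In addition, fixed a log statement in MessageReader.java so that the
    exception message is displayed correctly. MessageIO no longer throws
    EOFException: a short read caused by reaching EOF is now reported as an
    IOException.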
    
    Change-Id: I6523fdaaca19ee089dbac52a7dedec8847926a6c
    Reviewed-on: http://gerrit.cloudera.org:8080/20180
    Reviewed-by: Alexey Serbin <ale...@apache.org>
    Tested-by: Abhishek Chennaka <achenn...@cloudera.com>
---
 .../java/org/apache/kudu/subprocess/MessageIO.java | 51 +++++++++++++---------
 .../org/apache/kudu/subprocess/MessageReader.java  |  6 +--
 .../kudu/subprocess/echo/TestEchoSubprocess.java   |  2 +
 src/kudu/subprocess/subprocess_server-test.cc      | 51 ++++++++++++++++++++--
 4 files changed, 81 insertions(+), 29 deletions(-)

diff --git 
a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageIO.java 
b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageIO.java
index 538904661..fafa37efc 100644
--- 
a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageIO.java
+++ 
b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageIO.java
@@ -54,7 +54,6 @@ public class MessageIO {
    * from multiple threads concurrently.
    *
    * @return the message in a byte array.
-   * @throws EOFException if the end of the stream has been reached
    * @throws IOException if this input stream has been closed, an I/O
    *                     error occurs, or fail to read the message
    *                     properly
@@ -62,7 +61,7 @@ public class MessageIO {
    *                                 in the stream
    */
   @VisibleForTesting
-  byte[] readBytes() throws EOFException, IOException {
+  byte[] readBytes() throws IOException {
     Preconditions.checkNotNull(in);
     // Read four bytes of the message to get the size of the body.
     byte[] sizeBytes = new byte[Integer.BYTES];
@@ -85,48 +84,58 @@ public class MessageIO {
   /**
    * Reads <code>size</code> bytes of data from the underlying buffered input
    * stream into the specified byte array, starting at the offset 
<code>0</code>.
+   * The reads are performed until we reach EOF of the stream (when the return
+   * value of the underlying read method is -1) or when we read more than or
+   * equal to the <code>size</code> bytes.
    * If it fails to read the specified size, <code>IOException</code> is 
thrown.
    *
-   * @throws EOFException if the end of the stream has been reached
    * @throws IOException if this input stream has been closed, an I/O
    *                     error occurs, or fail to read the specified size
    */
-  private void doRead(byte[] bytes, int size) throws EOFException, IOException 
{
+  private void doRead(byte[] bytes, int size) throws IOException {
     Preconditions.checkNotNull(bytes);
-    int read = in.read(bytes, 0, size);
-    if (read == -1) {
-      throw new EOFException("the end of the stream has been reached");
-    } else if (read != size) {
+    int totalRead = 0;  // a single read() may return fewer than 'size' bytes
+    while (totalRead < size) {
+      int read = in.read(bytes, totalRead, size - totalRead);
+      if (read == -1) {
+        break;  // EOF: the short read is reported as an IOException below
+      }
+      totalRead += read;
+    }
+    if (totalRead != size) {
       throw new IOException(
-              String.format("unable to receive message, expected (%d) bytes " +
-                            "but read (%d) bytes", size, read));
+          String.format("unable to receive message, expected (%d) bytes " +
+              "but read (%d) bytes.", size, totalRead));
     }
   }
 
   /**
    * Reads <code>size</code> bytes of data from the underlying buffered input
-   * stream and discards all the bytes read.
+   * stream and discards all the bytes read. The reads are performed until we
+   * reach EOF of the stream (when the return value of the underlying read
+   * method is -1) or when we read more than or equal to the
+   * <code>size</code> bytes.
    * If it fails to read the specified size, <code>IOException</code> is 
thrown.
    *
-   * @throws EOFException if the end of the stream has been reached
    * @throws IOException if this input stream has been closed, an I/O
    *                     error occurs, or fail to read the specified size
    */
-  private void doReadAndDiscard(int size) throws EOFException, IOException {
+  private void doReadAndDiscard(int size) throws IOException {
     byte[] buf = new byte[4096];
     int rem = size;
-    while (rem > 0) {
-      int toRead = Math.min(4096, rem);
+    int toRead = Math.min(4096, rem);
+    do {
       int read = in.read(buf, 0, toRead);
       if (read == -1) {
-        throw new EOFException(String.format("the end of the stream " +
-            "has been reached while reading out oversized message (%d bytes)", 
size));
-      } else if (read != toRead) {
-        throw new IOException(
-            String.format("unable to read next chunk of oversized message (%d 
bytes), " +
-                "expected %d bytes but read %d bytes", size, toRead, read));
+        break;
       }
       rem -= read;
+      toRead = Math.min(4096, rem);
+    } while (rem > 0);
+    if (rem > 0) {
+      throw new IOException(
+          String.format("unable to read next chunk of oversized message (%d 
bytes), " +
+              "expected %d bytes but read %d bytes", size, size, size - rem));
     }
   }
 
diff --git 
a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java
 
b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java
index bc0878ef6..99a4dd722 100644
--- 
a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java
+++ 
b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java
@@ -67,12 +67,8 @@ class MessageReader implements Runnable {
       try {
         data = messageIO.readBytes();
       } catch (KuduSubprocessException e) {
-        LOG.error("%s: continuing", e.getMessage());
+        LOG.error("{}: continuing", e.getMessage());
         continue;
-      } catch (EOFException e) {
-        LOG.info("Reaching the end of the input stream, exiting.");
-        // Break the loop if the end of the stream has been reached.
-        break;
       } catch (IOException e) {
         throw new KuduSubprocessException("Unable to read the protobuf 
message", e);
       }
diff --git 
a/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/echo/TestEchoSubprocess.java
 
b/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/echo/TestEchoSubprocess.java
index 92010c0fb..7d555fd24 100644
--- 
a/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/echo/TestEchoSubprocess.java
+++ 
b/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/echo/TestEchoSubprocess.java
@@ -229,6 +229,8 @@ public class TestEchoSubprocess extends SubprocessTestUtil {
   public void testMalformedPB() throws Exception {
     SubprocessExecutor executor = setUpExecutorIO(NO_ERR, 
/*injectIOError*/false);
     requestSenderPipe.write("malformed".getBytes(StandardCharsets.UTF_8));
+    // We need to close the pipe for the read() in InputStream.java to not 
block
+    requestSenderPipe.close();
     Throwable thrown = Assert.assertThrows(ExecutionException.class,
         () -> executor.run(new SubprocessConfiguration(NO_ARGS),
                            new EchoProtocolHandler(), TIMEOUT_MS));
diff --git a/src/kudu/subprocess/subprocess_server-test.cc 
b/src/kudu/subprocess/subprocess_server-test.cc
index b6a2a7bd2..b6e234a50 100644
--- a/src/kudu/subprocess/subprocess_server-test.cc
+++ b/src/kudu/subprocess/subprocess_server-test.cc
@@ -107,6 +107,7 @@ class SubprocessServerTest : public KuduTest {
 
   Status InitSubprocessServer(int java_queue_size,
                               int java_parser_threads,
+                              int java_max_msg_bytes,
                               shared_ptr<SubprocessServer>* out) {
     // Set up a subprocess server pointing at the kudu-subprocess.jar that
     // contains an echo handler and call EchoSubprocessMain.
@@ -130,6 +131,10 @@ class SubprocessServerTest : public KuduTest {
       argv.emplace_back("-p");
       argv.emplace_back(std::to_string(java_parser_threads));
     }
+    if (java_max_msg_bytes > 0) {
+      argv.emplace_back("-m");
+      argv.emplace_back(std::to_string(java_max_msg_bytes));
+    }
     *out = make_shared<SubprocessServer>(env_, pipe_path, std::move(argv),
                                          
EchoSubprocessMetrics(metric_entity_));
     return (*out)->Init();
@@ -137,8 +142,9 @@ class SubprocessServerTest : public KuduTest {
 
   // Resets the subprocess server to account for any new configuration.
   Status ResetSubprocessServer(int java_queue_size = 0,
-                               int java_parser_threads = 0) {
-    return InitSubprocessServer(java_queue_size, java_parser_threads, 
&server_);
+                               int java_parser_threads = 0,
+                               int java_max_msg_bytes = 0) {
+    return InitSubprocessServer(java_queue_size, java_parser_threads, 
java_max_msg_bytes, &server_);
   }
 
  protected:
@@ -318,10 +324,11 @@ TEST_F(SubprocessServerTest, TestRunFromMultipleThreads) {
   } \
 } while (0);
 
+  threads.reserve(kNumThreads);
   for (int i = 0; i < kNumThreads; i++) {
     threads.emplace_back([&, i] {
       shared_ptr<SubprocessServer> server;
-      EXIT_NOT_OK(InitSubprocessServer(0, 0, &server), i);
+      EXIT_NOT_OK(InitSubprocessServer(0, 0, 0, &server), i);
       const string msg = Substitute("$0 bottles of tea on the wall", i);
       SubprocessRequestPB req = CreateEchoSubprocessRequestPB(msg);
       SubprocessResponsePB resp;
@@ -421,6 +428,44 @@ TEST_F(SubprocessServerTest, UnlimitedPayloadSize) {
   }
 }
 
+// Check cases where the message is large enough to not fit in the pipe to be
+// transferred in one single pipe message transmission.
+TEST_F(SubprocessServerTest, LargePayloadSize) {
+  // Set a short timeout to speed up testing.
+  FLAGS_subprocess_timeout_secs = 5;
+  // Set the max message to 24MB (3x the default size)
+  FLAGS_subprocess_max_message_size_bytes =  24 * 1024 * 1024;
+  ASSERT_OK(ResetSubprocessServer(0, 0, 24 * 1024 * 1024));
+
+  // Send in a large message that isn't oversized as per the current limit.
+  {
+    auto req = CreateEchoSubprocessRequestPB(string(23 * 1024 * 1024, 'x'));
+    SubprocessResponsePB res;
+    ASSERT_OK(server_->Execute(&req, &res));
+  }
+
+  // Send a large oversized message.
+  {
+    auto req = CreateEchoSubprocessRequestPB(string(24 * 1024 * 1024, 'x'));
+    SubprocessResponsePB res;
+    const auto s = server_->Execute(&req, &res);
+
+    // The request will timeout because the oversized response is read and
+    // discarded, and there isn't any application-level data to be sent back.
+    ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(), "timed out while in flight");
+  }
+
+  // Non-oversized follow-up messages should be received without any issues:
+  // the communication channel should be cleared of any oversized requests
+  // sent earlier.
+  {
+    auto req = CreateEchoSubprocessRequestPB(string(23 * 1024 * 1024, 'x'));
+    SubprocessResponsePB res;
+    ASSERT_OK(server_->Execute(&req, &res));
+  }
+}
+
 } // namespace subprocess
 } // namespace kudu
 

Reply via email to