This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 347e42d59c7 Pipe: Improved the air gap receiver socket close handling logic (#12285)
347e42d59c7 is described below

commit 347e42d59c7af714b914e4ba793100a9edfb62c0
Author: Caideyipi <[email protected]>
AuthorDate: Wed Apr 3 19:00:12 2024 +0800

    Pipe: Improved the air gap receiver socket close handling logic (#12285)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../protocol/airgap/IoTDBAirGapReceiver.java       | 47 ++++++++++++++++++----
 1 file changed, 40 insertions(+), 7 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
index f003219c781..f2c529bc4a0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.commons.pipe.connector.payload.airgap.AirGapOneByteRespo
 import org.apache.iotdb.commons.pipe.connector.payload.airgap.AirGapPseudoTPipeTransferRequest;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.receiver.protocol.thrift.IoTDBDataNodeReceiverAgent;
+import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
 import org.apache.iotdb.tsfile.utils.BytesUtils;
@@ -135,7 +136,9 @@ public class IoTDBAirGapReceiver extends WrappedRunnable {
             req);
         fail();
       }
-    } catch (Exception e) {
+    } catch (final PipeConnectionException e) {
+      LOGGER.info("Socket closed when listening to data. Because: {}", e.getMessage());
+    } catch (final Exception e) {
       LOGGER.warn("Exception during handling receiving, receiverId: {}", receiverId, e);
       fail();
     }
@@ -167,7 +170,7 @@ public class IoTDBAirGapReceiver extends WrappedRunnable {
   private byte[] readData(InputStream inputStream) throws IOException {
     final int length = readLength(inputStream);
 
-    if (length == 0) {
+    if (length <= 0) {
       // Will fail() after checkSum()
       return new byte[0];
     }
@@ -207,18 +210,48 @@ public class IoTDBAirGapReceiver extends WrappedRunnable {
         : 0;
   }
 
-  private void readTillFull(InputStream inputStream, byte[] readBuffer) throws IOException {
+  /**
+   * Read to the buffer until it is full.
+   *
+   * @param inputStream the input socket stream
+   * @param readBuffer the buffer to read into
+   * @throws IOException if any IOException occurs
+   * @throws PipeConnectionException if the socket is closed during listening
+   */
+  private void readTillFull(final InputStream inputStream, final byte[] readBuffer)
+      throws IOException, PipeConnectionException {
     int alreadyReadBytes = 0;
     while (alreadyReadBytes < readBuffer.length) {
-      alreadyReadBytes +=
+      final int readBytes =
           inputStream.read(readBuffer, alreadyReadBytes, readBuffer.length - alreadyReadBytes);
+      // In socket input stream readBytes == -1 indicates EOF, namely the
+      // socket is closed
+      if (readBytes == -1) {
+        throw new PipeConnectionException("Socket closed when executing readTillFull.");
+      }
+      alreadyReadBytes += readBytes;
     }
   }
 
-  private void skipTillEnough(InputStream inputStream, long length) throws IOException {
-    int currentSkippedBytes = 0;
+  /**
+   * Skip given number of bytes of the buffer until enough bytes is skipped.
+   *
+   * @param inputStream the input socket stream
+   * @param length the length to skip
+   * @throws IOException if any IOException occurs
+   * @throws PipeConnectionException if the socket is closed during skipping
+   */
+  private void skipTillEnough(final InputStream inputStream, final long length)
+      throws IOException, PipeConnectionException {
+    long currentSkippedBytes = 0;
     while (currentSkippedBytes < length) {
-      currentSkippedBytes += (int) inputStream.skip(length - currentSkippedBytes);
+      final long skippedBytes = inputStream.skip(length - currentSkippedBytes);
+      // In socket input stream skippedBytes == 0 indicates EOF, namely the
+      // socket is closed
+      if (skippedBytes == 0) {
+        throw new PipeConnectionException("Socket closed when executing skipTillEnough.");
+      }
+      currentSkippedBytes += skippedBytes;
     }
   }
 }

Reply via email to