This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 347e42d59c7 Pipe: Improved the air gap receiver socket close handling
logic (#12285)
347e42d59c7 is described below
commit 347e42d59c7af714b914e4ba793100a9edfb62c0
Author: Caideyipi <[email protected]>
AuthorDate: Wed Apr 3 19:00:12 2024 +0800
Pipe: Improved the air gap receiver socket close handling logic (#12285)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../protocol/airgap/IoTDBAirGapReceiver.java | 47 ++++++++++++++++++----
1 file changed, 40 insertions(+), 7 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
index f003219c781..f2c529bc4a0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
@@ -27,6 +27,7 @@ import
org.apache.iotdb.commons.pipe.connector.payload.airgap.AirGapOneByteRespo
import
org.apache.iotdb.commons.pipe.connector.payload.airgap.AirGapPseudoTPipeTransferRequest;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import
org.apache.iotdb.db.pipe.receiver.protocol.thrift.IoTDBDataNodeReceiverAgent;
+import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
import org.apache.iotdb.tsfile.utils.BytesUtils;
@@ -135,7 +136,9 @@ public class IoTDBAirGapReceiver extends WrappedRunnable {
req);
fail();
}
- } catch (Exception e) {
+ } catch (final PipeConnectionException e) {
+ LOGGER.info("Socket closed when listening to data. Because: {}",
e.getMessage());
+ } catch (final Exception e) {
LOGGER.warn("Exception during handling receiving, receiverId: {}",
receiverId, e);
fail();
}
@@ -167,7 +170,7 @@ public class IoTDBAirGapReceiver extends WrappedRunnable {
private byte[] readData(InputStream inputStream) throws IOException {
final int length = readLength(inputStream);
- if (length == 0) {
+ if (length <= 0) {
// Will fail() after checkSum()
return new byte[0];
}
@@ -207,18 +210,48 @@ public class IoTDBAirGapReceiver extends WrappedRunnable {
: 0;
}
- private void readTillFull(InputStream inputStream, byte[] readBuffer) throws
IOException {
+ /**
+ * Read to the buffer until it is full.
+ *
+ * @param inputStream the input socket stream
+ * @param readBuffer the buffer to read into
+ * @throws IOException if any IOException occurs
+ * @throws PipeConnectionException if the socket is closed during listening
+ */
+ private void readTillFull(final InputStream inputStream, final byte[]
readBuffer)
+ throws IOException, PipeConnectionException {
int alreadyReadBytes = 0;
while (alreadyReadBytes < readBuffer.length) {
- alreadyReadBytes +=
+ final int readBytes =
inputStream.read(readBuffer, alreadyReadBytes, readBuffer.length -
alreadyReadBytes);
+ // In socket input stream readBytes == -1 indicates EOF, namely the
+ // socket is closed
+ if (readBytes == -1) {
+ throw new PipeConnectionException("Socket closed when executing
readTillFull.");
+ }
+ alreadyReadBytes += readBytes;
}
}
- private void skipTillEnough(InputStream inputStream, long length) throws
IOException {
- int currentSkippedBytes = 0;
+ /**
+ * Skip given number of bytes of the buffer until enough bytes is skipped.
+ *
+ * @param inputStream the input socket stream
+ * @param length the length to skip
+ * @throws IOException if any IOException occurs
+ * @throws PipeConnectionException if the socket is closed during skipping
+ */
+ private void skipTillEnough(final InputStream inputStream, final long length)
+ throws IOException, PipeConnectionException {
+ long currentSkippedBytes = 0;
while (currentSkippedBytes < length) {
- currentSkippedBytes += (int) inputStream.skip(length -
currentSkippedBytes);
+ final long skippedBytes = inputStream.skip(length - currentSkippedBytes);
+ // In socket input stream skippedBytes == 0 indicates EOF, namely the
+ // socket is closed
+ if (skippedBytes == 0) {
+ throw new PipeConnectionException("Socket closed when executing
skipTillEnough.");
+ }
+ currentSkippedBytes += skippedBytes;
}
}
}