This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch receiver-delete-file-failed
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 33d19977f7493e4d89f2418488f991535d7b9121 Author: Steve Yurong Su <[email protected]> AuthorDate: Mon Apr 8 13:23:57 2024 +0800 fix --- .../commons/pipe/receiver/IoTDBFileReceiver.java | 233 ++++++++++++++------- 1 file changed, 152 insertions(+), 81 deletions(-) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java index 69d4ee815a8..79fed02b1e5 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java @@ -37,13 +37,13 @@ import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp; import org.apache.iotdb.tsfile.common.constant.TsFileConstant; +import org.apache.commons.io.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; -import java.nio.file.Files; import java.util.List; import java.util.Objects; import java.util.concurrent.atomic.AtomicLong; @@ -61,7 +61,7 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { // Used to generate transfer id, which is used to identify a receiver thread. 
private static final AtomicLong RECEIVER_ID_GENERATOR = new AtomicLong(0); - private final AtomicLong receiverId = new AtomicLong(0); + protected final AtomicLong receiverId = new AtomicLong(0); private File writingFile; private RandomAccessFile writingFileWriter; @@ -94,53 +94,69 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { if (receiverFileDirWithIdSuffix.get() != null) { if (receiverFileDirWithIdSuffix.get().exists()) { try { - Files.delete(receiverFileDirWithIdSuffix.get().toPath()); + FileUtils.deleteDirectory(receiverFileDirWithIdSuffix.get()); LOGGER.info( - "Original receiver file dir {} was deleted.", + "Receiver id = {}: Original receiver file dir {} was deleted.", + receiverId.get(), receiverFileDirWithIdSuffix.get().getPath()); } catch (IOException e) { LOGGER.warn( - "Failed to delete original receiver file dir {}, because {}.", + "Receiver id = {}: Failed to delete original receiver file dir {}, because {}.", + receiverId.get(), receiverFileDirWithIdSuffix.get().getPath(), - e.getMessage()); + e.getMessage(), + e); } } else { - LOGGER.info( - "Original receiver file dir {} is not existed. No need to delete.", - receiverFileDirWithIdSuffix.get().getPath()); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "Receiver id = {}: Original receiver file dir {} is not existed. No need to delete.", + receiverId.get(), + receiverFileDirWithIdSuffix.get().getPath()); + } } receiverFileDirWithIdSuffix.set(null); } else { - LOGGER.info("Current receiver file dir is null. No need to delete."); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "Receiver id = {}: Current receiver file dir is null. 
No need to delete.", + receiverId.get()); + } } final String receiverFileBaseDir; try { receiverFileBaseDir = getReceiverFileBaseDir(); if (Objects.isNull(receiverFileBaseDir)) { - LOGGER.error( - "Failed to init pipe receiver file folder manager because all disks of folders are full."); + LOGGER.warn( + "Receiver id = {}: Failed to init pipe receiver file folder manager because all disks of folders are full.", + receiverId.get()); return new TPipeTransferResp(StatusUtils.getStatus(TSStatusCode.DISK_SPACE_INSUFFICIENT)); } } catch (Exception e) { - LOGGER.error( - "Fail to create pipe receiver file folder because all disks of folders are full.", e); + LOGGER.warn( + "Receiver id = {}: Failed to create pipe receiver file folder because all disks of folders are full.", + receiverId.get(), + e); return new TPipeTransferResp(StatusUtils.getStatus(TSStatusCode.DISK_SPACE_INSUFFICIENT)); } // Create a new receiver file dir final File newReceiverDir = new File(receiverFileBaseDir, Long.toString(receiverId.get())); - if (!newReceiverDir.exists()) { - if (newReceiverDir.mkdirs()) { - LOGGER.info("Receiver file dir {} was created.", newReceiverDir.getPath()); - } else { - LOGGER.error("Failed to create receiver file dir {}.", newReceiverDir.getPath()); - } + if (!newReceiverDir.exists() && !newReceiverDir.mkdirs()) { + LOGGER.warn( + "Receiver id = {}: Failed to create receiver file dir {}.", + receiverId.get(), + newReceiverDir.getPath()); + return new TPipeTransferResp( + RpcUtils.getStatus( + TSStatusCode.PIPE_HANDSHAKE_ERROR, + String.format("Failed to create receiver file dir %s.", newReceiverDir.getPath()))); } receiverFileDirWithIdSuffix.set(newReceiverDir); LOGGER.info( - "Handshake successfully, receiver id = {}, receiver file dir = {}.", + "Receiver id = {}: Handshake successfully, receiver file dir = {}.", receiverId.get(), newReceiverDir.getPath()); return new TPipeTransferResp(RpcUtils.SUCCESS_STATUS); @@ -157,7 +173,8 @@ public abstract class IoTDBFileReceiver 
implements IoTDBReceiver { RpcUtils.getStatus( TSStatusCode.PIPE_HANDSHAKE_ERROR, "Receiver can not get clusterId from config node."); - LOGGER.warn("Handshake failed, response status = {}.", status); + LOGGER.warn( + "Receiver id = {}: Handshake failed, response status = {}.", receiverId.get(), status); return new TPipeTransferResp(status); } @@ -168,7 +185,8 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { final TSStatus status = RpcUtils.getStatus( TSStatusCode.PIPE_HANDSHAKE_ERROR, "Handshake request does not contain clusterId."); - LOGGER.warn("Handshake failed, response status = {}.", status); + LOGGER.warn( + "Receiver id = {}: Handshake failed, response status = {}.", receiverId.get(), status); return new TPipeTransferResp(status); } @@ -180,7 +198,8 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { String.format( "Receiver and sender are from the same cluster %s.", clusterIdFromHandshakeRequest)); - LOGGER.warn("Handshake failed, response status = {}.", status); + LOGGER.warn( + "Receiver id = {}: Handshake failed, response status = {}.", receiverId.get(), status); return new TPipeTransferResp(status); } @@ -192,7 +211,8 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { RpcUtils.getStatus( TSStatusCode.PIPE_HANDSHAKE_ERROR, "Handshake request does not contain timestampPrecision."); - LOGGER.warn("Handshake failed, response status = {}.", status); + LOGGER.warn( + "Receiver id = {}: Handshake failed, response status = {}.", receiverId.get(), status); return new TPipeTransferResp(status); } @@ -237,7 +257,10 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { String.format( "Request sender to reset file reader's offset from %s to %s.", req.getStartWritingOffset(), writingFileWriter.length())); - LOGGER.warn("File offset reset requested by receiver, response status = {}.", status); + LOGGER.warn( + "Receiver id = {}: File offset reset requested by receiver, response status = {}.", 
+ receiverId.get(), + status); return PipeTransferFilePieceResp.toTPipeTransferResp(status, writingFileWriter.length()); } @@ -246,7 +269,8 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { return PipeTransferFilePieceResp.toTPipeTransferResp( RpcUtils.SUCCESS_STATUS, writingFileWriter.length()); } catch (Exception e) { - LOGGER.warn("Failed to write file piece from req {}.", req, e); + LOGGER.warn( + "Receiver id = {}: Failed to write file piece from req {}.", receiverId.get(), req, e); final TSStatus status = RpcUtils.getStatus( TSStatusCode.PIPE_TRANSFER_FILE_ERROR, @@ -267,8 +291,9 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { } LOGGER.info( - "Writing file {} is not existed or name is not correct, try to create it. " + "Receiver id = {}: Writing file {} is not existed or name is not correct, try to create it. " + "Current writing file is {}.", + receiverId.get(), fileName, writingFile == null ? "null" : writingFile.getPath()); @@ -284,16 +309,23 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { if (!receiverFileDirWithIdSuffix.get().exists()) { if (receiverFileDirWithIdSuffix.get().mkdirs()) { LOGGER.info( - "Receiver file dir {} was created.", receiverFileDirWithIdSuffix.get().getPath()); + "Receiver id = {}: Receiver file dir {} was created.", + receiverId.get(), + receiverFileDirWithIdSuffix.get().getPath()); } else { LOGGER.error( - "Failed to create receiver file dir {}.", receiverFileDirWithIdSuffix.get().getPath()); + "Receiver id = {}: Failed to create receiver file dir {}.", + receiverId.get(), + receiverFileDirWithIdSuffix.get().getPath()); } } writingFile = new File(receiverFileDirWithIdSuffix.get(), fileName); writingFileWriter = new RandomAccessFile(writingFile, "rw"); - LOGGER.info("Writing file {} was created. Ready to write file pieces.", writingFile.getPath()); + LOGGER.info( + "Receiver id = {}: Writing file {} was created. 
Ready to write file pieces.", + receiverId.get(), + writingFile.getPath()); } private boolean isFileExistedAndNameCorrect(String fileName) { @@ -305,17 +337,24 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { try { writingFileWriter.close(); LOGGER.info( - "Current writing file writer {} was closed.", + "Receiver id = {}: Current writing file writer {} was closed.", + receiverId.get(), writingFile == null ? "null" : writingFile.getPath()); } catch (IOException e) { LOGGER.warn( - "Failed to close current writing file writer {}, because {}.", + "Receiver id = {}: Failed to close current writing file writer {}, because {}.", + receiverId.get(), writingFile == null ? "null" : writingFile.getPath(), - e.getMessage()); + e.getMessage(), + e); } writingFileWriter = null; } else { - LOGGER.info("Current writing file writer is null. No need to close."); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "Receiver id = {}: Current writing file writer is null. No need to close.", + receiverId.get()); + } } } @@ -323,23 +362,36 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { if (writingFile != null) { deleteFile(writingFile); } else { - LOGGER.info("Current writing file is null. No need to delete."); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "Receiver id = {}: Current writing file is null. 
No need to delete.", receiverId.get()); + } } } private void deleteFile(File file) { if (file.exists()) { try { - Files.delete(file.toPath()); - LOGGER.info("Original writing file {} was deleted.", file.getPath()); + FileUtils.delete(file); + LOGGER.info( + "Receiver id = {}: Original writing file {} was deleted.", + receiverId.get(), + file.getPath()); } catch (IOException e) { LOGGER.warn( - "Failed to delete original writing file {}, because {}.", + "Receiver id = {}: Failed to delete original writing file {}, because {}.", + receiverId.get(), file.getPath(), - e.getMessage()); + e.getMessage(), + e); } } else { - LOGGER.info("Original file {} is not existed. No need to delete.", file.getPath()); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "Receiver id = {}: Original file {} is not existed. No need to delete.", + receiverId.get(), + file.getPath()); + } } } @@ -347,7 +399,8 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { final boolean offsetCorrect = writingFileWriter.length() == offset; if (!offsetCorrect) { LOGGER.warn( - "Writing file {}'s offset is {}, but request sender's offset is {}.", + "Receiver id = {}: Writing file {}'s offset is {}, but request sender's offset is {}.", + receiverId.get(), writingFile.getPath(), writingFileWriter.length(), offset); @@ -392,20 +445,21 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { final TSStatus status = loadFileV1(req, fileAbsolutePath); if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { LOGGER.info( - "Seal file {} successfully. Receiver id is {}.", fileAbsolutePath, receiverId.get()); + "Receiver id = {}: Seal file {} successfully.", receiverId.get(), fileAbsolutePath); } else { LOGGER.warn( - "Failed to seal file {}, because {}. 
Receiver id is {}.", + "Receiver id = {}: Failed to seal file {}, because {}.", + receiverId.get(), fileAbsolutePath, - status.getMessage(), - receiverId.get()); + status.getMessage()); } return new TPipeTransferResp(status); } catch (IOException e) { LOGGER.warn( - String.format( - "Failed to seal file %s from req %s. Receiver id is %d.", - writingFile, req, receiverId.get()), + "Receiver id = {}: Failed to seal file {} from req {}.", + receiverId.get(), + writingFile, + req, e); return new TPipeTransferResp( RpcUtils.getStatus( @@ -413,7 +467,7 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { String.format("Failed to seal file %s because %s", writingFile, e.getMessage()))); } finally { // If the writing file is not sealed successfully, the writing file will be deleted. - // All pieces of the writing file and its mod(if exists) should be retransmitted by the + // All pieces of the writing file and its mod (if exists) should be retransmitted by the // sender. closeCurrentWritingFileWriter(); deleteCurrentWritingFile(); @@ -471,21 +525,18 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { final TSStatus status = loadFileV2(req, fileAbsolutePaths); if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { LOGGER.info( - "Seal file {} successfully. Receiver id is {}.", fileAbsolutePaths, receiverId.get()); + "Receiver id = {}: Seal file {} successfully.", receiverId.get(), fileAbsolutePaths); } else { LOGGER.warn( - "Failed to seal file {}, status is {}. Receiver id is {}.", + "Receiver id = {}: Failed to seal file {}, status is {}.", + receiverId.get(), fileAbsolutePaths, - status, - receiverId.get()); + status); } return new TPipeTransferResp(status); } catch (IOException | IllegalPathException e) { LOGGER.warn( - String.format( - "Failed to seal file %s from req %s. 
Receiver id is %d.", - writingFile, req, receiverId.get()), - e); + "Receiver id = {}: Failed to seal file {} from req {}.", receiverId.get(), files, req, e); return new TPipeTransferResp( RpcUtils.getStatus( TSStatusCode.PIPE_TRANSFER_FILE_ERROR, @@ -508,7 +559,10 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { RpcUtils.getStatus( TSStatusCode.PIPE_TRANSFER_FILE_ERROR, String.format("Failed to seal file %s, the file does not exist.", fileName)); - LOGGER.warn(status.getMessage()); + LOGGER.warn( + "Receiver id = {}: Failed to seal file {}, because the file does not exist.", + receiverId.get(), + fileName); return new TPipeTransferResp(status); } @@ -520,9 +574,16 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { "Failed to seal file %s, because the length of file is not correct. " + "The original file has length %s, but receiver file has length %s.", fileName, fileLength, writingFileWriter.length())); - LOGGER.warn(status.getMessage()); + LOGGER.warn( + "Receiver id = {}: Failed to seal file {}, because the length of file is not correct. " + + "The original file has length {}, but receiver file has length {}.", + receiverId.get(), + fileName, + fileLength, + writingFileWriter.length()); return new TPipeTransferResp(status); } + return null; } @@ -533,8 +594,12 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { RpcUtils.getStatus( TSStatusCode.PIPE_TRANSFER_FILE_ERROR, String.format( - "Failed to seal file %s, but writing file is %s.", fileName, writingFile)); - LOGGER.warn(status.getMessage()); + "Failed to seal file %s, because writing file is %s.", fileName, writingFile)); + LOGGER.warn( + "Receiver id = {}: Failed to seal file {}, because writing file is {}.", + receiverId.get(), + fileName, + writingFile); return new TPipeTransferResp(status); } @@ -546,9 +611,16 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { "Failed to seal file %s, because the length of file is not correct. 
" + "The original file has length %s, but receiver file has length %s.", fileName, fileLength, writingFileWriter.length())); - LOGGER.warn(status.getMessage()); + LOGGER.warn( + "Receiver id = {}: Failed to seal file {}, because the length of file is not correct. " + + "The original file has length {}, but receiver file has length {}.", + receiverId.get(), + fileName, + fileLength, + writingFileWriter.length()); return new TPipeTransferResp(status); } + return null; } @@ -557,7 +629,9 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { writingFile != null && writingFile.exists() && writingFileWriter != null; if (!isWritingFileAvailable) { LOGGER.info( - "Writing file {} is not available. Writing file is null: {}, writing file exists: {}, writing file writer is null: {}.", + "Receiver id = {}: Writing file {} is not available. " + + "Writing file is null: {}, writing file exists: {}, writing file writer is null: {}.", + receiverId.get(), writingFile, writingFile == null, writingFile != null && writingFile.exists(), @@ -578,14 +652,11 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { if (writingFileWriter != null) { try { writingFileWriter.close(); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug( - "Handling exit (receiver id = {}): writing file writer was closed.", - receiverId.get()); - } + LOGGER.info( + "Receiver id = {}: Handling exit: Writing file writer was closed.", receiverId.get()); } catch (Exception e) { LOGGER.warn( - "Handling exit (receiver id = {}): close writing file writer error.", + "Receiver id = {}: Handling exit: Close writing file writer error.", receiverId.get(), e); } @@ -593,21 +664,21 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { } else { if (LOGGER.isDebugEnabled()) { LOGGER.debug( - "Handling exit (receiver id = {}): writing file writer is null. No need to close.", + "Receiver id = {}: Handling exit: Writing file writer is null. 
No need to close.", receiverId.get()); } } if (writingFile != null) { try { - Files.delete(writingFile.toPath()); + FileUtils.delete(writingFile); LOGGER.info( - "Handling exit (receiver id = {}): writing file {} was deleted.", + "Receiver id = {}: Handling exit: Writing file {} was deleted.", receiverId.get(), writingFile.getPath()); } catch (Exception e) { LOGGER.warn( - "Handling exit (receiver id = {}): delete file {} error.", + "Receiver id = {}: Handling exit: Delete writing file {} error.", receiverId.get(), writingFile.getPath(), e); @@ -616,7 +687,7 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { } else { if (LOGGER.isDebugEnabled()) { LOGGER.debug( - "Handling exit (receiver id = {}): writing file is null. No need to delete.", + "Receiver id = {}: Handling exit: Writing file is null. No need to delete.", receiverId.get()); } } @@ -625,14 +696,14 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { if (receiverFileDirWithIdSuffix.get() != null) { if (receiverFileDirWithIdSuffix.get().exists()) { try { - Files.delete(receiverFileDirWithIdSuffix.get().toPath()); + FileUtils.deleteDirectory(receiverFileDirWithIdSuffix.get()); LOGGER.info( - "Handling exit (receiver id = {}): original receiver file dir {} was deleted.", + "Receiver id = {}: Handling exit: Original receiver file dir {} was deleted.", receiverId.get(), receiverFileDirWithIdSuffix.get().getPath()); } catch (IOException e) { LOGGER.warn( - "Handling exit (receiver id = {}): delete original receiver file dir {} error.", + "Receiver id = {}: Handling exit: Delete original receiver file dir {} error.", receiverId.get(), receiverFileDirWithIdSuffix.get().getPath(), e); @@ -640,7 +711,7 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { } else { if (LOGGER.isDebugEnabled()) { LOGGER.debug( - "Handling exit (receiver id = {}): original receiver file dir {} does not exist. 
No need to delete.", + "Receiver id = {}: Handling exit: Original receiver file dir {} does not exist. No need to delete.", receiverId.get(), receiverFileDirWithIdSuffix.get().getPath()); } @@ -649,11 +720,11 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { } else { if (LOGGER.isDebugEnabled()) { LOGGER.debug( - "Handling exit (receiver id = {}): original receiver file dir is null. No need to delete.", + "Receiver id = {}: Handling exit: Original receiver file dir is null. No need to delete.", receiverId.get()); } } - LOGGER.info("Handling exit (receiver id = {}): receiver exited.", receiverId.get()); + LOGGER.info("Receiver id = {}: Handling exit: Receiver exited.", receiverId.get()); } }
