[CARBONDATA-2749][dataload] In HDFS, an empty tablestatus file is written during dataload, IUD or compaction when the disk is full.
Problem: When a failure happens because the disk is full during load, IUD or compaction, then while updating the tablestatus file, the tablestatus.tmp file created by the atomic file operation remains empty, and in the finally block the empty tablestatus.tmp file is renamed to the actual file. This leads to an empty tablestatus file. Once this problem happens, the tablestatus file cannot be recovered and the already loaded data cannot be used. Solution: If a failure happens during the write, then the temporary file rename in the finally block must be skipped. This closes #2517 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/76285717 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/76285717 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/76285717 Branch: refs/heads/external-format Commit: 76285717c0ecd963b86fcdb13c63606822bcea3b Parents: fd747a3 Author: mohammadshahidkhan <mohdshahidkhan1...@gmail.com> Authored: Tue Jul 17 16:59:35 2018 +0530 Committer: kunal642 <kunalkapoor...@gmail.com> Committed: Mon Jul 30 14:51:26 2018 +0530 ---------------------------------------------------------------------- .../status/DiskBasedDataMapStatusProvider.java | 1 + .../AtomicFileOperationS3Impl.java | 5 +- .../fileoperations/AtomicFileOperations.java | 2 + .../AtomicFileOperationsImpl.java | 24 ++++++++-- .../core/metadata/SegmentFileStore.java | 4 ++ .../statusmanager/SegmentStatusManager.java | 5 ++ .../SegmentUpdateStatusManager.java | 2 + .../hadoop/testutil/StoreCreator.java | 4 ++ .../processing/util/CarbonLoaderUtil.java | 50 ++++---------------- 9 files changed, 50 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/76285717/core/src/main/java/org/apache/carbondata/core/datamap/status/DiskBasedDataMapStatusProvider.java 
---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/status/DiskBasedDataMapStatusProvider.java b/core/src/main/java/org/apache/carbondata/core/datamap/status/DiskBasedDataMapStatusProvider.java index 22a7f6d..d42c98a 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/status/DiskBasedDataMapStatusProvider.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/status/DiskBasedDataMapStatusProvider.java @@ -174,6 +174,7 @@ public class DiskBasedDataMapStatusProvider implements DataMapStatusStorageProvi brWriter.write(metadataInstance); } catch (IOException ioe) { LOG.error("Error message: " + ioe.getLocalizedMessage()); + fileWrite.setFailed(); throw ioe; } finally { if (null != brWriter) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/76285717/core/src/main/java/org/apache/carbondata/core/fileoperations/AtomicFileOperationS3Impl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/fileoperations/AtomicFileOperationS3Impl.java b/core/src/main/java/org/apache/carbondata/core/fileoperations/AtomicFileOperationS3Impl.java index 71730f0..f5311cb 100644 --- a/core/src/main/java/org/apache/carbondata/core/fileoperations/AtomicFileOperationS3Impl.java +++ b/core/src/main/java/org/apache/carbondata/core/fileoperations/AtomicFileOperationS3Impl.java @@ -44,7 +44,6 @@ class AtomicFileOperationS3Impl implements AtomicFileOperations { AtomicFileOperationS3Impl(String filePath) { this.filePath = filePath; } - @Override public DataInputStream openForRead() throws IOException { return FileFactory.getDataInputStream(filePath, FileFactory.getFileType(filePath)); } @@ -61,4 +60,8 @@ class AtomicFileOperationS3Impl implements AtomicFileOperations { dataOutStream = FileFactory.getDataOutputStream(filePath, fileType); return dataOutStream; } + + @Override public void setFailed() { + // no 
implementation required + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/76285717/core/src/main/java/org/apache/carbondata/core/fileoperations/AtomicFileOperations.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/fileoperations/AtomicFileOperations.java b/core/src/main/java/org/apache/carbondata/core/fileoperations/AtomicFileOperations.java index ffaa1b1..a641a49 100644 --- a/core/src/main/java/org/apache/carbondata/core/fileoperations/AtomicFileOperations.java +++ b/core/src/main/java/org/apache/carbondata/core/fileoperations/AtomicFileOperations.java @@ -28,4 +28,6 @@ public interface AtomicFileOperations { void close() throws IOException; DataOutputStream openForWrite(FileWriteOperation operation) throws IOException; + + void setFailed(); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/76285717/core/src/main/java/org/apache/carbondata/core/fileoperations/AtomicFileOperationsImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/fileoperations/AtomicFileOperationsImpl.java b/core/src/main/java/org/apache/carbondata/core/fileoperations/AtomicFileOperationsImpl.java index af2456a..f9f8647 100644 --- a/core/src/main/java/org/apache/carbondata/core/fileoperations/AtomicFileOperationsImpl.java +++ b/core/src/main/java/org/apache/carbondata/core/fileoperations/AtomicFileOperationsImpl.java @@ -21,6 +21,8 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.filesystem.CarbonFile; import org.apache.carbondata.core.datastore.impl.FileFactory; @@ -29,6 +31,11 @@ import org.apache.carbondata.core.util.CarbonUtil; 
class AtomicFileOperationsImpl implements AtomicFileOperations { + /** + * Logger instance + */ + private static final LogService LOGGER = + LogServiceFactory.getLogService(AtomicFileOperationsImpl.class.getName()); private String filePath; private FileType fileType; @@ -36,6 +43,7 @@ class AtomicFileOperationsImpl implements AtomicFileOperations { private String tempWriteFilePath; private DataOutputStream dataOutStream; + private boolean setFailed; AtomicFileOperationsImpl(String filePath, FileType fileType) { this.filePath = filePath; @@ -70,12 +78,20 @@ class AtomicFileOperationsImpl implements AtomicFileOperations { if (null != dataOutStream) { CarbonUtil.closeStream(dataOutStream); CarbonFile tempFile = FileFactory.getCarbonFile(tempWriteFilePath, fileType); - if (!tempFile.renameForce(filePath)) { - throw new IOException("temporary file renaming failed, src=" - + tempFile.getPath() + ", dest=" + filePath); + if (!this.setFailed) { + if (!tempFile.renameForce(filePath)) { + throw new IOException( + "temporary file renaming failed, src=" + tempFile.getPath() + ", dest=" + filePath); + } + } else { + LOGGER.warn("The temporary file renaming skipped due to I/O error, deleting file " + + tempWriteFilePath); + tempFile.delete(); } } - } + @Override public void setFailed() { + this.setFailed = true; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/76285717/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java index 32f6155..67e58d1 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java @@ -229,6 +229,10 @@ public class SegmentFileStore { String metadataInstance = 
gsonObjectToWrite.toJson(segmentFile); brWriter.write(metadataInstance); brWriter.flush(); + } catch (IOException ie) { + LOGGER.error("Error message: " + ie.getLocalizedMessage()); + fileWrite.setFailed(); + throw ie; } finally { CarbonUtil.closeStreams(brWriter); fileWrite.close(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/76285717/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java index d5b456c..daf54a0 100755 --- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java +++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java @@ -512,6 +512,7 @@ public class SegmentStatusManager { brWriter.write(metadataInstance); } catch (IOException ioe) { LOG.error("Error message: " + ioe.getLocalizedMessage()); + fileWrite.setFailed(); throw ioe; } finally { if (null != brWriter) { @@ -881,6 +882,10 @@ public class SegmentStatusManager { String metadataInstance = gsonObjectToWrite.toJson(listOfLoadFolderDetails.toArray()); brWriter.write(metadataInstance); + } catch (IOException ie) { + LOG.error("Error message: " + ie.getLocalizedMessage()); + writeOperation.setFailed(); + throw ie; } finally { try { if (null != brWriter) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/76285717/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java index 5d5e8b0..c3daac5 100644 --- 
a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java +++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java @@ -724,6 +724,8 @@ public class SegmentUpdateStatusManager { brWriter.write(metadataInstance); } catch (IOException ioe) { LOG.error("Error message: " + ioe.getLocalizedMessage()); + fileWrite.setFailed(); + throw ioe; } finally { if (null != brWriter) { brWriter.flush(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/76285717/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java index 6e6a65b..c113228 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java @@ -449,6 +449,10 @@ public class StoreCreator { String metadataInstance = gsonObjectToWrite.toJson(listOfLoadFolderDetails.toArray()); brWriter.write(metadataInstance); + } catch (IOException ioe) { + LOG.error("Error message: " + ioe.getLocalizedMessage()); + writeOperation.setFailed(); + throw ioe; } finally { try { if (null != brWriter) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/76285717/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java index 272abec..19353d1 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java +++ 
b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java @@ -16,14 +16,17 @@ */ package org.apache.carbondata.processing.util; -import java.io.BufferedWriter; -import java.io.DataOutputStream; import java.io.IOException; -import java.io.OutputStreamWriter; import java.net.InetAddress; import java.net.UnknownHostException; -import java.nio.charset.Charset; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; @@ -40,9 +43,6 @@ import org.apache.carbondata.core.datastore.filesystem.CarbonFile; import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.datastore.impl.FileFactory.FileType; -import org.apache.carbondata.core.fileoperations.AtomicFileOperationFactory; -import org.apache.carbondata.core.fileoperations.AtomicFileOperations; -import org.apache.carbondata.core.fileoperations.FileWriteOperation; import org.apache.carbondata.core.locks.CarbonLockUtil; import org.apache.carbondata.core.locks.ICarbonLock; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; @@ -62,7 +62,6 @@ import org.apache.carbondata.processing.merger.NodeMultiBlockRelation; import static org.apache.carbondata.core.enums.EscapeSequences.*; -import com.google.gson.Gson; import org.apache.commons.lang3.StringUtils; public final class CarbonLoaderUtil { @@ -397,39 +396,6 @@ public final class CarbonLoaderUtil { loadMetadataDetails.setLoadStartTime(loadStartTime); } - public static void writeLoadMetadata(AbsoluteTableIdentifier identifier, - List<LoadMetadataDetails> listOfLoadFolderDetails) throws IOException { - String dataLoadLocation = 
CarbonTablePath.getTableStatusFilePath(identifier.getTablePath()); - - DataOutputStream dataOutputStream; - Gson gsonObjectToWrite = new Gson(); - BufferedWriter brWriter = null; - - AtomicFileOperations writeOperation = - AtomicFileOperationFactory.getAtomicFileOperations(dataLoadLocation); - - try { - dataOutputStream = writeOperation.openForWrite(FileWriteOperation.OVERWRITE); - brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream, - Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET))); - - String metadataInstance = gsonObjectToWrite.toJson(listOfLoadFolderDetails.toArray()); - brWriter.write(metadataInstance); - } finally { - try { - if (null != brWriter) { - brWriter.flush(); - } - } catch (Exception e) { - LOGGER.error("error in flushing "); - - } - CarbonUtil.closeStreams(brWriter); - writeOperation.close(); - } - - } - public static boolean isValidEscapeSequence(String escapeChar) { return escapeChar.equalsIgnoreCase(NEW_LINE.getName()) || escapeChar.equalsIgnoreCase(CARRIAGE_RETURN.getName()) ||