[CARBONDATA-2021] Fix clean up issue when update operation is abruptly stopped
When the delete succeeds but the update fails while writing the status file, a stale carbondata file is left behind. This change removes that file during clean up and also ignores it during query. This closes #1793 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/b2139cab Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/b2139cab Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/b2139cab Branch: refs/heads/fgdatamap Commit: b2139cabe8cdeb7c241e30a525d754578cfa5ec6 Parents: d90280a Author: akashrn5 <akashnilu...@gmail.com> Authored: Wed Jan 10 20:29:43 2018 +0530 Committer: Jacky Li <jacky.li...@qq.com> Committed: Wed Jan 31 19:23:55 2018 +0800 ---------------------------------------------------------------------- .../core/mutate/CarbonUpdateUtil.java | 64 ++++++++++++++++++-- .../SegmentUpdateStatusManager.java | 27 +++++++-- .../apache/carbondata/core/util/CarbonUtil.java | 10 +++ .../processing/util/CarbonLoaderUtil.java | 6 +- 4 files changed, 93 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/b2139cab/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java index f4566ac..0e4eec7 100644 --- a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java @@ -427,6 +427,10 @@ public class CarbonUpdateUtil { String validUpdateStatusFile = ""; + boolean isAbortedFile = true; + + boolean isInvalidFile = false; + // scan through each segment. 
for (LoadMetadataDetails segment : details) { @@ -450,10 +454,14 @@ public class CarbonUpdateUtil { SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(table.getAbsoluteTableIdentifier()); + // deleting of the aborted file scenario. + deleteStaleCarbonDataFiles(segment, allSegmentFiles, updateStatusManager); + // get Invalid update delta files. CarbonFile[] invalidUpdateDeltaFiles = updateStatusManager - .getUpdateDeltaFilesList(segment.getLoadName(), false, - CarbonCommonConstants.UPDATE_DELTA_FILE_EXT, true, allSegmentFiles); + .getUpdateDeltaFilesList(segment.getLoadName(), false, + CarbonCommonConstants.UPDATE_DELTA_FILE_EXT, true, allSegmentFiles, + isInvalidFile); // now for each invalid delta file need to check the query execution time out // and then delete. @@ -465,8 +473,9 @@ public class CarbonUpdateUtil { // do the same for the index files. CarbonFile[] invalidIndexFiles = updateStatusManager - .getUpdateDeltaFilesList(segment.getLoadName(), false, - CarbonCommonConstants.UPDATE_INDEX_FILE_EXT, true, allSegmentFiles); + .getUpdateDeltaFilesList(segment.getLoadName(), false, + CarbonCommonConstants.UPDATE_INDEX_FILE_EXT, true, allSegmentFiles, + isInvalidFile); // now for each invalid index file need to check the query execution time out // and then delete. @@ -492,11 +501,20 @@ public class CarbonUpdateUtil { continue; } + // aborted scenario. 
+ invalidDeleteDeltaFiles = updateStatusManager + .getDeleteDeltaInvalidFilesList(segment.getLoadName(), block, false, + allSegmentFiles, isAbortedFile); + for (CarbonFile invalidFile : invalidDeleteDeltaFiles) { + boolean doForceDelete = true; + compareTimestampsAndDelete(invalidFile, doForceDelete, false); + } + // case 1 if (CarbonUpdateUtil.isBlockInvalid(block.getSegmentStatus())) { completeListOfDeleteDeltaFiles = updateStatusManager .getDeleteDeltaInvalidFilesList(segment.getLoadName(), block, true, - allSegmentFiles); + allSegmentFiles, isInvalidFile); for (CarbonFile invalidFile : completeListOfDeleteDeltaFiles) { compareTimestampsAndDelete(invalidFile, forceDelete, false); @@ -518,7 +536,7 @@ public class CarbonUpdateUtil { } else { invalidDeleteDeltaFiles = updateStatusManager .getDeleteDeltaInvalidFilesList(segment.getLoadName(), block, false, - allSegmentFiles); + allSegmentFiles, isInvalidFile); for (CarbonFile invalidFile : invalidDeleteDeltaFiles) { compareTimestampsAndDelete(invalidFile, forceDelete, false); @@ -559,6 +577,40 @@ public class CarbonUpdateUtil { } /** + * This function deletes all the stale carbondata files during clean up before update operation + * one scenario is if update operation is ubruptly stopped before updation of table status then + * the carbondata file created during update operation is stale file and it will be deleted in + * this function in next update operation + * @param segment + * @param allSegmentFiles + * @param updateStatusManager + */ + private static void deleteStaleCarbonDataFiles(LoadMetadataDetails segment, + CarbonFile[] allSegmentFiles, SegmentUpdateStatusManager updateStatusManager) { + boolean doForceDelete = true; + boolean isAbortedFile = true; + CarbonFile[] invalidUpdateDeltaFiles = updateStatusManager + .getUpdateDeltaFilesList(segment.getLoadName(), false, + CarbonCommonConstants.UPDATE_DELTA_FILE_EXT, true, allSegmentFiles, + isAbortedFile); + // now for each invalid delta file need to check the 
query execution time out + // and then delete. + for (CarbonFile invalidFile : invalidUpdateDeltaFiles) { + compareTimestampsAndDelete(invalidFile, doForceDelete, false); + } + // do the same for the index files. + CarbonFile[] invalidIndexFiles = updateStatusManager + .getUpdateDeltaFilesList(segment.getLoadName(), false, + CarbonCommonConstants.UPDATE_INDEX_FILE_EXT, true, allSegmentFiles, + isAbortedFile); + // now for each invalid index file need to check the query execution time out + // and then delete. + for (CarbonFile invalidFile : invalidIndexFiles) { + compareTimestampsAndDelete(invalidFile, doForceDelete, false); + } + } + + /** * This will tell whether the max query timeout has been expired or not. * @param fileTimestamp * @return http://git-wip-us.apache.org/repos/asf/carbondata/blob/b2139cab/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java index df7eedd..e0e7b70 100644 --- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java +++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java @@ -469,7 +469,7 @@ public class SegmentUpdateStatusManager { */ public CarbonFile[] getUpdateDeltaFilesList(String segmentId, final boolean validUpdateFiles, final String fileExtension, final boolean excludeOriginalFact, - CarbonFile[] allFilesOfSegment) { + CarbonFile[] allFilesOfSegment, boolean isAbortedFile) { CarbonTablePath carbonTablePath = CarbonStorePath .getCarbonTablePath(absoluteTableIdentifier.getTablePath(), @@ -528,7 +528,12 @@ public class SegmentUpdateStatusManager { } } else { // invalid cases. 
- if (Long.compare(timestamp, startTimeStampFinal) < 0) { + if (isAbortedFile) { + if (Long.compare(timestamp, endTimeStampFinal) > 0) { + listOfCarbonFiles.add(eachFile); + } + } else if (Long.compare(timestamp, startTimeStampFinal) < 0 + || Long.compare(timestamp, endTimeStampFinal) > 0) { listOfCarbonFiles.add(eachFile); } } @@ -934,11 +939,14 @@ public class SegmentUpdateStatusManager { */ public CarbonFile[] getDeleteDeltaInvalidFilesList(final String segmentId, final SegmentUpdateDetails block, final boolean needCompleteList, - CarbonFile[] allSegmentFiles) { + CarbonFile[] allSegmentFiles, boolean isAbortedFile) { final long deltaStartTimestamp = getStartTimeOfDeltaFile(CarbonCommonConstants.DELETE_DELTA_FILE_EXT, block); + final long deltaEndTimestamp = + getEndTimeOfDeltaFile(CarbonCommonConstants.DELETE_DELTA_FILE_EXT, block); + List<CarbonFile> files = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); @@ -956,9 +964,16 @@ public class SegmentUpdateStatusManager { long timestamp = CarbonUpdateUtil.getTimeStampAsLong( CarbonTablePath.DataFileUtil.getTimeStampFromDeleteDeltaFile(fileName)); - if (block.getBlockName().equalsIgnoreCase(blkName) && ( - Long.compare(timestamp, deltaStartTimestamp) < 0)) { - files.add(eachFile); + if (block.getBlockName().equalsIgnoreCase(blkName)) { + + if (isAbortedFile) { + if (Long.compare(timestamp, deltaEndTimestamp) > 0) { + files.add(eachFile); + } + } else if (Long.compare(timestamp, deltaStartTimestamp) < 0 + || Long.compare(timestamp, deltaEndTimestamp) > 0) { + files.add(eachFile); + } } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/b2139cab/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java index 5d7a09f..600b1c9 100644 --- 
a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java @@ -1701,6 +1701,16 @@ public final class CarbonUtil { && blockTimeStamp < invalidBlockVOForSegmentId.getUpdateDeltaStartTimestamp()))) { return true; } + // aborted files case. + if (invalidBlockVOForSegmentId.getLatestUpdateTimestamp() != null + && blockTimeStamp > invalidBlockVOForSegmentId.getLatestUpdateTimestamp()) { + return true; + } + // for 1st time starttime stamp will be empty so need to consider fact time stamp. + if (null == invalidBlockVOForSegmentId.getUpdateDeltaStartTimestamp() + && blockTimeStamp > invalidBlockVOForSegmentId.getFactTimestamp()) { + return true; + } } return false; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/b2139cab/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java index fdc2cc3..12fc5c1 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java @@ -375,8 +375,10 @@ public final class CarbonLoaderUtil { } // reading the start time of data load. - long loadStartTime = CarbonUpdateUtil.readCurrentTime(); - model.setFactTimeStamp(loadStartTime); + if (model.getFactTimeStamp() == 0) { + long loadStartTime = CarbonUpdateUtil.readCurrentTime(); + model.setFactTimeStamp(loadStartTime); + } CarbonLoaderUtil .populateNewLoadMetaEntry(newLoadMetaEntry, status, model.getFactTimeStamp(), false); boolean entryAdded =