shaofengshi closed pull request #422: KYLIN-3430 Global Dictionary Cleanup URL: https://github.com/apache/kylin/pull/422
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/server-base/src/main/java/org/apache/kylin/rest/job/MetadataCleanupJob.java b/server-base/src/main/java/org/apache/kylin/rest/job/MetadataCleanupJob.java index 5ee5c7a370..e11fe74c87 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/job/MetadataCleanupJob.java +++ b/server-base/src/main/java/org/apache/kylin/rest/job/MetadataCleanupJob.java @@ -18,6 +18,7 @@ package org.apache.kylin.rest.job; +import java.io.FileNotFoundException; import java.io.IOException; import java.util.Collections; import java.util.List; @@ -25,11 +26,18 @@ import java.util.Set; import java.util.TreeSet; +import org.apache.commons.lang3.ArrayUtils; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.ResourceStore; +import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.dict.DictionaryInfo; +import org.apache.kylin.dict.DictionaryInfoSerializer; import org.apache.kylin.job.dao.ExecutableDao; import org.apache.kylin.job.dao.ExecutablePO; import org.apache.kylin.job.execution.ExecutableState; @@ -68,6 +76,7 @@ public MetadataCleanupJob(KylinConfig config) { CubeManager cubeManager = CubeManager.getInstance(config); ResourceStore store = ResourceStore.getStore(config); long newResourceTimeCut = System.currentTimeMillis() - NEW_RESOURCE_THREADSHOLD_MS; + FileSystem fs = HadoopUtil.getWorkingFileSystem(HadoopUtil.getCurrentConfiguration()); List<String> toDeleteCandidates = Lists.newArrayList(); @@ -82,6 +91,23 @@ public 
MetadataCleanupJob(KylinConfig config) { } } + // find all of the global dictionaries in HDFS + try { + FileStatus[] fStatus = new FileStatus[0]; + fStatus = ArrayUtils.addAll(fStatus, fs.listStatus(new Path(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "resources/GlobalDict/dict"))); + fStatus = ArrayUtils.addAll(fStatus, fs.listStatus(new Path(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "resources/SegmentDict/dict"))); + for (FileStatus status : fStatus) { + String path = status.getPath().toString(); + FileStatus[] globalDicts = fs.listStatus(new Path(path)); + for (FileStatus globalDict : globalDicts) { + String globalDictPath = globalDict.getPath().toString(); + toDeleteCandidates.add(globalDictPath); + } + } + } catch (FileNotFoundException e) { + logger.info("Working Directory does not exist on HDFS. "); + } + // three level resources, only dictionaries for (String resourceRoot : new String[] { ResourceStore.DICT_RESOURCE_ROOT }) { for (String dir : noNull(store.listResources(resourceRoot))) { @@ -102,6 +128,20 @@ public MetadataCleanupJob(KylinConfig config) { activeResources.addAll(segment.getSnapshotPaths()); activeResources.addAll(segment.getDictionaryPaths()); activeResources.add(segment.getStatisticsResourcePath()); + for (String dictPath : segment.getDictionaryPaths()) { + DictionaryInfo dictInfo = store.getResource(dictPath, DictionaryInfoSerializer.FULL_SERIALIZER); + if ("org.apache.kylin.dict.AppendTrieDictionary".equals(dictInfo != null ? 
dictInfo.getDictionaryClass() : null)){ + String dictObj = dictInfo.getDictionaryObject().toString(); + String basedir = dictObj.substring(dictObj.indexOf("(") + 1, dictObj.indexOf(")") - 1); + if (basedir.startsWith(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "/resources/GlobalDict")) { + activeResources.add(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + + "resources/GlobalDict" + dictInfo.getResourceDir()); + } else if (basedir.startsWith(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "/resources/SegmentDict")) { + activeResources.add(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + + "resources/SegmentDict" + dictInfo.getResourceDir()); + } + } + } } } toDeleteCandidates.removeAll(activeResources); @@ -129,7 +169,7 @@ public MetadataCleanupJob(KylinConfig config) { return garbageResources; } - private List<String> cleanupConclude(boolean delete, List<String> toDeleteResources) { + private List<String> cleanupConclude(boolean delete, List<String> toDeleteResources) throws IOException { if (toDeleteResources.isEmpty()) { logger.info("No metadata resource to clean up"); return toDeleteResources; @@ -139,10 +179,15 @@ public MetadataCleanupJob(KylinConfig config) { if (delete) { ResourceStore store = ResourceStore.getStore(config); + FileSystem fs = HadoopUtil.getWorkingFileSystem(HadoopUtil.getCurrentConfiguration()); for (String res : toDeleteResources) { logger.info("Deleting metadata " + res); try { - store.deleteResource(res); + if (res.startsWith(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory())) { + fs.delete(new Path(res), true); + } else { + store.deleteResource(res); + } } catch (IOException e) { logger.error("Failed to delete resource " + res, e); } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org. With regards, Apache Git Services