KYLIN-1883: Consensus problem when running the MetadataCleanupJob tool
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/07177caf Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/07177caf Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/07177caf Branch: refs/heads/master Commit: 07177caf7062ec3b631324924a35b6a02e1bde94 Parents: 1462371 Author: Zhong <yangzh...@lm-shc-16501214.corp.ebay.com> Authored: Wed Jul 13 18:26:07 2016 +0800 Committer: Zhong <yangzh...@lm-shc-16501214.corp.ebay.com> Committed: Wed Jul 20 14:20:10 2016 +0800 ---------------------------------------------------------------------- .../engine/mr/steps/MetadataCleanupJob.java | 32 +++++++++----------- 1 file changed, 15 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/07177caf/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MetadataCleanupJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MetadataCleanupJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MetadataCleanupJob.java index f300de9..f8d265b 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MetadataCleanupJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MetadataCleanupJob.java @@ -101,15 +101,6 @@ public class MetadataCleanupJob extends AbstractHadoopJob { public void cleanup() throws Exception { CubeManager cubeManager = CubeManager.getInstance(config); - Set<String> activeResourceList = Sets.newHashSet(); - for (org.apache.kylin.cube.CubeInstance cube : cubeManager.listAllCubes()) { - for (org.apache.kylin.cube.CubeSegment segment : cube.getSegments()) { - activeResourceList.addAll(segment.getSnapshotPaths()); - activeResourceList.addAll(segment.getDictionaryPaths()); - activeResourceList.add(segment.getStatisticsResourcePath()); - } - } - List<String> 
toDeleteResource = Lists.newArrayList(); // two level resources, snapshot tables and cube statistics @@ -121,10 +112,8 @@ public class MetadataCleanupJob extends AbstractHadoopJob { NavigableSet<String> snapshotNames = getStore().listResources(snapshotTable); if (snapshotNames != null) for (String snapshot : snapshotNames) { - if (!activeResourceList.contains(snapshot)) { - if (isOlderThanThreshold(getStore().getResourceTimestamp(snapshot))) - toDeleteResource.add(snapshot); - } + if (isOlderThanThreshold(getStore().getResourceTimestamp(snapshot))) + toDeleteResource.add(snapshot); } } } @@ -140,13 +129,22 @@ public class MetadataCleanupJob extends AbstractHadoopJob { NavigableSet<String> dictionaries = getStore().listResources(tableCol); if (dictionaries != null) for (String dict : dictionaries) - if (!activeResourceList.contains(dict)) { - if (isOlderThanThreshold(getStore().getResourceTimestamp(dict))) - toDeleteResource.add(dict); - } + if (isOlderThanThreshold(getStore().getResourceTimestamp(dict))) + toDeleteResource.add(dict); } } + Set<String> activeResourceList = Sets.newHashSet(); + for (org.apache.kylin.cube.CubeInstance cube : cubeManager.listAllCubes()) { + for (org.apache.kylin.cube.CubeSegment segment : cube.getSegments()) { + activeResourceList.addAll(segment.getSnapshotPaths()); + activeResourceList.addAll(segment.getDictionaryPaths()); + activeResourceList.add(segment.getStatisticsResourcePath()); + } + } + + toDeleteResource.removeAll(activeResourceList); + // delete old and completed jobs ExecutableDao executableDao = ExecutableDao.getInstance(KylinConfig.getInstanceFromEnv()); List<ExecutablePO> allExecutable = executableDao.getJobs();