KYLIN-3085 Makes sure no update on cached and shared CubeInstance
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/03e6b8c5 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/03e6b8c5 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/03e6b8c5 Branch: refs/heads/master Commit: 03e6b8c5d622125aca36128c6a7942f711951768 Parents: c094265 Author: Li Yang <liy...@apache.org> Authored: Sun Dec 10 10:44:12 2017 +0800 Committer: Hongbin Ma <m...@kyligence.io> Committed: Thu Dec 14 16:53:10 2017 +0800 ---------------------------------------------------------------------- .../persistence/RootPersistentEntity.java | 11 + .../org/apache/kylin/cube/CubeInstance.java | 6 + .../java/org/apache/kylin/cube/CubeManager.java | 203 +++++++++++++------ .../java/org/apache/kylin/cube/CubeUpdate.java | 5 +- .../kylin/cube/cli/DictionaryGeneratorCLI.java | 2 + .../apache/kylin/cube/CubeManagerCacheTest.java | 22 +- .../kylin/cube/CubeManagerConcurrencyTest.java | 141 +++++++++++++ .../org/apache/kylin/cube/CubeManagerTest.java | 123 +++++------ .../org/apache/kylin/cube/CubeSegmentsTest.java | 38 ++-- .../kylin/cube/project/ProjectManagerTest.java | 8 +- .../metadata/cachesync/CachedCrudAssist.java | 67 ++++-- .../kylin/engine/mr/common/CuboidShardUtil.java | 15 +- .../mr/common/StatisticsDecisionUtil.java | 6 +- .../engine/mr/steps/MergeDictionaryStep.java | 16 +- .../engine/mr/steps/MergeCuboidMapperTest.java | 183 ----------------- .../apache/kylin/engine/spark/SparkCubing.java | 31 +-- .../kylin/provision/BuildCubeWithEngine.java | 6 +- .../kylin/provision/BuildCubeWithStream.java | 6 +- .../apache/kylin/rest/service/CubeService.java | 39 +--- .../apache/kylin/rest/service/JobService.java | 13 +- .../kylin/source/kafka/job/MergeOffsetStep.java | 22 +- 21 files changed, 528 insertions(+), 435 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/03e6b8c5/core-common/src/main/java/org/apache/kylin/common/persistence/RootPersistentEntity.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/RootPersistentEntity.java b/core-common/src/main/java/org/apache/kylin/common/persistence/RootPersistentEntity.java index aa35482..39c2995 100644 --- a/core-common/src/main/java/org/apache/kylin/common/persistence/RootPersistentEntity.java +++ b/core-common/src/main/java/org/apache/kylin/common/persistence/RootPersistentEntity.java @@ -58,6 +58,9 @@ abstract public class RootPersistentEntity implements AclEntity, Serializable { @JsonProperty("last_modified") protected long lastModified; + + // if cached and shared, the object MUST NOT be modified (call setXXX() for example) + protected boolean isCachedAndShared = false; /** * Metadata model version @@ -99,6 +102,14 @@ abstract public class RootPersistentEntity implements AclEntity, Serializable { setUuid(UUID.randomUUID().toString()); } + public boolean isCachedAndShared() { + return isCachedAndShared; + } + + public void setCachedAndShared(boolean isCachedAndShared) { + this.isCachedAndShared = isCachedAndShared; + } + /** * The name as a part of the resource path used to save the entity. * http://git-wip-us.apache.org/repos/asf/kylin/blob/03e6b8c5/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java index 1be7923..55e9325 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java @@ -128,6 +128,12 @@ public class CubeInstance extends RootPersistentEntity implements IRealization, public CubeInstance() { } + public CubeInstance latestCopyForWrite() { + CubeManager mgr = CubeManager.getInstance(config); + CubeInstance latest = mgr.getCube(name); // in case this object is out-of-date + return mgr.copyForWrite(latest); + } + void init(KylinConfig config) { CubeDesc cubeDesc = CubeDescManager.getInstance(config).getCubeDesc(descName); checkNotNull(cubeDesc, "cube descriptor '%s' (for cube '%s') not found", descName, name); http://git-wip-us.apache.org/repos/asf/kylin/blob/03e6b8c5/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java index 3220a0f..1813ad2 100755 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java @@ -21,6 +21,7 @@ package org.apache.kylin.cube; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -133,6 +134,7 @@ public class CubeManager implements IRealizationProvider { return cube; } }; + this.crud.setCheckCopyOnWrite(true); // touch lower level metadata before registering my listener crud.reloadAll(); @@ -243,6 +245,7 @@ public class CubeManager implements IRealizationProvider { } } + // try minimize the use of this method, use udpateCubeXXX() instead public CubeInstance updateCube(CubeUpdate update) throws IOException { try (AutoLock lock = cubeMapLock.lockForWrite()) { CubeInstance cube = updateCubeWithRetry(update, 0); @@ -250,6 +253,42 @@ public class CubeManager implements IRealizationProvider { } } + public CubeInstance updateCubeStatus(CubeInstance cube, RealizationStatusEnum newStatus) throws IOException { + try (AutoLock lock = cubeMapLock.lockForWrite()) { + cube = cube.latestCopyForWrite(); // get a latest copy + CubeUpdate update = new CubeUpdate(cube); + update.setStatus(newStatus); + return updateCube(update); + } + } + + public CubeInstance updateCubeDropSegments(CubeInstance cube, Collection<CubeSegment> segsToDrop) + throws IOException { + CubeSegment[] arr = (CubeSegment[]) segsToDrop.toArray(new CubeSegment[segsToDrop.size()]); + return updateCubeDropSegments(cube, arr); + } + + public CubeInstance updateCubeDropSegments(CubeInstance cube, CubeSegment... segsToDrop) throws IOException { + try (AutoLock lock = cubeMapLock.lockForWrite()) { + cube = cube.latestCopyForWrite(); // get a latest copy + CubeUpdate update = new CubeUpdate(cube); + update.setToRemoveSegs(segsToDrop); + return updateCube(update); + } + } + + public CubeInstance updateCubeSegStatus(CubeSegment seg, SegmentStatusEnum status) throws IOException { + try (AutoLock lock = cubeMapLock.lockForWrite()) { + CubeInstance cube = seg.getCubeInstance().latestCopyForWrite(); + seg = cube.getSegmentById(seg.getUuid()); + + CubeUpdate update = new CubeUpdate(cube); + seg.setStatus(status); + update.setToUpdateSegs(seg); + return updateCube(update); + } + } + private CubeInstance updateCubeWithRetry(CubeUpdate update, int retry) throws IOException { if (update == null || update.getCubeInstance() == null) throw new IllegalStateException(); @@ -319,9 +358,8 @@ public class CubeManager implements IRealizationProvider { } cube = crud.reload(cube.getName()); - update.setCubeInstance(cube); - retry++; - cube = updateCubeWithRetry(update, retry); + update.setCubeInstance(cube.latestCopyForWrite()); + return updateCubeWithRetry(update, ++retry); } if (toRemoveResources.size() > 0) { @@ -337,10 +375,18 @@ public class CubeManager implements IRealizationProvider { //this is a duplicate call to take care of scenarios where REST cache service unavailable ProjectManager.getInstance(cube.getConfig()).clearL2Cache(); - return cube; + return crud.reload(cube.resourceName()); } - public CubeInstance reloadCubeQuietly(String cubeName) { + // for test + CubeInstance reloadCube(String cubeName) { + try (AutoLock lock = cubeMapLock.lockForWrite()) { + return crud.reload(cubeName); + } + } + + // for internal + CubeInstance reloadCubeQuietly(String cubeName) { try (AutoLock lock = cubeMapLock.lockForWrite()) { CubeInstance cube = crud.reloadQuietly(cubeName); if (cube != null) @@ -401,6 +447,10 @@ public class CubeManager implements IRealizationProvider { return tableName; } + public CubeInstance copyForWrite(CubeInstance cube) { + return crud.copyForWrite(cube); + } + private boolean isReady(CubeSegment seg) { return seg.getStatus() == SegmentStatusEnum.READY; } @@ -490,13 +540,15 @@ public class CubeManager implements IRealizationProvider { CubeSegment appendSegment(CubeInstance cube, TSRange tsRange, SegmentRange segRange, Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd) throws IOException { + CubeInstance cubeCopy = cube.latestCopyForWrite(); // get a latest copy + checkInputRanges(tsRange, segRange); - checkBuildingSegment(cube); + checkBuildingSegment(cubeCopy); // fix start/end a bit - if (cube.getModel().getPartitionDesc().isPartitioned()) { + if (cubeCopy.getModel().getPartitionDesc().isPartitioned()) { // if missing start, set it to where last time ends - CubeSegment last = cube.getLastSegment(); + CubeSegment last = cubeCopy.getLastSegment(); if (last != null && !last.isOffsetCube() && tsRange.start.v == 0) { tsRange = new TSRange(last.getTSRange().end.v, tsRange.end.v); } @@ -506,38 +558,40 @@ public class CubeManager implements IRealizationProvider { segRange = null; } - CubeSegment newSegment = newSegment(cube, tsRange, segRange); + CubeSegment newSegment = newSegment(cubeCopy, tsRange, segRange); newSegment.setSourcePartitionOffsetStart(sourcePartitionOffsetStart); newSegment.setSourcePartitionOffsetEnd(sourcePartitionOffsetEnd); - validateNewSegments(cube, newSegment); + validateNewSegments(cubeCopy, newSegment); - CubeUpdate cubeBuilder = new CubeUpdate(cube); - cubeBuilder.setToAddSegs(newSegment); - updateCube(cubeBuilder); + CubeUpdate update = new CubeUpdate(cubeCopy); + update.setToAddSegs(newSegment); + updateCube(update); return newSegment; } public CubeSegment refreshSegment(CubeInstance cube, TSRange tsRange, SegmentRange segRange) throws IOException { + CubeInstance cubeCopy = cube.latestCopyForWrite(); // get a latest copy + checkInputRanges(tsRange, segRange); - checkBuildingSegment(cube); + checkBuildingSegment(cubeCopy); - if (cube.getModel().getPartitionDesc().isPartitioned() == false) { + if (cubeCopy.getModel().getPartitionDesc().isPartitioned() == false) { // full build tsRange = null; segRange = null; } - CubeSegment newSegment = newSegment(cube, tsRange, segRange); + CubeSegment newSegment = newSegment(cubeCopy, tsRange, segRange); - Pair<Boolean, Boolean> pair = cube.getSegments().fitInSegments(newSegment); + Pair<Boolean, Boolean> pair = cubeCopy.getSegments().fitInSegments(newSegment); if (pair.getFirst() == false || pair.getSecond() == false) throw new IllegalArgumentException("The new refreshing segment " + newSegment - + " does not match any existing segment in cube " + cube); + + " does not match any existing segment in cube " + cubeCopy); if (segRange != null) { CubeSegment toRefreshSeg = null; - for (CubeSegment cubeSegment : cube.getSegments()) { + for (CubeSegment cubeSegment : cubeCopy.getSegments()) { if (cubeSegment.getSegRange().equals(segRange)) { toRefreshSeg = cubeSegment; break; @@ -553,30 +607,32 @@ public class CubeManager implements IRealizationProvider { newSegment.setSourcePartitionOffsetEnd(toRefreshSeg.getSourcePartitionOffsetEnd()); } - CubeUpdate cubeBuilder = new CubeUpdate(cube); - cubeBuilder.setToAddSegs(newSegment); - updateCube(cubeBuilder); + CubeUpdate update = new CubeUpdate(cubeCopy); + update.setToAddSegs(newSegment); + updateCube(update); return newSegment; } public CubeSegment mergeSegments(CubeInstance cube, TSRange tsRange, SegmentRange segRange, boolean force) throws IOException { - if (cube.getSegments().isEmpty()) - throw new IllegalArgumentException("Cube " + cube + " has no segments"); + CubeInstance cubeCopy = cube.latestCopyForWrite(); // get a latest copy + + if (cubeCopy.getSegments().isEmpty()) + throw new IllegalArgumentException("Cube " + cubeCopy + " has no segments"); checkInputRanges(tsRange, segRange); - checkBuildingSegment(cube); - checkCubeIsPartitioned(cube); + checkBuildingSegment(cubeCopy); + checkCubeIsPartitioned(cubeCopy); - if (cube.getSegments().getFirstSegment().isOffsetCube()) { + if (cubeCopy.getSegments().getFirstSegment().isOffsetCube()) { // offset cube, merge by date range? if (segRange == null && tsRange != null) { - Pair<CubeSegment, CubeSegment> pair = cube.getSegments(SegmentStatusEnum.READY) + Pair<CubeSegment, CubeSegment> pair = cubeCopy.getSegments(SegmentStatusEnum.READY) .findMergeOffsetsByDateRange(tsRange, Long.MAX_VALUE); if (pair == null) throw new IllegalArgumentException( - "Find no segments to merge by " + tsRange + " for cube " + cube); + "Find no segments to merge by " + tsRange + " for cube " + cubeCopy); segRange = new SegmentRange(pair.getFirst().getSegRange().start, pair.getSecond().getSegRange().end); } @@ -587,9 +643,9 @@ public class CubeManager implements IRealizationProvider { Preconditions.checkArgument(tsRange != null); } - CubeSegment newSegment = newSegment(cube, tsRange, segRange); + CubeSegment newSegment = newSegment(cubeCopy, tsRange, segRange); - Segments<CubeSegment> mergingSegments = cube.getMergingSegments(newSegment); + Segments<CubeSegment> mergingSegments = cubeCopy.getMergingSegments(newSegment); if (mergingSegments.size() <= 1) throw new IllegalArgumentException("Range " + newSegment.getSegRange() + " must contain at least 2 segments, but there is " + mergingSegments.size()); @@ -628,11 +684,11 @@ public class CubeManager implements IRealizationProvider { } } - validateNewSegments(cube, newSegment); + validateNewSegments(cubeCopy, newSegment); - CubeUpdate cubeBuilder = new CubeUpdate(cube); - cubeBuilder.setToAddSegs(newSegment); - updateCube(cubeBuilder); + CubeUpdate update = new CubeUpdate(cubeCopy); + update.setToAddSegs(newSegment); + updateCube(update); return newSegment; } @@ -684,39 +740,43 @@ public class CubeManager implements IRealizationProvider { } public void promoteNewlyBuiltSegments(CubeInstance cube, CubeSegment newSegment) throws IOException { - if (StringUtils.isBlank(newSegment.getStorageLocationIdentifier())) + // work on copy instead of cached objects + CubeInstance cubeCopy = cube.latestCopyForWrite(); // get a latest copy + CubeSegment newSegCopy = cubeCopy.getSegmentById(newSegment.getUuid()); + + if (StringUtils.isBlank(newSegCopy.getStorageLocationIdentifier())) throw new IllegalStateException( - "For cube " + cube + ", segment " + newSegment + " missing StorageLocationIdentifier"); + "For cube " + cubeCopy + ", segment " + newSegCopy + " missing StorageLocationIdentifier"); - if (StringUtils.isBlank(newSegment.getLastBuildJobID())) + if (StringUtils.isBlank(newSegCopy.getLastBuildJobID())) throw new IllegalStateException( - "For cube " + cube + ", segment " + newSegment + " missing LastBuildJobID"); + "For cube " + cubeCopy + ", segment " + newSegCopy + " missing LastBuildJobID"); - if (isReady(newSegment) == true) { - logger.warn("For cube " + cube + ", segment " + newSegment + " state should be NEW but is READY"); + if (isReady(newSegCopy) == true) { + logger.warn("For cube " + cubeCopy + ", segment " + newSegCopy + " state should be NEW but is READY"); } - List<CubeSegment> tobe = cube.calculateToBeSegments(newSegment); + List<CubeSegment> tobe = cubeCopy.calculateToBeSegments(newSegCopy); - if (tobe.contains(newSegment) == false) - throw new IllegalStateException( - "For cube " + cube + ", segment " + newSegment + " is expected but not in the tobe " + tobe); + if (tobe.contains(newSegCopy) == false) + throw new IllegalStateException("For cube " + cubeCopy + ", segment " + newSegCopy + + " is expected but not in the tobe " + tobe); - newSegment.setStatus(SegmentStatusEnum.READY); + newSegCopy.setStatus(SegmentStatusEnum.READY); List<CubeSegment> toRemoveSegs = Lists.newArrayList(); - for (CubeSegment segment : cube.getSegments()) { + for (CubeSegment segment : cubeCopy.getSegments()) { if (!tobe.contains(segment)) toRemoveSegs.add(segment); } - logger.info( - "Promoting cube " + cube + ", new segment " + newSegment + ", to remove segments " + toRemoveSegs); + logger.info("Promoting cube " + cubeCopy + ", new segment " + newSegCopy + ", to remove segments " + + toRemoveSegs); - CubeUpdate cubeBuilder = new CubeUpdate(cube); - cubeBuilder.setToRemoveSegs(toRemoveSegs.toArray(new CubeSegment[toRemoveSegs.size()])) - .setToUpdateSegs(newSegment).setStatus(RealizationStatusEnum.READY); - updateCube(cubeBuilder); + CubeUpdate update = new CubeUpdate(cubeCopy); + update.setToRemoveSegs(toRemoveSegs.toArray(new CubeSegment[toRemoveSegs.size()])) + .setToUpdateSegs(newSegCopy).setStatus(RealizationStatusEnum.READY); + updateCube(update); } public void validateNewSegments(CubeInstance cube, CubeSegment newSegments) { @@ -833,15 +893,20 @@ public class CubeManager implements IRealizationProvider { private void saveDictionaryInfo(CubeSegment cubeSeg, TblColRef col, DictionaryInfo dictInfo) throws IOException { - if (dictInfo != null) { - Dictionary<?> dict = dictInfo.getDictionaryObject(); - cubeSeg.putDictResPath(col, dictInfo.getResourcePath()); - cubeSeg.getRowkeyStats().add(new Object[] { col.getIdentity(), dict.getSize(), dict.getSizeOfId() }); - - CubeUpdate update = new CubeUpdate(cubeSeg.getCubeInstance()); - update.setToUpdateSegs(cubeSeg); - updateCube(update); - } + if (dictInfo == null) + return; + + // work on copy instead of cached objects + CubeInstance cubeCopy = cubeSeg.getCubeInstance().latestCopyForWrite(); // get a latest copy + CubeSegment segCopy = cubeCopy.getSegmentById(cubeSeg.getUuid()); + + Dictionary<?> dict = dictInfo.getDictionaryObject(); + segCopy.putDictResPath(col, dictInfo.getResourcePath()); + segCopy.getRowkeyStats().add(new Object[] { col.getIdentity(), dict.getSize(), dict.getSizeOfId() }); + + CubeUpdate update = new CubeUpdate(cubeCopy); + update.setToUpdateSegs(segCopy); + updateCube(update); } /** @@ -868,17 +933,21 @@ public class CubeManager implements IRealizationProvider { } public SnapshotTable buildSnapshotTable(CubeSegment cubeSeg, String lookupTable) throws IOException { + // work on copy instead of cached objects + CubeInstance cubeCopy = cubeSeg.getCubeInstance().latestCopyForWrite(); // get a latest copy + CubeSegment segCopy = cubeCopy.getSegmentById(cubeSeg.getUuid()); + TableMetadataManager metaMgr = getTableManager(); SnapshotManager snapshotMgr = getSnapshotManager(); - TableDesc tableDesc = new TableDesc(metaMgr.getTableDesc(lookupTable, cubeSeg.getProject())); + TableDesc tableDesc = new TableDesc(metaMgr.getTableDesc(lookupTable, segCopy.getProject())); IReadableTable hiveTable = SourceFactory.createReadableTable(tableDesc); SnapshotTable snapshot = snapshotMgr.buildSnapshot(hiveTable, tableDesc); - cubeSeg.putSnapshotResPath(lookupTable, snapshot.getResourcePath()); - CubeUpdate cubeBuilder = new CubeUpdate(cubeSeg.getCubeInstance()); - cubeBuilder.setToUpdateSegs(cubeSeg); - updateCube(cubeBuilder); + segCopy.putSnapshotResPath(lookupTable, snapshot.getResourcePath()); + CubeUpdate update = new CubeUpdate(cubeCopy); + update.setToUpdateSegs(segCopy); + updateCube(update); return snapshot; } http://git-wip-us.apache.org/repos/asf/kylin/blob/03e6b8c5/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java index fae20dc..1d4e722 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java @@ -36,7 +36,7 @@ public class CubeUpdate { private Map<Long, Long> cuboids = null; public CubeUpdate(CubeInstance cubeInstance) { - this.cubeInstance = cubeInstance; + setCubeInstance(cubeInstance); } public CubeInstance getCubeInstance() { @@ -44,6 +44,9 @@ public class CubeUpdate { } public CubeUpdate setCubeInstance(CubeInstance cubeInstance) { + if (cubeInstance.isCachedAndShared()) + throw new IllegalArgumentException(); + this.cubeInstance = cubeInstance; return this; } http://git-wip-us.apache.org/repos/asf/kylin/blob/03e6b8c5/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java index 65e8965..36c06b7 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java @@ -87,6 +87,8 @@ public class DictionaryGeneratorCLI { cubeMgr.buildSnapshotTable(cubeSeg, tableIdentity); } + CubeInstance updatedCube = cubeMgr.getCube(cubeSeg.getCubeInstance().getName()); + cubeSeg = updatedCube.getSegmentById(cubeSeg.getUuid()); for (TableRef lookup : toCheckLookup) { logger.info("Checking snapshot of " + lookup); JoinDesc join = cubeSeg.getModel().getJoinsTree().getJoinByPKSide(lookup); http://git-wip-us.apache.org/repos/asf/kylin/blob/03e6b8c5/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerCacheTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerCacheTest.java b/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerCacheTest.java index 3881943..f913e1b 100644 --- a/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerCacheTest.java +++ b/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerCacheTest.java @@ -19,6 +19,7 @@ package org.apache.kylin.cube; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.util.LocalFileMetadataTestCase; @@ -60,11 +61,24 @@ public class CubeManagerCacheTest extends LocalFileMetadataTestCase { CubeInstance createdCube = cubeManager.getCube("a_whole_new_cube"); assertEquals(0, createdCube.getSegments().size()); assertEquals(RealizationStatusEnum.DISABLED, createdCube.getStatus()); - createdCube.setStatus(RealizationStatusEnum.DESCBROKEN); - CubeUpdate cubeBuilder = new CubeUpdate(createdCube); - cubeManager.updateCube(cubeBuilder); - assertEquals(RealizationStatusEnum.DESCBROKEN, cubeManager.getCube("a_whole_new_cube").getStatus()); + cubeManager.updateCubeStatus(createdCube, RealizationStatusEnum.READY); + + assertEquals(RealizationStatusEnum.READY, cubeManager.getCube("a_whole_new_cube").getStatus()); + } + + @Test + public void testCachedAndSharedFlag() { + CubeInstance cube = cubeManager.getCube("test_kylin_cube_with_slr_empty"); + assertEquals(true, cube.isCachedAndShared()); + assertEquals(false, cube.latestCopyForWrite().isCachedAndShared()); + + try { + new CubeUpdate(cube); + fail(); + } catch (IllegalArgumentException ex) { + // update cached object is illegal + } } public CubeDescManager getCubeDescManager() { http://git-wip-us.apache.org/repos/asf/kylin/blob/03e6b8c5/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerConcurrencyTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerConcurrencyTest.java b/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerConcurrencyTest.java new file mode 100644 index 0000000..58f29b2 --- /dev/null +++ b/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerConcurrencyTest.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.cube; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.Vector; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.LocalFileMetadataTestCase; +import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.metadata.model.SegmentRange.TSRange; +import org.apache.kylin.metadata.project.ProjectInstance; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + */ +public class CubeManagerConcurrencyTest extends LocalFileMetadataTestCase { + + private static final Logger logger = LoggerFactory.getLogger(CubeManagerConcurrencyTest.class); + + @Before + public void setUp() throws Exception { + System.setProperty("kylin.cube.max-building-segments", "10000"); + this.createTestMetadata(); + } + + @After + public void after() throws Exception { + this.cleanupTestMetadata(); + System.clearProperty("kylin.cube.max-building-segments"); + } + + @Test + public void test() throws Exception { + final KylinConfig config = getTestConfig(); + CubeManager cubeMgr = CubeManager.getInstance(config); + CubeDescManager cubeDescMgr = CubeDescManager.getInstance(config); + + // 4 empty new cubes to start with + final String[] cubeNames = { "c1", "c2", "c3", "c4" }; + final int n = cubeNames.length; + final int updatesPerCube = 100; + final CubeDesc desc = cubeDescMgr.getCubeDesc("test_kylin_cube_with_slr_desc"); + final List<CubeInstance> cubes = new ArrayList<>(); + for (String name : cubeNames) { + cubes.add(cubeMgr.createCube(name, ProjectInstance.DEFAULT_PROJECT_NAME, desc, null)); + } + + final AtomicInteger runningFlag = new AtomicInteger(); + final Vector<Exception> exceptions = new Vector<>(); + + // 1 thread, keeps reloading cubes + Thread reloadThread = new Thread() { + @Override + public void run() { + try { + Random rand = new Random(); + while (runningFlag.get() == 0) { + String name = cubeNames[rand.nextInt(n)]; + CubeManager.getInstance(config).reloadCube(name); + Thread.sleep(1); + } + } catch (Exception ex) { + logger.error("reload thread error", ex); + exceptions.add(ex); + } + } + }; + reloadThread.start(); + + // 4 threads, keeps updating cubes + Thread[] updateThreads = new Thread[n]; + for (int i = 0; i < n; i++) { + // each thread takes care of one cube + // for now, the design refuses concurrent updates to one cube + final String cubeName = cubeNames[i]; + updateThreads[i] = new Thread() { + @Override + public void run() { + try { + Random rand = new Random(); + for (int i = 0; i < updatesPerCube; i++) { + CubeManager mgr = CubeManager.getInstance(config); + CubeInstance cube = mgr.getCube(cubeName); + mgr.appendSegment(cube, new TSRange((long) i, (long) i + 1)); + Thread.sleep(rand.nextInt(1)); + } + } catch (Exception ex) { + logger.error("update thread ", ex); + exceptions.add(ex); + } + } + }; + updateThreads[i].start(); + } + + // wait things done + for (int i = 0; i < n; i++) { + updateThreads[i].join(); + } + runningFlag.incrementAndGet(); + reloadThread.join(); + + // check result and error + if (exceptions.isEmpty() == false) { + logger.error(exceptions.size() + " exceptions encountered, see logs above"); + fail(); + } + for (int i = 0; i < n; i++) { + CubeInstance cube = cubeMgr.getCube(cubeNames[i]); + assertEquals(updatesPerCube, cube.getSegments().size()); + } + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/03e6b8c5/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java b/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java index d8d48f7..f293472 100644 --- a/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java +++ b/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java @@ -42,7 +42,6 @@ import org.junit.Test; import com.google.common.collect.Maps; /** - * @author yangli9 */ public class CubeManagerTest extends LocalFileMetadataTestCase { @@ -81,13 +80,14 @@ public class CubeManagerTest extends LocalFileMetadataTestCase { CubeDescManager cubeDescMgr = getCubeDescManager(); CubeDesc desc = cubeDescMgr.getCubeDesc("test_kylin_cube_with_slr_desc"); - CubeInstance createdCube = cubeMgr.createCube("a_whole_new_cube", ProjectInstance.DEFAULT_PROJECT_NAME, desc, null); - assertTrue(createdCube == cubeMgr.getCube("a_whole_new_cube")); + CubeInstance createdCube = cubeMgr.createCube("a_whole_new_cube", ProjectInstance.DEFAULT_PROJECT_NAME, desc, + null); + assertTrue(createdCube.equals(cubeMgr.getCube("a_whole_new_cube"))); assertTrue(prjMgr.listAllRealizations(ProjectInstance.DEFAULT_PROJECT_NAME).contains(createdCube)); CubeInstance droppedCube = CubeManager.getInstance(getTestConfig()).dropCube("a_whole_new_cube", false); - assertTrue(createdCube == droppedCube); + assertTrue(createdCube.equals(droppedCube)); assertTrue(!prjMgr.listAllRealizations(ProjectInstance.DEFAULT_PROJECT_NAME).contains(droppedCube)); @@ -97,7 +97,7 @@ public class CubeManagerTest extends LocalFileMetadataTestCase { @Test public void testAutoMergeNormal() throws Exception { CubeManager mgr = CubeManager.getInstance(getTestConfig()); - CubeInstance cube = mgr.getCube("test_kylin_cube_with_slr_empty"); + CubeInstance cube = mgr.getCube("test_kylin_cube_with_slr_empty").latestCopyForWrite(); cube.getDescriptor().setAutoMergeTimeRanges(new long[] { 2000, 6000 }); mgr.updateCube(new CubeUpdate(cube)); @@ -109,15 +109,12 @@ public class CubeManagerTest extends LocalFileMetadataTestCase { // append first CubeSegment seg1 = mgr.appendSegment(cube, new TSRange(0L, 1000L), null, null, null); - seg1.setStatus(SegmentStatusEnum.READY); + mgr.updateCubeSegStatus(seg1, SegmentStatusEnum.READY); CubeSegment seg2 = mgr.appendSegment(cube, new TSRange(1000L, 2000L), null, null, null); - seg2.setStatus(SegmentStatusEnum.READY); - - CubeUpdate cubeBuilder = new CubeUpdate(cube); - - mgr.updateCube(cubeBuilder); + mgr.updateCubeSegStatus(seg2, SegmentStatusEnum.READY); + cube = mgr.getCube(cube.getName()); assertEquals(2, cube.getSegments().size()); SegmentRange mergedSeg = cube.autoMergeCubeSegments(); @@ -126,95 +123,92 @@ public class CubeManagerTest extends LocalFileMetadataTestCase { } - @Test public void testConcurrentBuildAndMerge() throws Exception { CubeManager mgr = CubeManager.getInstance(getTestConfig()); - CubeInstance cube = mgr.getCube("test_kylin_cube_with_slr_empty"); + CubeInstance cube = mgr.getCube("test_kylin_cube_with_slr_empty").latestCopyForWrite(); + System.setProperty("kylin.cube.max-building-segments", "10"); // no segment at first assertEquals(0, cube.getSegments().size()); - Map m1 = Maps.newHashMap(); + Map m1 = Maps.newHashMap(); m1.put(1, 1000L); - Map m2 = Maps.newHashMap(); + Map m2 = Maps.newHashMap(); m2.put(1, 2000L); - Map m3 = Maps.newHashMap(); + Map m3 = Maps.newHashMap(); m3.put(1, 3000L); - Map m4 = Maps.newHashMap(); + Map m4 = Maps.newHashMap(); m4.put(1, 4000L); // append first CubeSegment seg1 = mgr.appendSegment(cube, null, new SegmentRange(0L, 1000L), null, m1); - seg1.setStatus(SegmentStatusEnum.READY); - + mgr.updateCubeSegStatus(seg1, SegmentStatusEnum.READY); CubeSegment seg2 = mgr.appendSegment(cube, null, new SegmentRange(1000L, 2000L), m1, m2); - seg2.setStatus(SegmentStatusEnum.READY); - + mgr.updateCubeSegStatus(seg2, SegmentStatusEnum.READY); CubeSegment seg3 = mgr.mergeSegments(cube, null, new SegmentRange(0L, 2000L), true); - seg3.setStatus(SegmentStatusEnum.NEW); - + //seg3.setStatus(SegmentStatusEnum.NEW); CubeSegment seg4 = mgr.appendSegment(cube, null, new SegmentRange(2000L, 3000L), m2, m3); seg4.setStatus(SegmentStatusEnum.NEW); seg4.setLastBuildJobID("test"); seg4.setStorageLocationIdentifier("test"); - + CubeUpdate update = new CubeUpdate(cube.latestCopyForWrite()); + update.setToUpdateSegs(seg4); + mgr.updateCube(update); CubeSegment seg5 = mgr.appendSegment(cube, null, new SegmentRange(3000L, 4000L), m3, m4); - seg5.setStatus(SegmentStatusEnum.READY); - - CubeUpdate cubeBuilder = new CubeUpdate(cube); - - mgr.updateCube(cubeBuilder); - + mgr.updateCubeSegStatus(seg5, SegmentStatusEnum.READY); mgr.promoteNewlyBuiltSegments(cube, seg4); + cube = mgr.getCube(cube.getName()); assertTrue(cube.getSegments().size() == 5); - assertTrue(cube.getSegmentById(seg1.getUuid()) != null && cube.getSegmentById(seg1.getUuid()).getStatus() == SegmentStatusEnum.READY); - assertTrue(cube.getSegmentById(seg2.getUuid()) != null && cube.getSegmentById(seg2.getUuid()).getStatus() == SegmentStatusEnum.READY); - assertTrue(cube.getSegmentById(seg3.getUuid()) != null && cube.getSegmentById(seg3.getUuid()).getStatus() == SegmentStatusEnum.NEW); - assertTrue(cube.getSegmentById(seg4.getUuid()) != null && cube.getSegmentById(seg4.getUuid()).getStatus() == SegmentStatusEnum.READY); - assertTrue(cube.getSegmentById(seg5.getUuid()) != null && cube.getSegmentById(seg5.getUuid()).getStatus() == SegmentStatusEnum.READY); + assertTrue(cube.getSegmentById(seg1.getUuid()) != null + && cube.getSegmentById(seg1.getUuid()).getStatus() == SegmentStatusEnum.READY); + assertTrue(cube.getSegmentById(seg2.getUuid()) != null + && cube.getSegmentById(seg2.getUuid()).getStatus() == SegmentStatusEnum.READY); + assertTrue(cube.getSegmentById(seg3.getUuid()) != null + && cube.getSegmentById(seg3.getUuid()).getStatus() == SegmentStatusEnum.NEW); + assertTrue(cube.getSegmentById(seg4.getUuid()) != null + && cube.getSegmentById(seg4.getUuid()).getStatus() == SegmentStatusEnum.READY); + assertTrue(cube.getSegmentById(seg5.getUuid()) != null + && cube.getSegmentById(seg5.getUuid()).getStatus() == SegmentStatusEnum.READY); } - @Test public void testConcurrentMergeAndMerge() throws Exception { System.setProperty("kylin.cube.max-building-segments", "10"); CubeManager mgr = CubeManager.getInstance(getTestConfig()); - CubeInstance cube = mgr.getCube("test_kylin_cube_with_slr_empty"); + CubeInstance cube = mgr.getCube("test_kylin_cube_with_slr_empty").latestCopyForWrite(); // no segment at first assertEquals(0, cube.getSegments().size()); - Map m1 = Maps.newHashMap(); + Map m1 = Maps.newHashMap(); m1.put(1, 1000L); - Map m2 = Maps.newHashMap(); + Map m2 = Maps.newHashMap(); m2.put(1, 2000L); - Map m3 = Maps.newHashMap(); + Map m3 = Maps.newHashMap(); m3.put(1, 3000L); - Map m4 = Maps.newHashMap(); + Map m4 = Maps.newHashMap(); m4.put(1, 4000L); // append first CubeSegment seg1 = mgr.appendSegment(cube, null, new SegmentRange(0L, 1000L), null, m1); - seg1.setStatus(SegmentStatusEnum.READY); + mgr.updateCubeSegStatus(seg1, SegmentStatusEnum.READY); CubeSegment seg2 = mgr.appendSegment(cube, null, new SegmentRange(1000L, 2000L), m1, m2); - seg2.setStatus(SegmentStatusEnum.READY); + mgr.updateCubeSegStatus(seg2, SegmentStatusEnum.READY); CubeSegment seg3 = mgr.appendSegment(cube, null, new SegmentRange(2000L, 3000L), m2, m3); - seg3.setStatus(SegmentStatusEnum.READY); + mgr.updateCubeSegStatus(seg3, SegmentStatusEnum.READY); CubeSegment seg4 = mgr.appendSegment(cube, null, new SegmentRange(3000L, 4000L), m3, m4); - seg4.setStatus(SegmentStatusEnum.READY); - - + mgr.updateCubeSegStatus(seg4, SegmentStatusEnum.READY); CubeSegment merge1 = mgr.mergeSegments(cube, null, new SegmentRange(0L, 2000L), true); merge1.setStatus(SegmentStatusEnum.NEW); @@ -226,20 +220,25 @@ public class CubeManagerTest extends LocalFileMetadataTestCase { merge2.setLastBuildJobID("test"); merge2.setStorageLocationIdentifier("test"); - CubeUpdate cubeBuilder = new CubeUpdate(cube); - mgr.updateCube(cubeBuilder); - + CubeUpdate update = new CubeUpdate(cube.latestCopyForWrite()); + update.setToUpdateSegs(merge1, merge2); + mgr.updateCube(update); mgr.promoteNewlyBuiltSegments(cube, merge1); + cube = mgr.getCube(cube.getName()); assertTrue(cube.getSegments().size() == 4); assertTrue(cube.getSegmentById(seg1.getUuid()) == null); assertTrue(cube.getSegmentById(seg2.getUuid()) == null); - assertTrue(cube.getSegmentById(merge1.getUuid()) != null && cube.getSegmentById(merge1.getUuid()).getStatus() == SegmentStatusEnum.READY); - assertTrue(cube.getSegmentById(seg3.getUuid()) != null && cube.getSegmentById(seg3.getUuid()).getStatus() == SegmentStatusEnum.READY); - assertTrue(cube.getSegmentById(seg4.getUuid()) != null && cube.getSegmentById(seg4.getUuid()).getStatus() == SegmentStatusEnum.READY); - assertTrue(cube.getSegmentById(merge2.getUuid()) != null && cube.getSegmentById(merge2.getUuid()).getStatus() == SegmentStatusEnum.NEW); + assertTrue(cube.getSegmentById(merge1.getUuid()) != null + && cube.getSegmentById(merge1.getUuid()).getStatus() == SegmentStatusEnum.READY); + assertTrue(cube.getSegmentById(seg3.getUuid()) != null + && cube.getSegmentById(seg3.getUuid()).getStatus() == SegmentStatusEnum.READY); + assertTrue(cube.getSegmentById(seg4.getUuid()) != null + && cube.getSegmentById(seg4.getUuid()).getStatus() == SegmentStatusEnum.READY); + assertTrue(cube.getSegmentById(merge2.getUuid()) != null + && cube.getSegmentById(merge2.getUuid()).getStatus() == SegmentStatusEnum.NEW); } @@ -249,14 +248,15 @@ public class CubeManagerTest extends LocalFileMetadataTestCase { final NavigableSet<String> cubePath = store.listResources(ResourceStore.CUBE_RESOURCE_ROOT); assertTrue(cubePath.size() > 1); - final List<CubeInstance> cubes = store.getAllResources(ResourceStore.CUBE_RESOURCE_ROOT, CubeInstance.class, CubeManager.CUBE_SERIALIZER); + final List<CubeInstance> cubes = store.getAllResources(ResourceStore.CUBE_RESOURCE_ROOT, CubeInstance.class, + CubeManager.CUBE_SERIALIZER); assertEquals(cubePath.size(), cubes.size()); } @Test public void testAutoMergeWithGap() throws Exception { CubeManager mgr = CubeManager.getInstance(getTestConfig()); - CubeInstance cube = mgr.getCube("test_kylin_cube_with_slr_empty"); + CubeInstance cube = mgr.getCube("test_kylin_cube_with_slr_empty").latestCopyForWrite(); cube.getDescriptor().setAutoMergeTimeRanges(new long[] { 2000, 6000 }); mgr.updateCube(new CubeUpdate(cube)); @@ -268,11 +268,12 @@ public class CubeManagerTest extends LocalFileMetadataTestCase { // append first CubeSegment seg1 = mgr.appendSegment(cube, new TSRange(0L, 1000L)); - seg1.setStatus(SegmentStatusEnum.READY); + mgr.updateCubeSegStatus(seg1, SegmentStatusEnum.READY); CubeSegment seg3 = mgr.appendSegment(cube, new TSRange(2000L, 4000L)); - seg3.setStatus(SegmentStatusEnum.READY); + mgr.updateCubeSegStatus(seg3, SegmentStatusEnum.READY); + cube = mgr.getCube(cube.getName()); assertEquals(2, cube.getSegments().size()); SegmentRange mergedSeg = cube.autoMergeCubeSegments(); @@ -282,8 +283,9 @@ public class CubeManagerTest extends LocalFileMetadataTestCase { // append a new seg which will be merged CubeSegment seg4 = mgr.appendSegment(cube, new TSRange(4000L, 8000L)); - seg4.setStatus(SegmentStatusEnum.READY); + mgr.updateCubeSegStatus(seg4, SegmentStatusEnum.READY); + cube = mgr.getCube(cube.getName()); assertEquals(3, cube.getSegments().size()); mergedSeg = cube.autoMergeCubeSegments(); @@ -294,8 +296,9 @@ public class CubeManagerTest extends LocalFileMetadataTestCase { // fill the gap CubeSegment seg2 = mgr.appendSegment(cube, new TSRange(1000L, 2000L)); - seg2.setStatus(SegmentStatusEnum.READY); + mgr.updateCubeSegStatus(seg2, SegmentStatusEnum.READY); + cube = mgr.getCube(cube.getName()); assertEquals(4, cube.getSegments().size()); mergedSeg = cube.autoMergeCubeSegments(); http://git-wip-us.apache.org/repos/asf/kylin/blob/03e6b8c5/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java b/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java index 64c6d68..c4dcb59 100644 --- a/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java +++ b/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java @@ -56,7 +56,10 @@ public class CubeSegmentsTest extends LocalFileMetadataTestCase { CubeSegment seg = mgr.appendSegment(cube); assertEquals(new TSRange(0L, Long.MAX_VALUE), seg.getTSRange()); assertEquals(new TSRange(0L, Long.MAX_VALUE), seg.getSegRange()); - assertEquals(1, cube.getSegments().size()); + + assertEquals(0, cube.getSegments().size()); // older cube not changed + cube = mgr.getCube(cube.getName()); + assertEquals(1, cube.getSegments().size()); // the updated cube // second append, throw IllegalStateException because the first segment is not built try { @@ -84,7 +87,10 @@ public class CubeSegmentsTest extends LocalFileMetadataTestCase { CubeSegment seg2 = mgr.appendSegment(cube); assertEquals(new TSRange(0L, Long.MAX_VALUE), seg2.getTSRange()); assertEquals(new TSRange(0L, Long.MAX_VALUE), seg2.getSegRange()); - assertEquals(2, cube.getSegments().size()); + + assertEquals(1, cube.getSegments().size()); // older cube not changed + cube = mgr.getCube(cube.getName()); + assertEquals(2, cube.getSegments().size()); // the updated cube // non-partitioned cannot merge, throw exception try { @@ -105,32 +111,35 @@ public class CubeSegmentsTest extends LocalFileMetadataTestCase { // append first CubeSegment seg1 = mgr.appendSegment(cube, new TSRange(0L, 1000L)); - seg1.setStatus(SegmentStatusEnum.READY); + cube = readySegment(cube, seg1); // append second CubeSegment seg2 = mgr.appendSegment(cube, new TSRange(1000L, 2000L)); + cube = readySegment(cube, seg2); assertEquals(2, cube.getSegments().size()); assertEquals(new TSRange(1000L, 2000L), seg2.getTSRange()); assertEquals(new TSRange(1000L, 2000L), seg2.getSegRange()); - assertEquals(SegmentStatusEnum.NEW, seg2.getStatus()); - seg2.setStatus(SegmentStatusEnum.READY); + assertEquals(SegmentStatusEnum.NEW, seg2.getStatus()); // older version of seg2 + assertEquals(SegmentStatusEnum.READY, cube.getSegments().get(1).getStatus()); // newer version of seg2 // merge first and second CubeSegment merge = mgr.mergeSegments(cube, new TSRange(0L, 2000L), null, true); + assertEquals(2, cube.getSegments().size()); // older version of cube + cube = mgr.getCube(cube.getName()); // get the newer version of cube assertEquals(3, cube.getSegments().size()); assertEquals(new TSRange(0L, 2000L), merge.getTSRange()); assertEquals(new TSRange(0L, 2000L), merge.getSegRange()); assertEquals(SegmentStatusEnum.NEW, merge.getStatus()); // segments are strictly ordered - assertEquals(seg1, cube.getSegments().get(0)); - assertEquals(merge, cube.getSegments().get(1)); - assertEquals(seg2, cube.getSegments().get(2)); + assertEquals(seg1.getUuid(), cube.getSegments().get(0).getUuid()); + assertEquals(merge.getUuid(), cube.getSegments().get(1).getUuid()); + assertEquals(seg2.getUuid(), cube.getSegments().get(2).getUuid()); // drop the merge - cube.getSegments().remove(merge); + cube = mgr.updateCubeDropSegments(cube, merge); // try merge at start/end at middle of segments try { @@ -141,6 +150,7 @@ public class CubeSegmentsTest extends LocalFileMetadataTestCase { } CubeSegment merge2 = mgr.mergeSegments(cube, new TSRange(0L, 2500L), null, true); + cube = mgr.getCube(cube.getName()); // get the newer version of cube assertEquals(3, cube.getSegments().size()); assertEquals(new TSRange(0L, 2000L), merge2.getTSRange()); assertEquals(new TSRange(0L, 2000L), merge2.getSegRange()); @@ -157,12 +167,12 @@ public class CubeSegmentsTest extends LocalFileMetadataTestCase { // append the first CubeSegment seg1 = mgr.appendSegment(cube, new TSRange(0L, 1000L)); - seg1.setStatus(SegmentStatusEnum.READY); + cube = readySegment(cube, seg1); assertEquals(1, cube.getSegments().size()); // append the third CubeSegment seg3 = mgr.appendSegment(cube, new TSRange(2000L, 3000L)); - seg3.setStatus(SegmentStatusEnum.READY); + cube = readySegment(cube, seg3); assertEquals(2, cube.getSegments().size()); // reject overlap @@ -175,9 +185,13 @@ public class CubeSegmentsTest extends LocalFileMetadataTestCase { // append the second CubeSegment seg2 = mgr.appendSegment(cube, new TSRange(1000L, 2000L)); - seg2.setStatus(SegmentStatusEnum.READY); + cube = readySegment(cube, seg2); assertEquals(3, cube.getSegments().size()); } + + private CubeInstance readySegment(CubeInstance cube, CubeSegment seg) throws IOException { + return mgr().updateCubeSegStatus(seg, SegmentStatusEnum.READY); + } private CubeManager mgr() { return CubeManager.getInstance(getTestConfig()); http://git-wip-us.apache.org/repos/asf/kylin/blob/03e6b8c5/core-cube/src/test/java/org/apache/kylin/cube/project/ProjectManagerTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/cube/project/ProjectManagerTest.java b/core-cube/src/test/java/org/apache/kylin/cube/project/ProjectManagerTest.java index c99b683..d5b421f 100644 --- a/core-cube/src/test/java/org/apache/kylin/cube/project/ProjectManagerTest.java +++ b/core-cube/src/test/java/org/apache/kylin/cube/project/ProjectManagerTest.java @@ -79,7 +79,7 @@ public class ProjectManagerTest extends LocalFileMetadataTestCase { CubeDesc desc = cubeDescMgr.getCubeDesc("test_kylin_cube_with_slr_desc"); CubeInstance createdCube = cubeMgr.createCube("cube_in_alien_project", "alien", desc, null); - assertTrue(createdCube == cubeMgr.getCube("cube_in_alien_project")); + assertTrue(createdCube.equals(cubeMgr.getCube("cube_in_alien_project"))); ProjectManager proMgr = ProjectManager.getInstance(getTestConfig()); Set<IRealization> realizations = proMgr.listAllRealizations("alien"); assertTrue(realizations.contains(createdCube)); @@ -102,7 +102,7 @@ public class ProjectManagerTest extends LocalFileMetadataTestCase { CubeInstance droppedCube = cubeMgr.dropCube("cube_in_alien_project", true); - assertTrue(createdCube == droppedCube); + assertTrue(createdCube.equals(droppedCube)); assertNull(cubeMgr.getCube("cube_in_alien_project")); assertTrue(prjMgr.listAllProjects().size() == originalProjectCount + 1); assertTrue(cubeMgr.listAllCubes().size() == originalCubeCount); @@ -126,7 +126,7 @@ public class ProjectManagerTest extends LocalFileMetadataTestCase { CubeDesc desc = cubeDescMgr.getCubeDesc("test_kylin_cube_with_slr_desc"); CubeInstance createdCube = cubeMgr.createCube("new_cube_in_default", ProjectInstance.DEFAULT_PROJECT_NAME, desc, null); - assertTrue(createdCube == cubeMgr.getCube("new_cube_in_default")); + assertTrue(createdCube.equals(cubeMgr.getCube("new_cube_in_default"))); //System.out.println(JsonUtil.writeValueAsIndentString(createdCube)); @@ -135,7 +135,7 @@ public class ProjectManagerTest extends LocalFileMetadataTestCase { CubeInstance droppedCube = cubeMgr.dropCube("new_cube_in_default", true); - assertTrue(createdCube == droppedCube); + assertTrue(createdCube.equals(droppedCube)); assertNull(cubeMgr.getCube("new_cube_in_default")); assertTrue(prjMgr.listAllProjects().size() == originalProjectCount); assertTrue(cubeMgr.listAllCubes().size() == originalCubeCount); http://git-wip-us.apache.org/repos/asf/kylin/blob/03e6b8c5/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/CachedCrudAssist.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/CachedCrudAssist.java b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/CachedCrudAssist.java index ccc0a4a..c7244fe 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/CachedCrudAssist.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/CachedCrudAssist.java @@ -18,6 +18,10 @@ package org.apache.kylin.metadata.cachesync; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; import java.io.IOException; import java.util.List; @@ -69,10 +73,37 @@ abstract public class CachedCrudAssist<T extends RootPersistentEntity> { return (Serializer<DataModelDesc>) serializer; } - public void setCheckOnWrite(boolean check) { + public void setCheckCopyOnWrite(boolean check) { this.checkCopyOnWrite = check; } + // Make copy of an entity such that update can apply on the copy. + // Note cached and shared object MUST NOT be updated directly. + public T copyForWrite(T entity) { + if (entity.isCachedAndShared() == false) + return entity; + + T copy; + try { + byte[] bytes; + try (ByteArrayOutputStream buf = new ByteArrayOutputStream(); + DataOutputStream dout = new DataOutputStream(buf)) { + serializer.serialize(entity, dout); + bytes = buf.toByteArray(); + } + + try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) { + copy = serializer.deserialize(in); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + + copy.setCachedAndShared(false); + initEntityAfterReload(copy, entity.resourceName()); + return copy; + } + private String resourcePath(String resourceName) { return resRootPath + "/" + resourceName + resPathSuffix; } @@ -80,12 +111,11 @@ abstract public class CachedCrudAssist<T extends RootPersistentEntity> { private String resourceName(String resourcePath) { Preconditions.checkArgument(resourcePath.startsWith(resRootPath)); Preconditions.checkArgument(resourcePath.endsWith(resPathSuffix)); - return resourcePath.substring(resRootPath.length() + 1, - resourcePath.length() - resPathSuffix.length()); + return resourcePath.substring(resRootPath.length() + 1, resourcePath.length() - resPathSuffix.length()); } public void reloadAll() throws IOException { - logger.debug("Reloading " + entityType.getName() + " from " + store.getReadableResourcePath(resRootPath)); + logger.debug("Reloading " + entityType.getSimpleName() + " from " + store.getReadableResourcePath(resRootPath)); cache.clear(); @@ -94,8 +124,8 @@ abstract public class CachedCrudAssist<T extends RootPersistentEntity> { reloadQuietlyAt(path); } - logger.debug( - "Loaded " + cache.size() + " " + entityType.getName() + "(s) out of " + paths.size() + " resource"); + logger.debug("Loaded " + cache.size() + " " + entityType.getSimpleName() + "(s) out of " + paths.size() + + " resource"); } public T reload(String resourceName) { @@ -110,7 +140,7 @@ abstract public class CachedCrudAssist<T extends RootPersistentEntity> { try { return reloadAt(path); } catch (Exception ex) { - logger.error("Error loading " + entityType.getName() + " at " + path, ex); + logger.error("Error loading " + entityType.getSimpleName() + " at " + path, ex); return null; } } @@ -119,21 +149,23 @@ abstract public class CachedCrudAssist<T extends RootPersistentEntity> { try { T entity = store.getResource(path, entityType, serializer); if (entity == null) { - logger.warn("No " + entityType.getName() + " found at " + path + ", returning null"); + logger.warn("No " + entityType.getSimpleName() + " found at " + path + ", returning null"); cache.removeLocal(resourceName(path)); return null; } + // mark cached object + entity.setCachedAndShared(true); entity = initEntityAfterReload(entity, resourceName(path)); if (path.equals(resourcePath(entity.resourceName())) == false) throw new IllegalStateException("The entity " + entity + " read from " + path + " will save to a different path " + resourcePath(entity.resourceName())); - + cache.putLocal(entity.resourceName(), entity); return entity; } catch (Exception e) { - throw new IllegalStateException("Error loading " + entityType.getName() + " at " + path, e); + throw new IllegalStateException("Error loading " + entityType.getSimpleName() + " at " + path, e); } } @@ -148,18 +180,23 @@ abstract public class CachedCrudAssist<T extends RootPersistentEntity> { Preconditions.checkArgument(resName != null && resName.length() > 0); if (checkCopyOnWrite) { - if (cache.get(resName) == entity) { + if (entity.isCachedAndShared() || cache.get(resName) == entity) { throw new IllegalStateException("Copy-on-write violation! The updating entity " + entity - + " is a shared object in " + entityType.getName() + " cache, which should not be."); + + " is a shared object in " + entityType.getSimpleName() + " cache, which should not be."); } } String path = resourcePath(resName); - logger.debug("Saving {} at {}", entityType.getName(), path); + logger.debug("Saving {} at {}", entityType.getSimpleName(), path); store.putResource(path, entity, serializer); + + // just to trigger the event broadcast, the entity won't stay in cache cache.put(resName, entity); - return entity; + + // keep the pass-in entity out of cache, the caller may use it for further update + // return a reloaded new object + return reload(resName); } public void delete(T entity) throws IOException { @@ -170,7 +207,7 @@ abstract public class CachedCrudAssist<T extends RootPersistentEntity> { Preconditions.checkArgument(resName != null); String path = resourcePath(resName); - logger.debug("Deleting {} at {}", entityType.getName(), path); + logger.debug("Deleting {} at {}", entityType.getSimpleName(), path); store.deleteResource(path); cache.remove(resName); http://git-wip-us.apache.org/repos/asf/kylin/blob/03e6b8c5/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidShardUtil.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidShardUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidShardUtil.java index b6dbd5d..cf1b94a 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidShardUtil.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidShardUtil.java @@ -21,6 +21,7 @@ package org.apache.kylin.engine.mr.common; import java.io.IOException; import java.util.Map; +import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.CubeUpdate; @@ -41,12 +42,16 @@ public class CuboidShardUtil { filtered.put(entry.getKey(), entry.getValue()); } } + + // work on copy instead of cached objects + CubeInstance cubeCopy = segment.getCubeInstance().latestCopyForWrite(); + CubeSegment segCopy = cubeCopy.getSegmentById(segment.getUuid()); - segment.setCuboidShardNums(filtered); - segment.setTotalShards(totalShards); + segCopy.setCuboidShardNums(filtered); + segCopy.setTotalShards(totalShards); - CubeUpdate cubeBuilder = new CubeUpdate(segment.getCubeInstance()); - cubeBuilder.setToUpdateSegs(segment); - cubeManager.updateCube(cubeBuilder); + CubeUpdate update = new CubeUpdate(cubeCopy); + update.setToUpdateSegs(segCopy); + cubeManager.updateCube(update); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/03e6b8c5/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java index 9c805a8..0f7281f 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java @@ -107,8 +107,8 @@ public class StatisticsDecisionUtil { return; } - CubeUpdate cubeBuilder = new CubeUpdate(cube); - cubeBuilder.setCuboids(recommendCuboidsWithStats); - CubeManager.getInstance(cube.getConfig()).updateCube(cubeBuilder); + CubeUpdate update = new CubeUpdate(cube.latestCopyForWrite()); + update.setCuboids(recommendCuboidsWithStats); + CubeManager.getInstance(cube.getConfig()).updateCube(update); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/03e6b8c5/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java index 58b2c02..2a184b0 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java @@ -63,12 +63,16 @@ public class MergeDictionaryStep extends AbstractExecutable { try { checkLookupSnapshotsMustIncremental(mergingSegments); - makeDictForNewSegment(conf, cube, newSegment, mergingSegments); - makeSnapshotForNewSegment(cube, newSegment, mergingSegments); - - CubeUpdate cubeBuilder = new CubeUpdate(cube); - cubeBuilder.setToUpdateSegs(newSegment); - mgr.updateCube(cubeBuilder); + // work on copy instead of cached objects + CubeInstance cubeCopy = cube.latestCopyForWrite(); + CubeSegment newSegCopy = cubeCopy.getSegmentById(newSegment.getUuid()); + + makeDictForNewSegment(conf, cubeCopy, newSegCopy, mergingSegments); + makeSnapshotForNewSegment(cubeCopy, newSegCopy, mergingSegments); + + CubeUpdate update = new CubeUpdate(cubeCopy); + update.setToUpdateSegs(newSegCopy); + mgr.updateCube(update); return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed"); } catch (IOException e) { logger.error("fail to merge dictionary or lookup snapshots", e); http://git-wip-us.apache.org/repos/asf/kylin/blob/03e6b8c5/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java deleted file mode 100644 index 5ddb024..0000000 --- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java +++ /dev/null @@ -1,183 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.engine.mr.steps; - -import static org.junit.Assert.assertTrue; - -import java.io.File; -import java.io.IOException; -import java.text.ParseException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.commons.io.FileUtils; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mrunit.mapreduce.MapDriver; -import org.apache.kylin.common.util.Dictionary; -import org.apache.kylin.common.util.LocalFileMetadataTestCase; -import org.apache.kylin.cube.CubeInstance; -import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.cube.CubeUpdate; -import org.apache.kylin.dict.DictionaryGenerator; -import org.apache.kylin.dict.DictionaryInfo; -import org.apache.kylin.dict.DictionaryManager; -import org.apache.kylin.dict.IterableDictionaryValueEnumerator; -import org.apache.kylin.metadata.datatype.DataType; -import org.apache.kylin.metadata.model.SegmentRange.TSRange; -import org.apache.kylin.metadata.model.TblColRef; -import org.apache.kylin.source.IReadableTable.TableSignature; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@SuppressWarnings("rawtypes") -public class MergeCuboidMapperTest extends LocalFileMetadataTestCase { - - private static final Logger logger = LoggerFactory.getLogger(MergeCuboidMapperTest.class); - - MapDriver<Text, Text, Text, Text> mapDriver; - CubeManager cubeManager; - CubeInstance cube; - DictionaryManager dictionaryManager; - - TblColRef lfn; - TblColRef lsi; - TblColRef ssc; - - private DictionaryInfo makeSharedDict() throws IOException { - TableSignature signature = new TableSignature(); - signature.setSize(100); - signature.setLastModifiedTime(System.currentTimeMillis()); - signature.setPath("fake_common_dict"); - - DictionaryInfo newDictInfo = new DictionaryInfo("", "", 0, "string", signature); - - List<String> values = new ArrayList<>(); - values.add("eee"); - values.add("fff"); - Dictionary<String> dict = DictionaryGenerator.buildDictionary(DataType.getType(newDictInfo.getDataType()), new IterableDictionaryValueEnumerator(values)); - dictionaryManager.trySaveNewDict(dict, newDictInfo); - dict.dump(System.out); - - return newDictInfo; - } - - @Before - public void setUp() throws Exception { - - createTestMetadata(); - - logger.info("The metadataUrl is : " + getTestConfig()); - getTestConfig().clearManagers(); - - // hack for distributed cache - // CubeManager.removeInstance(KylinConfig.createInstanceFromUri("../job/meta"));//to - // make sure the following mapper could get latest CubeManger - FileUtils.deleteDirectory(new File("../job/meta")); - - MergeCuboidMapper mapper = new MergeCuboidMapper(); - mapDriver = MapDriver.newMapDriver(mapper); - - cubeManager = CubeManager.getInstance(getTestConfig()); - cube = cubeManager.getCube("test_kylin_cube_without_slr_left_join_ready_2_segments"); - dictionaryManager = DictionaryManager.getInstance(getTestConfig()); - lfn = cube.getDescriptor().findColumnRef("DEFAULT.TEST_KYLIN_FACT", "LSTG_FORMAT_NAME"); - lsi = cube.getDescriptor().findColumnRef("DEFAULT.TEST_KYLIN_FACT", "CAL_DT"); - ssc = cube.getDescriptor().findColumnRef("DEFAULT.TEST_CATEGORY_GROUPINGS", "META_CATEG_NAME"); - - DictionaryInfo sharedDict = makeSharedDict(); - - boolean isFirstSegment = true; - for (CubeSegment segment : cube.getSegments()) { - - TableSignature signature = new TableSignature(); - signature.setSize(100); - signature.setLastModifiedTime(System.currentTimeMillis()); - signature.setPath("fake_dict_for" + lfn.getName() + segment.getName()); - - DictionaryInfo newDictInfo = new DictionaryInfo(lfn.getColumnDesc(), "string", signature); - - List<String> values = new ArrayList<>(); - values.add("aaa"); - if (isFirstSegment) - values.add("ccc"); - else - values.add("bbb"); - Dictionary<String> dict = DictionaryGenerator.buildDictionary(DataType.getType(newDictInfo.getDataType()), new IterableDictionaryValueEnumerator(values)); - dictionaryManager.trySaveNewDict(dict, newDictInfo); - dict.dump(System.out); - - segment.putDictResPath(lfn, newDictInfo.getResourcePath()); - segment.putDictResPath(lsi, sharedDict.getResourcePath()); - segment.putDictResPath(ssc, sharedDict.getResourcePath()); - - // cubeManager.saveResource(segment.getCubeInstance()); - // cubeManager.afterCubeUpdated(segment.getCubeInstance()); - - isFirstSegment = false; - } - - CubeUpdate cubeBuilder = new CubeUpdate(cube); - cubeBuilder.setToUpdateSegs(cube.getSegments().toArray(new CubeSegment[cube.getSegments().size()])); - cube = cubeManager.updateCube(cubeBuilder); - - } - - @After - public void after() throws Exception { - cleanupTestMetadata(); - FileUtils.deleteDirectory(new File("../job/meta")); - } - - @Test - public void test() throws IOException, ParseException { - - // String cubeName = "test_kylin_cube_without_slr_left_join_ready_2_segments"; - - CubeSegment newSeg = cubeManager.mergeSegments(cube, new TSRange(0L, Long.MAX_VALUE), null, false); - // String segmentName = newSeg.getName(); - - final Dictionary<String> dictionary = cubeManager.getDictionary(newSeg, lfn); - assertTrue(dictionary == null); - // ((TrieDictionary) dictionary).dump(System.out); - - // hack for distributed cache - // File metaDir = new File("../job/meta"); - // FileUtils.copyDirectory(new File(getTestConfig().getMetadataUrl()), metaDir); - // - // mapDriver.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName); - // mapDriver.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName); - // // mapDriver.getConfiguration().set(KylinConfig.KYLIN_METADATA_URL, - // // "../job/meta"); - // - // byte[] key = new byte[] { 0, 0, 0, 0, 0, 0, 0, -92, 1, 1, 1 }; - // byte[] value = new byte[] { 1, 2, 3 }; - // byte[] newkey = new byte[] { 0, 0, 0, 0, 0, 0, 0, -92, 1, 1, 2 }; - // byte[] newvalue = new byte[] { 1, 2, 3 }; - // - // mapDriver.withInput(new Text(key), new Text(value)); - // mapDriver.withOutput(new Text(newkey), new Text(newvalue)); - // mapDriver.setMapInputPath(new Path("/apps/hdmi-prod/b_kylin/prod/kylin-f24668f6-dcff-4cb6-a89b-77f1119df8fa/vac_sw_cube_v4/cuboid/15d_cuboid")); - // - // mapDriver.runTest(); - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/03e6b8c5/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java ---------------------------------------------------------------------- diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java index 872deed..459d3aa 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java @@ -238,11 +238,17 @@ public class SparkCubing extends AbstractApplication { }))); } final long end = System.currentTimeMillis(); - CubingUtils.writeDictionary(seg, dictionaryMap, start, end); + + // work on copy instead of cached objects + CubeInstance cubeCopy = cubeInstance.latestCopyForWrite(); + CubeSegment segCopy = cubeCopy.getSegmentById(seg.getUuid()); + + CubingUtils.writeDictionary(segCopy, dictionaryMap, start, end); + try { - CubeUpdate cubeBuilder = new CubeUpdate(cubeInstance); - cubeBuilder.setToUpdateSegs(seg); - cubeManager.updateCube(cubeBuilder); + CubeUpdate update = new CubeUpdate(cubeCopy); + update.setToUpdateSegs(segCopy); + cubeManager.updateCube(update); } catch (IOException e) { throw new RuntimeException("Failed to deal with the request: " + e.getLocalizedMessage()); } @@ -501,8 +507,8 @@ public class SparkCubing extends AbstractApplication { private void bulkLoadHFile(String cubeName, String segmentId, String hfileLocation) throws Exception { final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); - final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName); - final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId); + final CubeInstance cubeCopy = CubeManager.getInstance(kylinConfig).getCube(cubeName).latestCopyForWrite(); + final CubeSegment segCopy = cubeCopy.getSegmentById(segmentId); final Configuration hbaseConf = HBaseConnection.getCurrentHBaseConfiguration(); FsShell shell = new FsShell(hbaseConf); @@ -515,18 +521,17 @@ public class SparkCubing extends AbstractApplication { String[] newArgs = new String[2]; newArgs[0] = hfileLocation; - newArgs[1] = cubeSegment.getStorageLocationIdentifier(); + newArgs[1] = segCopy.getStorageLocationIdentifier(); int ret = ToolRunner.run(new LoadIncrementalHFiles(hbaseConf), newArgs); System.out.println("incremental load result:" + ret); - cubeSegment.setStatus(SegmentStatusEnum.READY); try { - CubeUpdate cubeBuilder = new CubeUpdate(cubeInstance); - cubeInstance.setStatus(RealizationStatusEnum.READY); - cubeSegment.setStatus(SegmentStatusEnum.READY); - cubeBuilder.setToUpdateSegs(cubeSegment); - CubeManager.getInstance(kylinConfig).updateCube(cubeBuilder); + CubeUpdate update = new CubeUpdate(cubeCopy); + cubeCopy.setStatus(RealizationStatusEnum.READY); + segCopy.setStatus(SegmentStatusEnum.READY); + update.setToUpdateSegs(segCopy); + CubeManager.getInstance(kylinConfig).updateCube(update); } catch (IOException e) { throw new RuntimeException("Failed to deal with the request: " + e.getLocalizedMessage()); } http://git-wip-us.apache.org/repos/asf/kylin/blob/03e6b8c5/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java index 42fc124..18a07cf 100644 --- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java +++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java @@ -44,7 +44,6 @@ import org.apache.kylin.cube.CubeDescManager; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.cube.CubeUpdate; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.engine.EngineFactory; import org.apache.kylin.engine.mr.CubingJob; @@ -328,10 +327,7 @@ public class BuildCubeWithEngine { private void clearSegment(String cubeName) throws Exception { CubeInstance cube = cubeManager.getCube(cubeName); - // remove all existing segments - CubeUpdate cubeBuilder = new CubeUpdate(cube); - cubeBuilder.setToRemoveSegs(cube.getSegments().toArray(new CubeSegment[cube.getSegments().size()])); - cubeManager.updateCube(cubeBuilder); + cubeManager.updateCubeDropSegments(cube, cube.getSegments()); } private Boolean mergeSegment(String cubeName, long startDate, long endDate) throws Exception { http://git-wip-us.apache.org/repos/asf/kylin/blob/03e6b8c5/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java index d338332..bf52b97 100644 --- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java +++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java @@ -47,7 +47,6 @@ import org.apache.kylin.common.util.HBaseMetadataTestCase; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.cube.CubeUpdate; import org.apache.kylin.engine.EngineFactory; import org.apache.kylin.job.DeployUtil; import org.apache.kylin.job.engine.JobEngineConfig; @@ -160,10 +159,7 @@ public class BuildCubeWithStream { protected void clearSegment(String cubeName) throws Exception { CubeInstance cube = cubeManager.getCube(cubeName); - // remove all existing segments - CubeUpdate cubeBuilder = new CubeUpdate(cube); - cubeBuilder.setToRemoveSegs(cube.getSegments().toArray(new CubeSegment[cube.getSegments().size()])); - cubeManager.updateCube(cubeBuilder); + cubeManager.updateCubeDropSegments(cube, cube.getSegments()); } public void build() throws Exception { http://git-wip-us.apache.org/repos/asf/kylin/blob/03e6b8c5/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java index f04f825..a02715e 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java @@ -176,9 +176,8 @@ public class CubeService extends BasicService implements InitializingBean { String owner = SecurityContextHolder.getContext().getAuthentication().getName(); cube.setOwner(owner); - CubeUpdate cubeBuilder = new CubeUpdate(cube).setOwner(owner).setCost(cost); - - return getCubeManager().updateCube(cubeBuilder); + CubeUpdate update = new CubeUpdate(cube.latestCopyForWrite()).setOwner(owner).setCost(cost); + return getCubeManager().updateCube(update); } @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN @@ -359,16 +358,7 @@ public class CubeService extends BasicService implements InitializingBean { throw new BadRequestException(String.format(msg.getDISABLE_NOT_READY_CUBE(), cubeName, ostatus)); } - cube.setStatus(RealizationStatusEnum.DISABLED); - - try { - CubeUpdate cubeBuilder = new CubeUpdate(cube); - cubeBuilder.setStatus(RealizationStatusEnum.DISABLED); - return getCubeManager().updateCube(cubeBuilder); - } catch (IOException e) { - cube.setStatus(ostatus); - throw e; - } + return getCubeManager().updateCubeStatus(cube, RealizationStatusEnum.DISABLED); } public void checkEnableCubeCondition(CubeInstance cube) { @@ -399,15 +389,7 @@ public class CubeService extends BasicService implements InitializingBean { * @throws IOException */ public CubeInstance enableCube(CubeInstance cube) throws IOException { - RealizationStatusEnum ostatus = cube.getStatus(); - try { - CubeUpdate cubeBuilder = new CubeUpdate(cube); - cubeBuilder.setStatus(RealizationStatusEnum.READY); - return getCubeManager().updateCube(cubeBuilder); - } catch (IOException e) { - cube.setStatus(ostatus); - throw e; - } + return getCubeManager().updateCubeStatus(cube, RealizationStatusEnum.READY); } public MetricsResponse calculateMetrics(MetricsRequest request) { @@ -509,9 +491,7 @@ public class CubeService extends BasicService implements InitializingBean { throw new BadRequestException(String.format(msg.getDELETE_NOT_READY_SEG(), segmentName)); } - CubeUpdate update = new CubeUpdate(cube); - update.setToRemoveSegs(new CubeSegment[] { toDelete }); - return CubeManager.getInstance(getConfig()).updateCube(update); + return CubeManager.getInstance(getConfig()).updateCubeDropSegments(cube, toDelete); } protected void releaseAllJobs(CubeInstance cube) { @@ -532,10 +512,7 @@ public class CubeService extends BasicService implements InitializingBean { */ private void releaseAllSegments(CubeInstance cube) throws IOException { releaseAllJobs(cube); - - CubeUpdate update = new CubeUpdate(cube); - update.setToRemoveSegs(cube.getSegments().toArray(new CubeSegment[cube.getSegments().size()])); - CubeManager.getInstance(getConfig()).updateCube(update); + CubeManager.getInstance(getConfig()).updateCubeDropSegments(cube, cube.getSegments()); } public void updateOnNewSegmentReady(String cubeName) { @@ -579,10 +556,8 @@ public class CubeService extends BasicService implements InitializingBean { } if (toRemoveSegs.size() > 0) { - CubeUpdate cubeBuilder = new CubeUpdate(cube); - cubeBuilder.setToRemoveSegs(toRemoveSegs.toArray(new CubeSegment[toRemoveSegs.size()])); try { - this.getCubeManager().updateCube(cubeBuilder); + getCubeManager().updateCubeDropSegments(cube, toRemoveSegs); } catch (IOException e) { logger.error("Failed to remove old segment from cube " + cubeName, e); } http://git-wip-us.apache.org/repos/asf/kylin/blob/03e6b8c5/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java index 5648c08..1dfa1de 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java @@ -37,7 +37,6 @@ import org.apache.kylin.common.util.ClassUtil; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.cube.CubeUpdate; import org.apache.kylin.cube.model.CubeBuildTypeEnum; import org.apache.kylin.engine.EngineFactory; import org.apache.kylin.engine.mr.CubingJob; @@ -256,10 +255,8 @@ public class JobService extends BasicService implements InitializingBean { logger.error("Job submission might failed for NEW segment {}, will clean the NEW segment from cube", newSeg.getName()); try { - // Remove this segments - CubeUpdate cubeBuilder = new CubeUpdate(cube); - cubeBuilder.setToRemoveSegs(newSeg); - getCubeManager().updateCube(cubeBuilder); + // Remove this segment + getCubeManager().updateCubeDropSegments(cube, newSeg); } catch (Exception ee) { // swallow the exception logger.error("Clean New segment failed, ignoring it", e); @@ -347,10 +344,8 @@ public class JobService extends BasicService implements InitializingBean { for (String segmentId : StringUtils.split(segmentIds)) { final CubeSegment segment = cubeInstance.getSegmentById(segmentId); if (segment != null && (segment.getStatus() == SegmentStatusEnum.NEW || segment.getTSRange().end.v == 0)) { - // Remove this segments - CubeUpdate cubeBuilder = new CubeUpdate(cubeInstance); - cubeBuilder.setToRemoveSegs(segment); - getCubeManager().updateCube(cubeBuilder); + // Remove this segment + getCubeManager().updateCubeDropSegments(cubeInstance, segment); } } getExecutableManager().discardJob(job.getId());