KYLIN-3085 Makes sure no update on cached and shared CubeInstance

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/03e6b8c5
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/03e6b8c5
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/03e6b8c5

Branch: refs/heads/master
Commit: 03e6b8c5d622125aca36128c6a7942f711951768
Parents: c094265
Author: Li Yang <liy...@apache.org>
Authored: Sun Dec 10 10:44:12 2017 +0800
Committer: Hongbin Ma <m...@kyligence.io>
Committed: Thu Dec 14 16:53:10 2017 +0800

----------------------------------------------------------------------
 .../persistence/RootPersistentEntity.java       |  11 +
 .../org/apache/kylin/cube/CubeInstance.java     |   6 +
 .../java/org/apache/kylin/cube/CubeManager.java | 203 +++++++++++++------
 .../java/org/apache/kylin/cube/CubeUpdate.java  |   5 +-
 .../kylin/cube/cli/DictionaryGeneratorCLI.java  |   2 +
 .../apache/kylin/cube/CubeManagerCacheTest.java |  22 +-
 .../kylin/cube/CubeManagerConcurrencyTest.java  | 141 +++++++++++++
 .../org/apache/kylin/cube/CubeManagerTest.java  | 123 +++++------
 .../org/apache/kylin/cube/CubeSegmentsTest.java |  38 ++--
 .../kylin/cube/project/ProjectManagerTest.java  |   8 +-
 .../metadata/cachesync/CachedCrudAssist.java    |  67 ++++--
 .../kylin/engine/mr/common/CuboidShardUtil.java |  15 +-
 .../mr/common/StatisticsDecisionUtil.java       |   6 +-
 .../engine/mr/steps/MergeDictionaryStep.java    |  16 +-
 .../engine/mr/steps/MergeCuboidMapperTest.java  | 183 -----------------
 .../apache/kylin/engine/spark/SparkCubing.java  |  31 +--
 .../kylin/provision/BuildCubeWithEngine.java    |   6 +-
 .../kylin/provision/BuildCubeWithStream.java    |   6 +-
 .../apache/kylin/rest/service/CubeService.java  |  39 +---
 .../apache/kylin/rest/service/JobService.java   |  13 +-
 .../kylin/source/kafka/job/MergeOffsetStep.java |  22 +-
 21 files changed, 528 insertions(+), 435 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/03e6b8c5/core-common/src/main/java/org/apache/kylin/common/persistence/RootPersistentEntity.java
----------------------------------------------------------------------
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/persistence/RootPersistentEntity.java
 
b/core-common/src/main/java/org/apache/kylin/common/persistence/RootPersistentEntity.java
index aa35482..39c2995 100644
--- 
a/core-common/src/main/java/org/apache/kylin/common/persistence/RootPersistentEntity.java
+++ 
b/core-common/src/main/java/org/apache/kylin/common/persistence/RootPersistentEntity.java
@@ -58,6 +58,9 @@ abstract public class RootPersistentEntity implements 
AclEntity, Serializable {
 
     @JsonProperty("last_modified")
     protected long lastModified;
+    
+    // if cached and shared, the object MUST NOT be modified (call setXXX() 
for example)
+    protected boolean isCachedAndShared = false;
 
     /**
      * Metadata model version
@@ -99,6 +102,14 @@ abstract public class RootPersistentEntity implements 
AclEntity, Serializable {
         setUuid(UUID.randomUUID().toString());
     }
     
+    public boolean isCachedAndShared() {
+        return isCachedAndShared;
+    }
+
+    public void setCachedAndShared(boolean isCachedAndShared) {
+        this.isCachedAndShared = isCachedAndShared;
+    }
+
     /**
      * The name as a part of the resource path used to save the entity.
      * 

http://git-wip-us.apache.org/repos/asf/kylin/blob/03e6b8c5/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java 
b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
index 1be7923..55e9325 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
@@ -128,6 +128,12 @@ public class CubeInstance extends RootPersistentEntity 
implements IRealization,
     public CubeInstance() {
     }
     
+    public CubeInstance latestCopyForWrite() {
+        CubeManager mgr = CubeManager.getInstance(config);
+        CubeInstance latest = mgr.getCube(name); // in case this object is 
out-of-date
+        return mgr.copyForWrite(latest);
+    }
+    
     void init(KylinConfig config) {
         CubeDesc cubeDesc = 
CubeDescManager.getInstance(config).getCubeDesc(descName);
         checkNotNull(cubeDesc, "cube descriptor '%s' (for cube '%s') not 
found", descName, name);

http://git-wip-us.apache.org/repos/asf/kylin/blob/03e6b8c5/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java 
b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 3220a0f..1813ad2 100755
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -21,6 +21,7 @@ package org.apache.kylin.cube;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -133,6 +134,7 @@ public class CubeManager implements IRealizationProvider {
                 return cube;
             }
         };
+        this.crud.setCheckCopyOnWrite(true);
 
         // touch lower level metadata before registering my listener
         crud.reloadAll();
@@ -243,6 +245,7 @@ public class CubeManager implements IRealizationProvider {
         }
     }
 
+    // try minimize the use of this method, use udpateCubeXXX() instead
     public CubeInstance updateCube(CubeUpdate update) throws IOException {
         try (AutoLock lock = cubeMapLock.lockForWrite()) {
             CubeInstance cube = updateCubeWithRetry(update, 0);
@@ -250,6 +253,42 @@ public class CubeManager implements IRealizationProvider {
         }
     }
 
+    public CubeInstance updateCubeStatus(CubeInstance cube, 
RealizationStatusEnum newStatus) throws IOException {
+        try (AutoLock lock = cubeMapLock.lockForWrite()) {
+            cube = cube.latestCopyForWrite(); // get a latest copy
+            CubeUpdate update = new CubeUpdate(cube);
+            update.setStatus(newStatus);
+            return updateCube(update);
+        }
+    }
+
+    public CubeInstance updateCubeDropSegments(CubeInstance cube, 
Collection<CubeSegment> segsToDrop)
+            throws IOException {
+        CubeSegment[] arr = (CubeSegment[]) segsToDrop.toArray(new 
CubeSegment[segsToDrop.size()]);
+        return updateCubeDropSegments(cube, arr);
+    }
+
+    public CubeInstance updateCubeDropSegments(CubeInstance cube, 
CubeSegment... segsToDrop) throws IOException {
+        try (AutoLock lock = cubeMapLock.lockForWrite()) {
+            cube = cube.latestCopyForWrite(); // get a latest copy
+            CubeUpdate update = new CubeUpdate(cube);
+            update.setToRemoveSegs(segsToDrop);
+            return updateCube(update);
+        }
+    }
+    
+    public CubeInstance updateCubeSegStatus(CubeSegment seg, SegmentStatusEnum 
status) throws IOException {
+        try (AutoLock lock = cubeMapLock.lockForWrite()) {
+            CubeInstance cube = seg.getCubeInstance().latestCopyForWrite();
+            seg = cube.getSegmentById(seg.getUuid());
+            
+            CubeUpdate update = new CubeUpdate(cube);
+            seg.setStatus(status);
+            update.setToUpdateSegs(seg);
+            return updateCube(update);
+        }        
+    }
+
     private CubeInstance updateCubeWithRetry(CubeUpdate update, int retry) 
throws IOException {
         if (update == null || update.getCubeInstance() == null)
             throw new IllegalStateException();
@@ -319,9 +358,8 @@ public class CubeManager implements IRealizationProvider {
             }
 
             cube = crud.reload(cube.getName());
-            update.setCubeInstance(cube);
-            retry++;
-            cube = updateCubeWithRetry(update, retry);
+            update.setCubeInstance(cube.latestCopyForWrite());
+            return updateCubeWithRetry(update, ++retry);
         }
 
         if (toRemoveResources.size() > 0) {
@@ -337,10 +375,18 @@ public class CubeManager implements IRealizationProvider {
         //this is a duplicate call to take care of scenarios where REST cache 
service unavailable
         ProjectManager.getInstance(cube.getConfig()).clearL2Cache();
 
-        return cube;
+        return crud.reload(cube.resourceName());
     }
 
-    public CubeInstance reloadCubeQuietly(String cubeName) {
+    // for test
+    CubeInstance reloadCube(String cubeName) {
+        try (AutoLock lock = cubeMapLock.lockForWrite()) {
+            return crud.reload(cubeName);
+        }
+    }
+
+    // for internal
+    CubeInstance reloadCubeQuietly(String cubeName) {
         try (AutoLock lock = cubeMapLock.lockForWrite()) {
             CubeInstance cube = crud.reloadQuietly(cubeName);
             if (cube != null)
@@ -401,6 +447,10 @@ public class CubeManager implements IRealizationProvider {
         return tableName;
     }
 
+    public CubeInstance copyForWrite(CubeInstance cube) {
+        return crud.copyForWrite(cube);
+    }
+
     private boolean isReady(CubeSegment seg) {
         return seg.getStatus() == SegmentStatusEnum.READY;
     }
@@ -490,13 +540,15 @@ public class CubeManager implements IRealizationProvider {
         CubeSegment appendSegment(CubeInstance cube, TSRange tsRange, 
SegmentRange segRange,
                 Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, 
Long> sourcePartitionOffsetEnd)
                 throws IOException {
+            CubeInstance cubeCopy = cube.latestCopyForWrite(); // get a latest 
copy
+
             checkInputRanges(tsRange, segRange);
-            checkBuildingSegment(cube);
+            checkBuildingSegment(cubeCopy);
 
             // fix start/end a bit
-            if (cube.getModel().getPartitionDesc().isPartitioned()) {
+            if (cubeCopy.getModel().getPartitionDesc().isPartitioned()) {
                 // if missing start, set it to where last time ends
-                CubeSegment last = cube.getLastSegment();
+                CubeSegment last = cubeCopy.getLastSegment();
                 if (last != null && !last.isOffsetCube() && tsRange.start.v == 
0) {
                     tsRange = new TSRange(last.getTSRange().end.v, 
tsRange.end.v);
                 }
@@ -506,38 +558,40 @@ public class CubeManager implements IRealizationProvider {
                 segRange = null;
             }
 
-            CubeSegment newSegment = newSegment(cube, tsRange, segRange);
+            CubeSegment newSegment = newSegment(cubeCopy, tsRange, segRange);
             
newSegment.setSourcePartitionOffsetStart(sourcePartitionOffsetStart);
             newSegment.setSourcePartitionOffsetEnd(sourcePartitionOffsetEnd);
-            validateNewSegments(cube, newSegment);
+            validateNewSegments(cubeCopy, newSegment);
 
-            CubeUpdate cubeBuilder = new CubeUpdate(cube);
-            cubeBuilder.setToAddSegs(newSegment);
-            updateCube(cubeBuilder);
+            CubeUpdate update = new CubeUpdate(cubeCopy);
+            update.setToAddSegs(newSegment);
+            updateCube(update);
             return newSegment;
         }
 
         public CubeSegment refreshSegment(CubeInstance cube, TSRange tsRange, 
SegmentRange segRange)
                 throws IOException {
+            CubeInstance cubeCopy = cube.latestCopyForWrite(); // get a latest 
copy
+
             checkInputRanges(tsRange, segRange);
-            checkBuildingSegment(cube);
+            checkBuildingSegment(cubeCopy);
 
-            if (cube.getModel().getPartitionDesc().isPartitioned() == false) {
+            if (cubeCopy.getModel().getPartitionDesc().isPartitioned() == 
false) {
                 // full build
                 tsRange = null;
                 segRange = null;
             }
 
-            CubeSegment newSegment = newSegment(cube, tsRange, segRange);
+            CubeSegment newSegment = newSegment(cubeCopy, tsRange, segRange);
 
-            Pair<Boolean, Boolean> pair = 
cube.getSegments().fitInSegments(newSegment);
+            Pair<Boolean, Boolean> pair = 
cubeCopy.getSegments().fitInSegments(newSegment);
             if (pair.getFirst() == false || pair.getSecond() == false)
                 throw new IllegalArgumentException("The new refreshing segment 
" + newSegment
-                        + " does not match any existing segment in cube " + 
cube);
+                        + " does not match any existing segment in cube " + 
cubeCopy);
 
             if (segRange != null) {
                 CubeSegment toRefreshSeg = null;
-                for (CubeSegment cubeSegment : cube.getSegments()) {
+                for (CubeSegment cubeSegment : cubeCopy.getSegments()) {
                     if (cubeSegment.getSegRange().equals(segRange)) {
                         toRefreshSeg = cubeSegment;
                         break;
@@ -553,30 +607,32 @@ public class CubeManager implements IRealizationProvider {
                 
newSegment.setSourcePartitionOffsetEnd(toRefreshSeg.getSourcePartitionOffsetEnd());
             }
 
-            CubeUpdate cubeBuilder = new CubeUpdate(cube);
-            cubeBuilder.setToAddSegs(newSegment);
-            updateCube(cubeBuilder);
+            CubeUpdate update = new CubeUpdate(cubeCopy);
+            update.setToAddSegs(newSegment);
+            updateCube(update);
 
             return newSegment;
         }
 
         public CubeSegment mergeSegments(CubeInstance cube, TSRange tsRange, 
SegmentRange segRange, boolean force)
                 throws IOException {
-            if (cube.getSegments().isEmpty())
-                throw new IllegalArgumentException("Cube " + cube + " has no 
segments");
+            CubeInstance cubeCopy = cube.latestCopyForWrite(); // get a latest 
copy
+
+            if (cubeCopy.getSegments().isEmpty())
+                throw new IllegalArgumentException("Cube " + cubeCopy + " has 
no segments");
 
             checkInputRanges(tsRange, segRange);
-            checkBuildingSegment(cube);
-            checkCubeIsPartitioned(cube);
+            checkBuildingSegment(cubeCopy);
+            checkCubeIsPartitioned(cubeCopy);
 
-            if (cube.getSegments().getFirstSegment().isOffsetCube()) {
+            if (cubeCopy.getSegments().getFirstSegment().isOffsetCube()) {
                 // offset cube, merge by date range?
                 if (segRange == null && tsRange != null) {
-                    Pair<CubeSegment, CubeSegment> pair = 
cube.getSegments(SegmentStatusEnum.READY)
+                    Pair<CubeSegment, CubeSegment> pair = 
cubeCopy.getSegments(SegmentStatusEnum.READY)
                             .findMergeOffsetsByDateRange(tsRange, 
Long.MAX_VALUE);
                     if (pair == null)
                         throw new IllegalArgumentException(
-                                "Find no segments to merge by " + tsRange + " 
for cube " + cube);
+                                "Find no segments to merge by " + tsRange + " 
for cube " + cubeCopy);
                     segRange = new 
SegmentRange(pair.getFirst().getSegRange().start,
                             pair.getSecond().getSegRange().end);
                 }
@@ -587,9 +643,9 @@ public class CubeManager implements IRealizationProvider {
                 Preconditions.checkArgument(tsRange != null);
             }
 
-            CubeSegment newSegment = newSegment(cube, tsRange, segRange);
+            CubeSegment newSegment = newSegment(cubeCopy, tsRange, segRange);
 
-            Segments<CubeSegment> mergingSegments = 
cube.getMergingSegments(newSegment);
+            Segments<CubeSegment> mergingSegments = 
cubeCopy.getMergingSegments(newSegment);
             if (mergingSegments.size() <= 1)
                 throw new IllegalArgumentException("Range " + 
newSegment.getSegRange()
                         + " must contain at least 2 segments, but there is " + 
mergingSegments.size());
@@ -628,11 +684,11 @@ public class CubeManager implements IRealizationProvider {
                 }
             }
 
-            validateNewSegments(cube, newSegment);
+            validateNewSegments(cubeCopy, newSegment);
 
-            CubeUpdate cubeBuilder = new CubeUpdate(cube);
-            cubeBuilder.setToAddSegs(newSegment);
-            updateCube(cubeBuilder);
+            CubeUpdate update = new CubeUpdate(cubeCopy);
+            update.setToAddSegs(newSegment);
+            updateCube(update);
 
             return newSegment;
         }
@@ -684,39 +740,43 @@ public class CubeManager implements IRealizationProvider {
         }
 
         public void promoteNewlyBuiltSegments(CubeInstance cube, CubeSegment 
newSegment) throws IOException {
-            if (StringUtils.isBlank(newSegment.getStorageLocationIdentifier()))
+            // work on copy instead of cached objects
+            CubeInstance cubeCopy = cube.latestCopyForWrite(); // get a latest 
copy
+            CubeSegment newSegCopy = 
cubeCopy.getSegmentById(newSegment.getUuid());
+
+            if (StringUtils.isBlank(newSegCopy.getStorageLocationIdentifier()))
                 throw new IllegalStateException(
-                        "For cube " + cube + ", segment " + newSegment + " 
missing StorageLocationIdentifier");
+                        "For cube " + cubeCopy + ", segment " + newSegCopy + " 
missing StorageLocationIdentifier");
 
-            if (StringUtils.isBlank(newSegment.getLastBuildJobID()))
+            if (StringUtils.isBlank(newSegCopy.getLastBuildJobID()))
                 throw new IllegalStateException(
-                        "For cube " + cube + ", segment " + newSegment + " 
missing LastBuildJobID");
+                        "For cube " + cubeCopy + ", segment " + newSegCopy + " 
missing LastBuildJobID");
 
-            if (isReady(newSegment) == true) {
-                logger.warn("For cube " + cube + ", segment " + newSegment + " 
state should be NEW but is READY");
+            if (isReady(newSegCopy) == true) {
+                logger.warn("For cube " + cubeCopy + ", segment " + newSegCopy 
+ " state should be NEW but is READY");
             }
 
-            List<CubeSegment> tobe = cube.calculateToBeSegments(newSegment);
+            List<CubeSegment> tobe = 
cubeCopy.calculateToBeSegments(newSegCopy);
 
-            if (tobe.contains(newSegment) == false)
-                throw new IllegalStateException(
-                        "For cube " + cube + ", segment " + newSegment + " is 
expected but not in the tobe " + tobe);
+            if (tobe.contains(newSegCopy) == false)
+                throw new IllegalStateException("For cube " + cubeCopy + ", 
segment " + newSegCopy
+                        + " is expected but not in the tobe " + tobe);
 
-            newSegment.setStatus(SegmentStatusEnum.READY);
+            newSegCopy.setStatus(SegmentStatusEnum.READY);
 
             List<CubeSegment> toRemoveSegs = Lists.newArrayList();
-            for (CubeSegment segment : cube.getSegments()) {
+            for (CubeSegment segment : cubeCopy.getSegments()) {
                 if (!tobe.contains(segment))
                     toRemoveSegs.add(segment);
             }
 
-            logger.info(
-                    "Promoting cube " + cube + ", new segment " + newSegment + 
", to remove segments " + toRemoveSegs);
+            logger.info("Promoting cube " + cubeCopy + ", new segment " + 
newSegCopy + ", to remove segments "
+                    + toRemoveSegs);
 
-            CubeUpdate cubeBuilder = new CubeUpdate(cube);
-            cubeBuilder.setToRemoveSegs(toRemoveSegs.toArray(new 
CubeSegment[toRemoveSegs.size()]))
-                    
.setToUpdateSegs(newSegment).setStatus(RealizationStatusEnum.READY);
-            updateCube(cubeBuilder);
+            CubeUpdate update = new CubeUpdate(cubeCopy);
+            update.setToRemoveSegs(toRemoveSegs.toArray(new 
CubeSegment[toRemoveSegs.size()]))
+                    
.setToUpdateSegs(newSegCopy).setStatus(RealizationStatusEnum.READY);
+            updateCube(update);
         }
 
         public void validateNewSegments(CubeInstance cube, CubeSegment 
newSegments) {
@@ -833,15 +893,20 @@ public class CubeManager implements IRealizationProvider {
 
         private void saveDictionaryInfo(CubeSegment cubeSeg, TblColRef col, 
DictionaryInfo dictInfo)
                 throws IOException {
-            if (dictInfo != null) {
-                Dictionary<?> dict = dictInfo.getDictionaryObject();
-                cubeSeg.putDictResPath(col, dictInfo.getResourcePath());
-                cubeSeg.getRowkeyStats().add(new Object[] { col.getIdentity(), 
dict.getSize(), dict.getSizeOfId() });
-
-                CubeUpdate update = new CubeUpdate(cubeSeg.getCubeInstance());
-                update.setToUpdateSegs(cubeSeg);
-                updateCube(update);
-            }
+            if (dictInfo == null)
+                return;
+
+            // work on copy instead of cached objects
+            CubeInstance cubeCopy = 
cubeSeg.getCubeInstance().latestCopyForWrite(); // get a latest copy
+            CubeSegment segCopy = cubeCopy.getSegmentById(cubeSeg.getUuid());
+
+            Dictionary<?> dict = dictInfo.getDictionaryObject();
+            segCopy.putDictResPath(col, dictInfo.getResourcePath());
+            segCopy.getRowkeyStats().add(new Object[] { col.getIdentity(), 
dict.getSize(), dict.getSizeOfId() });
+
+            CubeUpdate update = new CubeUpdate(cubeCopy);
+            update.setToUpdateSegs(segCopy);
+            updateCube(update);
         }
 
         /**
@@ -868,17 +933,21 @@ public class CubeManager implements IRealizationProvider {
         }
 
         public SnapshotTable buildSnapshotTable(CubeSegment cubeSeg, String 
lookupTable) throws IOException {
+            // work on copy instead of cached objects
+            CubeInstance cubeCopy = 
cubeSeg.getCubeInstance().latestCopyForWrite(); // get a latest copy
+            CubeSegment segCopy = cubeCopy.getSegmentById(cubeSeg.getUuid());
+
             TableMetadataManager metaMgr = getTableManager();
             SnapshotManager snapshotMgr = getSnapshotManager();
 
-            TableDesc tableDesc = new 
TableDesc(metaMgr.getTableDesc(lookupTable, cubeSeg.getProject()));
+            TableDesc tableDesc = new 
TableDesc(metaMgr.getTableDesc(lookupTable, segCopy.getProject()));
             IReadableTable hiveTable = 
SourceFactory.createReadableTable(tableDesc);
             SnapshotTable snapshot = snapshotMgr.buildSnapshot(hiveTable, 
tableDesc);
 
-            cubeSeg.putSnapshotResPath(lookupTable, 
snapshot.getResourcePath());
-            CubeUpdate cubeBuilder = new CubeUpdate(cubeSeg.getCubeInstance());
-            cubeBuilder.setToUpdateSegs(cubeSeg);
-            updateCube(cubeBuilder);
+            segCopy.putSnapshotResPath(lookupTable, 
snapshot.getResourcePath());
+            CubeUpdate update = new CubeUpdate(cubeCopy);
+            update.setToUpdateSegs(segCopy);
+            updateCube(update);
 
             return snapshot;
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/03e6b8c5/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java 
b/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java
index fae20dc..1d4e722 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java
@@ -36,7 +36,7 @@ public class CubeUpdate {
     private Map<Long, Long> cuboids = null;
 
     public CubeUpdate(CubeInstance cubeInstance) {
-        this.cubeInstance = cubeInstance;
+        setCubeInstance(cubeInstance);
     }
 
     public CubeInstance getCubeInstance() {
@@ -44,6 +44,9 @@ public class CubeUpdate {
     }
 
     public CubeUpdate setCubeInstance(CubeInstance cubeInstance) {
+        if (cubeInstance.isCachedAndShared())
+            throw new IllegalArgumentException();
+        
         this.cubeInstance = cubeInstance;
         return this;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/03e6b8c5/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
----------------------------------------------------------------------
diff --git 
a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java 
b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
index 65e8965..36c06b7 100644
--- 
a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
+++ 
b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
@@ -87,6 +87,8 @@ public class DictionaryGeneratorCLI {
             cubeMgr.buildSnapshotTable(cubeSeg, tableIdentity);
         }
         
+        CubeInstance updatedCube = 
cubeMgr.getCube(cubeSeg.getCubeInstance().getName());
+        cubeSeg = updatedCube.getSegmentById(cubeSeg.getUuid());
         for (TableRef lookup : toCheckLookup) {
             logger.info("Checking snapshot of " + lookup);
             JoinDesc join = 
cubeSeg.getModel().getJoinsTree().getJoinByPKSide(lookup);

http://git-wip-us.apache.org/repos/asf/kylin/blob/03e6b8c5/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerCacheTest.java
----------------------------------------------------------------------
diff --git 
a/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerCacheTest.java 
b/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerCacheTest.java
index 3881943..f913e1b 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerCacheTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerCacheTest.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.cube;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
@@ -60,11 +61,24 @@ public class CubeManagerCacheTest extends 
LocalFileMetadataTestCase {
         CubeInstance createdCube = cubeManager.getCube("a_whole_new_cube");
         assertEquals(0, createdCube.getSegments().size());
         assertEquals(RealizationStatusEnum.DISABLED, createdCube.getStatus());
-        createdCube.setStatus(RealizationStatusEnum.DESCBROKEN);
-        CubeUpdate cubeBuilder = new CubeUpdate(createdCube);
 
-        cubeManager.updateCube(cubeBuilder);
-        assertEquals(RealizationStatusEnum.DESCBROKEN, 
cubeManager.getCube("a_whole_new_cube").getStatus());
+        cubeManager.updateCubeStatus(createdCube, RealizationStatusEnum.READY);
+
+        assertEquals(RealizationStatusEnum.READY, 
cubeManager.getCube("a_whole_new_cube").getStatus());
+    }
+
+    @Test
+    public void testCachedAndSharedFlag() {
+        CubeInstance cube = 
cubeManager.getCube("test_kylin_cube_with_slr_empty");
+        assertEquals(true, cube.isCachedAndShared());
+        assertEquals(false, cube.latestCopyForWrite().isCachedAndShared());
+
+        try {
+            new CubeUpdate(cube);
+            fail();
+        } catch (IllegalArgumentException ex) {
+            // update cached object is illegal
+        }
     }
 
     public CubeDescManager getCubeDescManager() {

http://git-wip-us.apache.org/repos/asf/kylin/blob/03e6b8c5/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerConcurrencyTest.java
----------------------------------------------------------------------
diff --git 
a/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerConcurrencyTest.java 
b/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerConcurrencyTest.java
new file mode 100644
index 0000000..58f29b2
--- /dev/null
+++ 
b/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerConcurrencyTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.Vector;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.metadata.model.SegmentRange.TSRange;
+import org.apache.kylin.metadata.project.ProjectInstance;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ */
+public class CubeManagerConcurrencyTest extends LocalFileMetadataTestCase {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(CubeManagerConcurrencyTest.class);
+    
+    @Before
+    public void setUp() throws Exception {
+        System.setProperty("kylin.cube.max-building-segments", "10000");
+        this.createTestMetadata();
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+        System.clearProperty("kylin.cube.max-building-segments");
+    }
+
+    @Test
+    public void test() throws Exception {
+        final KylinConfig config = getTestConfig();
+        CubeManager cubeMgr = CubeManager.getInstance(config);
+        CubeDescManager cubeDescMgr = CubeDescManager.getInstance(config);
+
+        // 4 empty new cubes to start with
+        final String[] cubeNames = { "c1", "c2", "c3", "c4" };
+        final int n = cubeNames.length;
+        final int updatesPerCube = 100;
+        final CubeDesc desc = 
cubeDescMgr.getCubeDesc("test_kylin_cube_with_slr_desc");
+        final List<CubeInstance> cubes = new ArrayList<>();
+        for (String name : cubeNames) {
+            cubes.add(cubeMgr.createCube(name, 
ProjectInstance.DEFAULT_PROJECT_NAME, desc, null));
+        }
+
+        final AtomicInteger runningFlag = new AtomicInteger();
+        final Vector<Exception> exceptions = new Vector<>();
+
+        // 1 thread, keeps reloading cubes
+        Thread reloadThread = new Thread() {
+            @Override
+            public void run() {
+                try {
+                    Random rand = new Random();
+                    while (runningFlag.get() == 0) {
+                        String name = cubeNames[rand.nextInt(n)];
+                        CubeManager.getInstance(config).reloadCube(name);
+                        Thread.sleep(1);
+                    }
+                } catch (Exception ex) {
+                    logger.error("reload thread error", ex);
+                    exceptions.add(ex);
+                }
+            }
+        };
+        reloadThread.start();
+
+        // 4 threads, keeps updating cubes
+        Thread[] updateThreads = new Thread[n];
+        for (int i = 0; i < n; i++) {
+            // each thread takes care of one cube
+            // for now, the design refuses concurrent updates to one cube
+            final String cubeName = cubeNames[i];
+            updateThreads[i] = new Thread() {
+                @Override
+                public void run() {
+                    try {
+                        Random rand = new Random();
+                        for (int i = 0; i < updatesPerCube; i++) {
+                            CubeManager mgr = CubeManager.getInstance(config);
+                            CubeInstance cube = mgr.getCube(cubeName);
+                            mgr.appendSegment(cube, new TSRange((long) i, 
(long) i + 1));
+                            Thread.sleep(rand.nextInt(1));
+                        }
+                    } catch (Exception ex) {
+                        logger.error("update thread ", ex);
+                        exceptions.add(ex);
+                    }
+                }
+            };
+            updateThreads[i].start();
+        }
+
+        // wait things done
+        for (int i = 0; i < n; i++) {
+            updateThreads[i].join();
+        }
+        runningFlag.incrementAndGet();
+        reloadThread.join();
+        
+        // check result and error
+        if (exceptions.isEmpty() == false) {
+            logger.error(exceptions.size() + " exceptions encountered, see 
logs above");
+            fail();
+        }
+        for (int i = 0; i < n; i++) {
+            CubeInstance cube = cubeMgr.getCube(cubeNames[i]);
+            assertEquals(updatesPerCube, cube.getSegments().size());
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/03e6b8c5/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java 
b/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java
index d8d48f7..f293472 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java
@@ -42,7 +42,6 @@ import org.junit.Test;
 import com.google.common.collect.Maps;
 
 /**
- * @author yangli9
  */
 public class CubeManagerTest extends LocalFileMetadataTestCase {
 
@@ -81,13 +80,14 @@ public class CubeManagerTest extends 
LocalFileMetadataTestCase {
 
         CubeDescManager cubeDescMgr = getCubeDescManager();
         CubeDesc desc = 
cubeDescMgr.getCubeDesc("test_kylin_cube_with_slr_desc");
-        CubeInstance createdCube = cubeMgr.createCube("a_whole_new_cube", 
ProjectInstance.DEFAULT_PROJECT_NAME, desc, null);
-        assertTrue(createdCube == cubeMgr.getCube("a_whole_new_cube"));
+        CubeInstance createdCube = cubeMgr.createCube("a_whole_new_cube", 
ProjectInstance.DEFAULT_PROJECT_NAME, desc,
+                null);
+        assertTrue(createdCube.equals(cubeMgr.getCube("a_whole_new_cube")));
 
         
assertTrue(prjMgr.listAllRealizations(ProjectInstance.DEFAULT_PROJECT_NAME).contains(createdCube));
 
         CubeInstance droppedCube = 
CubeManager.getInstance(getTestConfig()).dropCube("a_whole_new_cube", false);
-        assertTrue(createdCube == droppedCube);
+        assertTrue(createdCube.equals(droppedCube));
 
         
assertTrue(!prjMgr.listAllRealizations(ProjectInstance.DEFAULT_PROJECT_NAME).contains(droppedCube));
 
@@ -97,7 +97,7 @@ public class CubeManagerTest extends 
LocalFileMetadataTestCase {
     @Test
     public void testAutoMergeNormal() throws Exception {
         CubeManager mgr = CubeManager.getInstance(getTestConfig());
-        CubeInstance cube = mgr.getCube("test_kylin_cube_with_slr_empty");
+        CubeInstance cube = 
mgr.getCube("test_kylin_cube_with_slr_empty").latestCopyForWrite();
 
         cube.getDescriptor().setAutoMergeTimeRanges(new long[] { 2000, 6000 });
         mgr.updateCube(new CubeUpdate(cube));
@@ -109,15 +109,12 @@ public class CubeManagerTest extends 
LocalFileMetadataTestCase {
 
         // append first
         CubeSegment seg1 = mgr.appendSegment(cube, new TSRange(0L, 1000L), 
null, null, null);
-        seg1.setStatus(SegmentStatusEnum.READY);
+        mgr.updateCubeSegStatus(seg1, SegmentStatusEnum.READY);
 
         CubeSegment seg2 = mgr.appendSegment(cube, new TSRange(1000L, 2000L), 
null, null, null);
-        seg2.setStatus(SegmentStatusEnum.READY);
-
-        CubeUpdate cubeBuilder = new CubeUpdate(cube);
-
-        mgr.updateCube(cubeBuilder);
+        mgr.updateCubeSegStatus(seg2, SegmentStatusEnum.READY);
 
+        cube = mgr.getCube(cube.getName());
         assertEquals(2, cube.getSegments().size());
 
         SegmentRange mergedSeg = cube.autoMergeCubeSegments();
@@ -126,95 +123,92 @@ public class CubeManagerTest extends 
LocalFileMetadataTestCase {
 
     }
 
-
     @Test
     public void testConcurrentBuildAndMerge() throws Exception {
         CubeManager mgr = CubeManager.getInstance(getTestConfig());
-        CubeInstance cube = mgr.getCube("test_kylin_cube_with_slr_empty");
+        CubeInstance cube = 
mgr.getCube("test_kylin_cube_with_slr_empty").latestCopyForWrite();
+
         System.setProperty("kylin.cube.max-building-segments", "10");
         // no segment at first
         assertEquals(0, cube.getSegments().size());
 
-        Map m1 =  Maps.newHashMap();
+        Map m1 = Maps.newHashMap();
         m1.put(1, 1000L);
-        Map m2 =  Maps.newHashMap();
+        Map m2 = Maps.newHashMap();
         m2.put(1, 2000L);
-        Map m3 =  Maps.newHashMap();
+        Map m3 = Maps.newHashMap();
         m3.put(1, 3000L);
-        Map m4 =  Maps.newHashMap();
+        Map m4 = Maps.newHashMap();
         m4.put(1, 4000L);
 
         // append first
         CubeSegment seg1 = mgr.appendSegment(cube, null, new SegmentRange(0L, 
1000L), null, m1);
-        seg1.setStatus(SegmentStatusEnum.READY);
-
+        mgr.updateCubeSegStatus(seg1, SegmentStatusEnum.READY);
 
         CubeSegment seg2 = mgr.appendSegment(cube, null, new 
SegmentRange(1000L, 2000L), m1, m2);
-        seg2.setStatus(SegmentStatusEnum.READY);
-
+        mgr.updateCubeSegStatus(seg2, SegmentStatusEnum.READY);
 
         CubeSegment seg3 = mgr.mergeSegments(cube, null, new SegmentRange(0L, 
2000L), true);
-        seg3.setStatus(SegmentStatusEnum.NEW);
-
+        //seg3.setStatus(SegmentStatusEnum.NEW);
 
         CubeSegment seg4 = mgr.appendSegment(cube, null, new 
SegmentRange(2000L, 3000L), m2, m3);
         seg4.setStatus(SegmentStatusEnum.NEW);
         seg4.setLastBuildJobID("test");
         seg4.setStorageLocationIdentifier("test");
-
+        CubeUpdate update = new CubeUpdate(cube.latestCopyForWrite());
+        update.setToUpdateSegs(seg4);
+        mgr.updateCube(update);
 
         CubeSegment seg5 = mgr.appendSegment(cube, null, new 
SegmentRange(3000L, 4000L), m3, m4);
-        seg5.setStatus(SegmentStatusEnum.READY);
-
-        CubeUpdate cubeBuilder = new CubeUpdate(cube);
-
-        mgr.updateCube(cubeBuilder);
-
+        mgr.updateCubeSegStatus(seg5, SegmentStatusEnum.READY);
 
         mgr.promoteNewlyBuiltSegments(cube, seg4);
 
+        cube = mgr.getCube(cube.getName());
         assertTrue(cube.getSegments().size() == 5);
 
-        assertTrue(cube.getSegmentById(seg1.getUuid()) != null && 
cube.getSegmentById(seg1.getUuid()).getStatus() == SegmentStatusEnum.READY);
-        assertTrue(cube.getSegmentById(seg2.getUuid()) != null && 
cube.getSegmentById(seg2.getUuid()).getStatus() == SegmentStatusEnum.READY);
-        assertTrue(cube.getSegmentById(seg3.getUuid()) != null && 
cube.getSegmentById(seg3.getUuid()).getStatus() == SegmentStatusEnum.NEW);
-        assertTrue(cube.getSegmentById(seg4.getUuid()) != null && 
cube.getSegmentById(seg4.getUuid()).getStatus() == SegmentStatusEnum.READY);
-        assertTrue(cube.getSegmentById(seg5.getUuid()) != null && 
cube.getSegmentById(seg5.getUuid()).getStatus() == SegmentStatusEnum.READY);
+        assertTrue(cube.getSegmentById(seg1.getUuid()) != null
+                && cube.getSegmentById(seg1.getUuid()).getStatus() == 
SegmentStatusEnum.READY);
+        assertTrue(cube.getSegmentById(seg2.getUuid()) != null
+                && cube.getSegmentById(seg2.getUuid()).getStatus() == 
SegmentStatusEnum.READY);
+        assertTrue(cube.getSegmentById(seg3.getUuid()) != null
+                && cube.getSegmentById(seg3.getUuid()).getStatus() == 
SegmentStatusEnum.NEW);
+        assertTrue(cube.getSegmentById(seg4.getUuid()) != null
+                && cube.getSegmentById(seg4.getUuid()).getStatus() == 
SegmentStatusEnum.READY);
+        assertTrue(cube.getSegmentById(seg5.getUuid()) != null
+                && cube.getSegmentById(seg5.getUuid()).getStatus() == 
SegmentStatusEnum.READY);
 
     }
 
-
     @Test
     public void testConcurrentMergeAndMerge() throws Exception {
         System.setProperty("kylin.cube.max-building-segments", "10");
         CubeManager mgr = CubeManager.getInstance(getTestConfig());
-        CubeInstance cube = mgr.getCube("test_kylin_cube_with_slr_empty");
+        CubeInstance cube = 
mgr.getCube("test_kylin_cube_with_slr_empty").latestCopyForWrite();
 
         // no segment at first
         assertEquals(0, cube.getSegments().size());
-        Map m1 =  Maps.newHashMap();
+        Map m1 = Maps.newHashMap();
         m1.put(1, 1000L);
-        Map m2 =  Maps.newHashMap();
+        Map m2 = Maps.newHashMap();
         m2.put(1, 2000L);
-        Map m3 =  Maps.newHashMap();
+        Map m3 = Maps.newHashMap();
         m3.put(1, 3000L);
-        Map m4 =  Maps.newHashMap();
+        Map m4 = Maps.newHashMap();
         m4.put(1, 4000L);
 
         // append first
         CubeSegment seg1 = mgr.appendSegment(cube, null, new SegmentRange(0L, 
1000L), null, m1);
-        seg1.setStatus(SegmentStatusEnum.READY);
+        mgr.updateCubeSegStatus(seg1, SegmentStatusEnum.READY);
 
         CubeSegment seg2 = mgr.appendSegment(cube, null, new 
SegmentRange(1000L, 2000L), m1, m2);
-        seg2.setStatus(SegmentStatusEnum.READY);
+        mgr.updateCubeSegStatus(seg2, SegmentStatusEnum.READY);
 
         CubeSegment seg3 = mgr.appendSegment(cube, null, new 
SegmentRange(2000L, 3000L), m2, m3);
-        seg3.setStatus(SegmentStatusEnum.READY);
+        mgr.updateCubeSegStatus(seg3, SegmentStatusEnum.READY);
 
         CubeSegment seg4 = mgr.appendSegment(cube, null, new 
SegmentRange(3000L, 4000L), m3, m4);
-        seg4.setStatus(SegmentStatusEnum.READY);
-
-
+        mgr.updateCubeSegStatus(seg4, SegmentStatusEnum.READY);
 
         CubeSegment merge1 = mgr.mergeSegments(cube, null, new 
SegmentRange(0L, 2000L), true);
         merge1.setStatus(SegmentStatusEnum.NEW);
@@ -226,20 +220,25 @@ public class CubeManagerTest extends 
LocalFileMetadataTestCase {
         merge2.setLastBuildJobID("test");
         merge2.setStorageLocationIdentifier("test");
 
-        CubeUpdate cubeBuilder = new CubeUpdate(cube);
-        mgr.updateCube(cubeBuilder);
-
+        CubeUpdate update = new CubeUpdate(cube.latestCopyForWrite());
+        update.setToUpdateSegs(merge1, merge2);
+        mgr.updateCube(update);
 
         mgr.promoteNewlyBuiltSegments(cube, merge1);
 
+        cube = mgr.getCube(cube.getName());
         assertTrue(cube.getSegments().size() == 4);
 
         assertTrue(cube.getSegmentById(seg1.getUuid()) == null);
         assertTrue(cube.getSegmentById(seg2.getUuid()) == null);
-        assertTrue(cube.getSegmentById(merge1.getUuid()) != null && 
cube.getSegmentById(merge1.getUuid()).getStatus() == SegmentStatusEnum.READY);
-        assertTrue(cube.getSegmentById(seg3.getUuid()) != null && 
cube.getSegmentById(seg3.getUuid()).getStatus() == SegmentStatusEnum.READY);
-        assertTrue(cube.getSegmentById(seg4.getUuid()) != null && 
cube.getSegmentById(seg4.getUuid()).getStatus() == SegmentStatusEnum.READY);
-        assertTrue(cube.getSegmentById(merge2.getUuid()) != null && 
cube.getSegmentById(merge2.getUuid()).getStatus() == SegmentStatusEnum.NEW);
+        assertTrue(cube.getSegmentById(merge1.getUuid()) != null
+                && cube.getSegmentById(merge1.getUuid()).getStatus() == 
SegmentStatusEnum.READY);
+        assertTrue(cube.getSegmentById(seg3.getUuid()) != null
+                && cube.getSegmentById(seg3.getUuid()).getStatus() == 
SegmentStatusEnum.READY);
+        assertTrue(cube.getSegmentById(seg4.getUuid()) != null
+                && cube.getSegmentById(seg4.getUuid()).getStatus() == 
SegmentStatusEnum.READY);
+        assertTrue(cube.getSegmentById(merge2.getUuid()) != null
+                && cube.getSegmentById(merge2.getUuid()).getStatus() == 
SegmentStatusEnum.NEW);
 
     }
 
@@ -249,14 +248,15 @@ public class CubeManagerTest extends 
LocalFileMetadataTestCase {
         final NavigableSet<String> cubePath = 
store.listResources(ResourceStore.CUBE_RESOURCE_ROOT);
         assertTrue(cubePath.size() > 1);
 
-        final List<CubeInstance> cubes = 
store.getAllResources(ResourceStore.CUBE_RESOURCE_ROOT, CubeInstance.class, 
CubeManager.CUBE_SERIALIZER);
+        final List<CubeInstance> cubes = 
store.getAllResources(ResourceStore.CUBE_RESOURCE_ROOT, CubeInstance.class,
+                CubeManager.CUBE_SERIALIZER);
         assertEquals(cubePath.size(), cubes.size());
     }
 
     @Test
     public void testAutoMergeWithGap() throws Exception {
         CubeManager mgr = CubeManager.getInstance(getTestConfig());
-        CubeInstance cube = mgr.getCube("test_kylin_cube_with_slr_empty");
+        CubeInstance cube = 
mgr.getCube("test_kylin_cube_with_slr_empty").latestCopyForWrite();
 
         cube.getDescriptor().setAutoMergeTimeRanges(new long[] { 2000, 6000 });
         mgr.updateCube(new CubeUpdate(cube));
@@ -268,11 +268,12 @@ public class CubeManagerTest extends 
LocalFileMetadataTestCase {
 
         // append first
         CubeSegment seg1 = mgr.appendSegment(cube, new TSRange(0L, 1000L));
-        seg1.setStatus(SegmentStatusEnum.READY);
+        mgr.updateCubeSegStatus(seg1, SegmentStatusEnum.READY);
 
         CubeSegment seg3 = mgr.appendSegment(cube, new TSRange(2000L, 4000L));
-        seg3.setStatus(SegmentStatusEnum.READY);
+        mgr.updateCubeSegStatus(seg3, SegmentStatusEnum.READY);
 
+        cube = mgr.getCube(cube.getName());
         assertEquals(2, cube.getSegments().size());
 
         SegmentRange mergedSeg = cube.autoMergeCubeSegments();
@@ -282,8 +283,9 @@ public class CubeManagerTest extends 
LocalFileMetadataTestCase {
         // append a new seg which will be merged
 
         CubeSegment seg4 = mgr.appendSegment(cube, new TSRange(4000L, 8000L));
-        seg4.setStatus(SegmentStatusEnum.READY);
+        mgr.updateCubeSegStatus(seg4, SegmentStatusEnum.READY);
 
+        cube = mgr.getCube(cube.getName());
         assertEquals(3, cube.getSegments().size());
 
         mergedSeg = cube.autoMergeCubeSegments();
@@ -294,8 +296,9 @@ public class CubeManagerTest extends 
LocalFileMetadataTestCase {
         // fill the gap
 
         CubeSegment seg2 = mgr.appendSegment(cube, new TSRange(1000L, 2000L));
-        seg2.setStatus(SegmentStatusEnum.READY);
+        mgr.updateCubeSegStatus(seg2, SegmentStatusEnum.READY);
 
+        cube = mgr.getCube(cube.getName());
         assertEquals(4, cube.getSegments().size());
 
         mergedSeg = cube.autoMergeCubeSegments();

http://git-wip-us.apache.org/repos/asf/kylin/blob/03e6b8c5/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java
----------------------------------------------------------------------
diff --git 
a/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java 
b/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java
index 64c6d68..c4dcb59 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java
@@ -56,7 +56,10 @@ public class CubeSegmentsTest extends 
LocalFileMetadataTestCase {
         CubeSegment seg = mgr.appendSegment(cube);
         assertEquals(new TSRange(0L, Long.MAX_VALUE), seg.getTSRange());
         assertEquals(new TSRange(0L, Long.MAX_VALUE), seg.getSegRange());
-        assertEquals(1, cube.getSegments().size());
+        
+        assertEquals(0, cube.getSegments().size()); // older cube not changed
+        cube = mgr.getCube(cube.getName());
+        assertEquals(1, cube.getSegments().size()); // the updated cube
 
         // second append, throw IllegalStateException because the first 
segment is not built
         try {
@@ -84,7 +87,10 @@ public class CubeSegmentsTest extends 
LocalFileMetadataTestCase {
         CubeSegment seg2 = mgr.appendSegment(cube);
         assertEquals(new TSRange(0L, Long.MAX_VALUE), seg2.getTSRange());
         assertEquals(new TSRange(0L, Long.MAX_VALUE), seg2.getSegRange());
-        assertEquals(2, cube.getSegments().size());
+        
+        assertEquals(1, cube.getSegments().size()); // older cube not changed
+        cube = mgr.getCube(cube.getName());
+        assertEquals(2, cube.getSegments().size()); // the updated cube
 
         // non-partitioned cannot merge, throw exception
         try {
@@ -105,32 +111,35 @@ public class CubeSegmentsTest extends 
LocalFileMetadataTestCase {
 
         // append first
         CubeSegment seg1 = mgr.appendSegment(cube, new TSRange(0L, 1000L));
-        seg1.setStatus(SegmentStatusEnum.READY);
+        cube = readySegment(cube, seg1);
 
         // append second
         CubeSegment seg2 = mgr.appendSegment(cube, new TSRange(1000L, 2000L));
+        cube = readySegment(cube, seg2);
 
         assertEquals(2, cube.getSegments().size());
         assertEquals(new TSRange(1000L, 2000L), seg2.getTSRange());
         assertEquals(new TSRange(1000L, 2000L), seg2.getSegRange());
-        assertEquals(SegmentStatusEnum.NEW, seg2.getStatus());
-        seg2.setStatus(SegmentStatusEnum.READY);
+        assertEquals(SegmentStatusEnum.NEW, seg2.getStatus()); // older 
version of seg2
+        assertEquals(SegmentStatusEnum.READY, 
cube.getSegments().get(1).getStatus()); // newer version of seg2
 
         // merge first and second
         CubeSegment merge = mgr.mergeSegments(cube, new TSRange(0L, 2000L), 
null, true);
+        assertEquals(2, cube.getSegments().size()); // older version of cube
 
+        cube = mgr.getCube(cube.getName()); // get the newer version of cube
         assertEquals(3, cube.getSegments().size());
         assertEquals(new TSRange(0L, 2000L), merge.getTSRange());
         assertEquals(new TSRange(0L, 2000L), merge.getSegRange());
         assertEquals(SegmentStatusEnum.NEW, merge.getStatus());
 
         // segments are strictly ordered
-        assertEquals(seg1, cube.getSegments().get(0));
-        assertEquals(merge, cube.getSegments().get(1));
-        assertEquals(seg2, cube.getSegments().get(2));
+        assertEquals(seg1.getUuid(), cube.getSegments().get(0).getUuid());
+        assertEquals(merge.getUuid(), cube.getSegments().get(1).getUuid());
+        assertEquals(seg2.getUuid(), cube.getSegments().get(2).getUuid());
 
         // drop the merge
-        cube.getSegments().remove(merge);
+        cube = mgr.updateCubeDropSegments(cube, merge);
 
         // try merge at start/end at middle of segments
         try {
@@ -141,6 +150,7 @@ public class CubeSegmentsTest extends 
LocalFileMetadataTestCase {
         }
 
         CubeSegment merge2 = mgr.mergeSegments(cube, new TSRange(0L, 2500L), 
null, true);
+        cube = mgr.getCube(cube.getName()); // get the newer version of cube
         assertEquals(3, cube.getSegments().size());
         assertEquals(new TSRange(0L, 2000L), merge2.getTSRange());
         assertEquals(new TSRange(0L, 2000L), merge2.getSegRange());
@@ -157,12 +167,12 @@ public class CubeSegmentsTest extends 
LocalFileMetadataTestCase {
 
         // append the first
         CubeSegment seg1 = mgr.appendSegment(cube, new TSRange(0L, 1000L));
-        seg1.setStatus(SegmentStatusEnum.READY);
+        cube = readySegment(cube, seg1);
         assertEquals(1, cube.getSegments().size());
 
         // append the third
         CubeSegment seg3 = mgr.appendSegment(cube, new TSRange(2000L, 3000L));
-        seg3.setStatus(SegmentStatusEnum.READY);
+        cube = readySegment(cube, seg3);
         assertEquals(2, cube.getSegments().size());
 
         // reject overlap
@@ -175,9 +185,13 @@ public class CubeSegmentsTest extends 
LocalFileMetadataTestCase {
 
         // append the second
         CubeSegment seg2 = mgr.appendSegment(cube, new TSRange(1000L, 2000L));
-        seg2.setStatus(SegmentStatusEnum.READY);
+        cube = readySegment(cube, seg2);
         assertEquals(3, cube.getSegments().size());
     }
+    
+    private CubeInstance readySegment(CubeInstance cube, CubeSegment seg) 
throws IOException {
+        return mgr().updateCubeSegStatus(seg, SegmentStatusEnum.READY);
+    }
 
     private CubeManager mgr() {
         return CubeManager.getInstance(getTestConfig());

http://git-wip-us.apache.org/repos/asf/kylin/blob/03e6b8c5/core-cube/src/test/java/org/apache/kylin/cube/project/ProjectManagerTest.java
----------------------------------------------------------------------
diff --git 
a/core-cube/src/test/java/org/apache/kylin/cube/project/ProjectManagerTest.java 
b/core-cube/src/test/java/org/apache/kylin/cube/project/ProjectManagerTest.java
index c99b683..d5b421f 100644
--- 
a/core-cube/src/test/java/org/apache/kylin/cube/project/ProjectManagerTest.java
+++ 
b/core-cube/src/test/java/org/apache/kylin/cube/project/ProjectManagerTest.java
@@ -79,7 +79,7 @@ public class ProjectManagerTest extends 
LocalFileMetadataTestCase {
 
         CubeDesc desc = 
cubeDescMgr.getCubeDesc("test_kylin_cube_with_slr_desc");
         CubeInstance createdCube = cubeMgr.createCube("cube_in_alien_project", 
"alien", desc, null);
-        assertTrue(createdCube == cubeMgr.getCube("cube_in_alien_project"));
+        
assertTrue(createdCube.equals(cubeMgr.getCube("cube_in_alien_project")));
         ProjectManager proMgr = ProjectManager.getInstance(getTestConfig());
         Set<IRealization> realizations = proMgr.listAllRealizations("alien");
         assertTrue(realizations.contains(createdCube));
@@ -102,7 +102,7 @@ public class ProjectManagerTest extends 
LocalFileMetadataTestCase {
 
         CubeInstance droppedCube = cubeMgr.dropCube("cube_in_alien_project", 
true);
 
-        assertTrue(createdCube == droppedCube);
+        assertTrue(createdCube.equals(droppedCube));
         assertNull(cubeMgr.getCube("cube_in_alien_project"));
         assertTrue(prjMgr.listAllProjects().size() == originalProjectCount + 
1);
         assertTrue(cubeMgr.listAllCubes().size() == originalCubeCount);
@@ -126,7 +126,7 @@ public class ProjectManagerTest extends 
LocalFileMetadataTestCase {
 
         CubeDesc desc = 
cubeDescMgr.getCubeDesc("test_kylin_cube_with_slr_desc");
         CubeInstance createdCube = cubeMgr.createCube("new_cube_in_default", 
ProjectInstance.DEFAULT_PROJECT_NAME, desc, null);
-        assertTrue(createdCube == cubeMgr.getCube("new_cube_in_default"));
+        assertTrue(createdCube.equals(cubeMgr.getCube("new_cube_in_default")));
 
         //System.out.println(JsonUtil.writeValueAsIndentString(createdCube));
 
@@ -135,7 +135,7 @@ public class ProjectManagerTest extends 
LocalFileMetadataTestCase {
 
         CubeInstance droppedCube = cubeMgr.dropCube("new_cube_in_default", 
true);
 
-        assertTrue(createdCube == droppedCube);
+        assertTrue(createdCube.equals(droppedCube));
         assertNull(cubeMgr.getCube("new_cube_in_default"));
         assertTrue(prjMgr.listAllProjects().size() == originalProjectCount);
         assertTrue(cubeMgr.listAllCubes().size() == originalCubeCount);

http://git-wip-us.apache.org/repos/asf/kylin/blob/03e6b8c5/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/CachedCrudAssist.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/CachedCrudAssist.java
 
b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/CachedCrudAssist.java
index ccc0a4a..c7244fe 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/CachedCrudAssist.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/CachedCrudAssist.java
@@ -18,6 +18,10 @@
 
 package org.apache.kylin.metadata.cachesync;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.List;
 
@@ -69,10 +73,37 @@ abstract public class CachedCrudAssist<T extends 
RootPersistentEntity> {
         return (Serializer<DataModelDesc>) serializer;
     }
 
-    public void setCheckOnWrite(boolean check) {
+    public void setCheckCopyOnWrite(boolean check) {
         this.checkCopyOnWrite = check;
     }
 
+    // Make copy of an entity such that update can apply on the copy.
+    // Note cached and shared object MUST NOT be updated directly.
+    public T copyForWrite(T entity) {
+        if (entity.isCachedAndShared() == false)
+            return entity;
+
+        T copy;
+        try {
+            byte[] bytes;
+            try (ByteArrayOutputStream buf = new ByteArrayOutputStream();
+                    DataOutputStream dout = new DataOutputStream(buf)) {
+                serializer.serialize(entity, dout);
+                bytes = buf.toByteArray();
+            }
+
+            try (DataInputStream in = new DataInputStream(new 
ByteArrayInputStream(bytes))) {
+                copy = serializer.deserialize(in);
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        copy.setCachedAndShared(false);
+        initEntityAfterReload(copy, entity.resourceName());
+        return copy;
+    }
+
     private String resourcePath(String resourceName) {
         return resRootPath + "/" + resourceName + resPathSuffix;
     }
@@ -80,12 +111,11 @@ abstract public class CachedCrudAssist<T extends 
RootPersistentEntity> {
     private String resourceName(String resourcePath) {
         Preconditions.checkArgument(resourcePath.startsWith(resRootPath));
         Preconditions.checkArgument(resourcePath.endsWith(resPathSuffix));
-        return resourcePath.substring(resRootPath.length() + 1,
-                resourcePath.length() - resPathSuffix.length());
+        return resourcePath.substring(resRootPath.length() + 1, 
resourcePath.length() - resPathSuffix.length());
     }
 
     public void reloadAll() throws IOException {
-        logger.debug("Reloading " + entityType.getName() + " from " + 
store.getReadableResourcePath(resRootPath));
+        logger.debug("Reloading " + entityType.getSimpleName() + " from " + 
store.getReadableResourcePath(resRootPath));
 
         cache.clear();
 
@@ -94,8 +124,8 @@ abstract public class CachedCrudAssist<T extends 
RootPersistentEntity> {
             reloadQuietlyAt(path);
         }
 
-        logger.debug(
-                "Loaded " + cache.size() + " " + entityType.getName() + "(s) 
out of " + paths.size() + " resource");
+        logger.debug("Loaded " + cache.size() + " " + 
entityType.getSimpleName() + "(s) out of " + paths.size()
+                + " resource");
     }
 
     public T reload(String resourceName) {
@@ -110,7 +140,7 @@ abstract public class CachedCrudAssist<T extends 
RootPersistentEntity> {
         try {
             return reloadAt(path);
         } catch (Exception ex) {
-            logger.error("Error loading " + entityType.getName() + " at " + 
path, ex);
+            logger.error("Error loading " + entityType.getSimpleName() + " at 
" + path, ex);
             return null;
         }
     }
@@ -119,21 +149,23 @@ abstract public class CachedCrudAssist<T extends 
RootPersistentEntity> {
         try {
             T entity = store.getResource(path, entityType, serializer);
             if (entity == null) {
-                logger.warn("No " + entityType.getName() + " found at " + path 
+ ", returning null");
+                logger.warn("No " + entityType.getSimpleName() + " found at " 
+ path + ", returning null");
                 cache.removeLocal(resourceName(path));
                 return null;
             }
 
+            // mark cached object
+            entity.setCachedAndShared(true);
             entity = initEntityAfterReload(entity, resourceName(path));
 
             if (path.equals(resourcePath(entity.resourceName())) == false)
                 throw new IllegalStateException("The entity " + entity + " 
read from " + path
                         + " will save to a different path " + 
resourcePath(entity.resourceName()));
-
+            
             cache.putLocal(entity.resourceName(), entity);
             return entity;
         } catch (Exception e) {
-            throw new IllegalStateException("Error loading " + 
entityType.getName() + " at " + path, e);
+            throw new IllegalStateException("Error loading " + 
entityType.getSimpleName() + " at " + path, e);
         }
     }
 
@@ -148,18 +180,23 @@ abstract public class CachedCrudAssist<T extends 
RootPersistentEntity> {
         Preconditions.checkArgument(resName != null && resName.length() > 0);
 
         if (checkCopyOnWrite) {
-            if (cache.get(resName) == entity) {
+            if (entity.isCachedAndShared() || cache.get(resName) == entity) {
                 throw new IllegalStateException("Copy-on-write violation! The 
updating entity " + entity
-                        + " is a shared object in " + entityType.getName() + " 
cache, which should not be.");
+                        + " is a shared object in " + 
entityType.getSimpleName() + " cache, which should not be.");
             }
         }
 
         String path = resourcePath(resName);
-        logger.debug("Saving {} at {}", entityType.getName(), path);
+        logger.debug("Saving {} at {}", entityType.getSimpleName(), path);
 
         store.putResource(path, entity, serializer);
+        
+        // just to trigger the event broadcast, the entity won't stay in cache
         cache.put(resName, entity);
-        return entity;
+
+        // keep the pass-in entity out of cache, the caller may use it for 
further update
+        // return a reloaded new object
+        return reload(resName);
     }
 
     public void delete(T entity) throws IOException {
@@ -170,7 +207,7 @@ abstract public class CachedCrudAssist<T extends 
RootPersistentEntity> {
         Preconditions.checkArgument(resName != null);
 
         String path = resourcePath(resName);
-        logger.debug("Deleting {} at {}", entityType.getName(), path);
+        logger.debug("Deleting {} at {}", entityType.getSimpleName(), path);
 
         store.deleteResource(path);
         cache.remove(resName);

http://git-wip-us.apache.org/repos/asf/kylin/blob/03e6b8c5/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidShardUtil.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidShardUtil.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidShardUtil.java
index b6dbd5d..cf1b94a 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidShardUtil.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidShardUtil.java
@@ -21,6 +21,7 @@ package org.apache.kylin.engine.mr.common;
 import java.io.IOException;
 import java.util.Map;
 
+import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.CubeUpdate;
@@ -41,12 +42,16 @@ public class CuboidShardUtil {
                 filtered.put(entry.getKey(), entry.getValue());
             }
         }
+        
+        // work on copy instead of cached objects
+        CubeInstance cubeCopy = segment.getCubeInstance().latestCopyForWrite();
+        CubeSegment segCopy = cubeCopy.getSegmentById(segment.getUuid());
 
-        segment.setCuboidShardNums(filtered);
-        segment.setTotalShards(totalShards);
+        segCopy.setCuboidShardNums(filtered);
+        segCopy.setTotalShards(totalShards);
 
-        CubeUpdate cubeBuilder = new CubeUpdate(segment.getCubeInstance());
-        cubeBuilder.setToUpdateSegs(segment);
-        cubeManager.updateCube(cubeBuilder);
+        CubeUpdate update = new CubeUpdate(cubeCopy);
+        update.setToUpdateSegs(segCopy);
+        cubeManager.updateCube(update);
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/03e6b8c5/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java
index 9c805a8..0f7281f 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java
@@ -107,8 +107,8 @@ public class StatisticsDecisionUtil {
             return;
         }
 
-        CubeUpdate cubeBuilder = new CubeUpdate(cube);
-        cubeBuilder.setCuboids(recommendCuboidsWithStats);
-        CubeManager.getInstance(cube.getConfig()).updateCube(cubeBuilder);
+        CubeUpdate update = new CubeUpdate(cube.latestCopyForWrite());
+        update.setCuboids(recommendCuboidsWithStats);
+        CubeManager.getInstance(cube.getConfig()).updateCube(update);
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/03e6b8c5/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
index 58b2c02..2a184b0 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
@@ -63,12 +63,16 @@ public class MergeDictionaryStep extends AbstractExecutable 
{
         try {
             checkLookupSnapshotsMustIncremental(mergingSegments);
 
-            makeDictForNewSegment(conf, cube, newSegment, mergingSegments);
-            makeSnapshotForNewSegment(cube, newSegment, mergingSegments);
-
-            CubeUpdate cubeBuilder = new CubeUpdate(cube);
-            cubeBuilder.setToUpdateSegs(newSegment);
-            mgr.updateCube(cubeBuilder);
+            // work on copy instead of cached objects
+            CubeInstance cubeCopy = cube.latestCopyForWrite();
+            CubeSegment newSegCopy = 
cubeCopy.getSegmentById(newSegment.getUuid());
+            
+            makeDictForNewSegment(conf, cubeCopy, newSegCopy, mergingSegments);
+            makeSnapshotForNewSegment(cubeCopy, newSegCopy, mergingSegments);
+
+            CubeUpdate update = new CubeUpdate(cubeCopy);
+            update.setToUpdateSegs(newSegCopy);
+            mgr.updateCube(update);
             return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
         } catch (IOException e) {
             logger.error("fail to merge dictionary or lookup snapshots", e);

http://git-wip-us.apache.org/repos/asf/kylin/blob/03e6b8c5/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java
 
b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java
deleted file mode 100644
index 5ddb024..0000000
--- 
a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.steps;
-
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.IOException;
-import java.text.ParseException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mrunit.mapreduce.MapDriver;
-import org.apache.kylin.common.util.Dictionary;
-import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.CubeUpdate;
-import org.apache.kylin.dict.DictionaryGenerator;
-import org.apache.kylin.dict.DictionaryInfo;
-import org.apache.kylin.dict.DictionaryManager;
-import org.apache.kylin.dict.IterableDictionaryValueEnumerator;
-import org.apache.kylin.metadata.datatype.DataType;
-import org.apache.kylin.metadata.model.SegmentRange.TSRange;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.source.IReadableTable.TableSignature;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@SuppressWarnings("rawtypes")
-public class MergeCuboidMapperTest extends LocalFileMetadataTestCase {
-    
-    private static final Logger logger = 
LoggerFactory.getLogger(MergeCuboidMapperTest.class);
-
-    MapDriver<Text, Text, Text, Text> mapDriver;
-    CubeManager cubeManager;
-    CubeInstance cube;
-    DictionaryManager dictionaryManager;
-
-    TblColRef lfn;
-    TblColRef lsi;
-    TblColRef ssc;
-
-    private DictionaryInfo makeSharedDict() throws IOException {
-        TableSignature signature = new TableSignature();
-        signature.setSize(100);
-        signature.setLastModifiedTime(System.currentTimeMillis());
-        signature.setPath("fake_common_dict");
-
-        DictionaryInfo newDictInfo = new DictionaryInfo("", "", 0, "string", 
signature);
-
-        List<String> values = new ArrayList<>();
-        values.add("eee");
-        values.add("fff");
-        Dictionary<String> dict = 
DictionaryGenerator.buildDictionary(DataType.getType(newDictInfo.getDataType()),
 new IterableDictionaryValueEnumerator(values));
-        dictionaryManager.trySaveNewDict(dict, newDictInfo);
-        dict.dump(System.out);
-
-        return newDictInfo;
-    }
-
-    @Before
-    public void setUp() throws Exception {
-
-        createTestMetadata();
-
-        logger.info("The metadataUrl is : " + getTestConfig());
-        getTestConfig().clearManagers();
-
-        // hack for distributed cache
-        // 
CubeManager.removeInstance(KylinConfig.createInstanceFromUri("../job/meta"));//to
-        // make sure the following mapper could get latest CubeManger
-        FileUtils.deleteDirectory(new File("../job/meta"));
-
-        MergeCuboidMapper mapper = new MergeCuboidMapper();
-        mapDriver = MapDriver.newMapDriver(mapper);
-
-        cubeManager = CubeManager.getInstance(getTestConfig());
-        cube = 
cubeManager.getCube("test_kylin_cube_without_slr_left_join_ready_2_segments");
-        dictionaryManager = DictionaryManager.getInstance(getTestConfig());
-        lfn = cube.getDescriptor().findColumnRef("DEFAULT.TEST_KYLIN_FACT", 
"LSTG_FORMAT_NAME");
-        lsi = cube.getDescriptor().findColumnRef("DEFAULT.TEST_KYLIN_FACT", 
"CAL_DT");
-        ssc = 
cube.getDescriptor().findColumnRef("DEFAULT.TEST_CATEGORY_GROUPINGS", 
"META_CATEG_NAME");
-
-        DictionaryInfo sharedDict = makeSharedDict();
-
-        boolean isFirstSegment = true;
-        for (CubeSegment segment : cube.getSegments()) {
-
-            TableSignature signature = new TableSignature();
-            signature.setSize(100);
-            signature.setLastModifiedTime(System.currentTimeMillis());
-            signature.setPath("fake_dict_for" + lfn.getName() + 
segment.getName());
-
-            DictionaryInfo newDictInfo = new 
DictionaryInfo(lfn.getColumnDesc(), "string", signature);
-
-            List<String> values = new ArrayList<>();
-            values.add("aaa");
-            if (isFirstSegment)
-                values.add("ccc");
-            else
-                values.add("bbb");
-            Dictionary<String> dict = 
DictionaryGenerator.buildDictionary(DataType.getType(newDictInfo.getDataType()),
 new IterableDictionaryValueEnumerator(values));
-            dictionaryManager.trySaveNewDict(dict, newDictInfo);
-            dict.dump(System.out);
-
-            segment.putDictResPath(lfn, newDictInfo.getResourcePath());
-            segment.putDictResPath(lsi, sharedDict.getResourcePath());
-            segment.putDictResPath(ssc, sharedDict.getResourcePath());
-
-            // cubeManager.saveResource(segment.getCubeInstance());
-            // cubeManager.afterCubeUpdated(segment.getCubeInstance());
-
-            isFirstSegment = false;
-        }
-
-        CubeUpdate cubeBuilder = new CubeUpdate(cube);
-        cubeBuilder.setToUpdateSegs(cube.getSegments().toArray(new 
CubeSegment[cube.getSegments().size()]));
-        cube = cubeManager.updateCube(cubeBuilder);
-
-    }
-
-    @After
-    public void after() throws Exception {
-        cleanupTestMetadata();
-        FileUtils.deleteDirectory(new File("../job/meta"));
-    }
-
-    @Test
-    public void test() throws IOException, ParseException {
-
-        //        String cubeName = 
"test_kylin_cube_without_slr_left_join_ready_2_segments";
-
-        CubeSegment newSeg = cubeManager.mergeSegments(cube, new TSRange(0L, 
Long.MAX_VALUE), null, false);
-        //        String segmentName = newSeg.getName();
-
-        final Dictionary<String> dictionary = 
cubeManager.getDictionary(newSeg, lfn);
-        assertTrue(dictionary == null);
-        //        ((TrieDictionary) dictionary).dump(System.out);
-
-        // hack for distributed cache
-        //        File metaDir = new File("../job/meta");
-        //        FileUtils.copyDirectory(new 
File(getTestConfig().getMetadataUrl()), metaDir);
-        //
-        //        
mapDriver.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
-        //        
mapDriver.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, 
segmentName);
-        //        // 
mapDriver.getConfiguration().set(KylinConfig.KYLIN_METADATA_URL,
-        //        // "../job/meta");
-        //
-        //        byte[] key = new byte[] { 0, 0, 0, 0, 0, 0, 0, -92, 1, 1, 1 
};
-        //        byte[] value = new byte[] { 1, 2, 3 };
-        //        byte[] newkey = new byte[] { 0, 0, 0, 0, 0, 0, 0, -92, 1, 1, 
2 };
-        //        byte[] newvalue = new byte[] { 1, 2, 3 };
-        //
-        //        mapDriver.withInput(new Text(key), new Text(value));
-        //        mapDriver.withOutput(new Text(newkey), new Text(newvalue));
-        //        mapDriver.setMapInputPath(new 
Path("/apps/hdmi-prod/b_kylin/prod/kylin-f24668f6-dcff-4cb6-a89b-77f1119df8fa/vac_sw_cube_v4/cuboid/15d_cuboid"));
-        //
-        //        mapDriver.runTest();
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/03e6b8c5/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
----------------------------------------------------------------------
diff --git 
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java 
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
index 872deed..459d3aa 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
@@ -238,11 +238,17 @@ public class SparkCubing extends AbstractApplication {
             })));
         }
         final long end = System.currentTimeMillis();
-        CubingUtils.writeDictionary(seg, dictionaryMap, start, end);
+        
+        // work on copy instead of cached objects
+        CubeInstance cubeCopy = cubeInstance.latestCopyForWrite();
+        CubeSegment segCopy = cubeCopy.getSegmentById(seg.getUuid());
+
+        CubingUtils.writeDictionary(segCopy, dictionaryMap, start, end);
+        
         try {
-            CubeUpdate cubeBuilder = new CubeUpdate(cubeInstance);
-            cubeBuilder.setToUpdateSegs(seg);
-            cubeManager.updateCube(cubeBuilder);
+            CubeUpdate update = new CubeUpdate(cubeCopy);
+            update.setToUpdateSegs(segCopy);
+            cubeManager.updateCube(update);
         } catch (IOException e) {
             throw new RuntimeException("Failed to deal with the request: " + 
e.getLocalizedMessage());
         }
@@ -501,8 +507,8 @@ public class SparkCubing extends AbstractApplication {
 
     private void bulkLoadHFile(String cubeName, String segmentId, String 
hfileLocation) throws Exception {
         final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        final CubeInstance cubeInstance = 
CubeManager.getInstance(kylinConfig).getCube(cubeName);
-        final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
+        final CubeInstance cubeCopy = 
CubeManager.getInstance(kylinConfig).getCube(cubeName).latestCopyForWrite();
+        final CubeSegment segCopy = cubeCopy.getSegmentById(segmentId);
         final Configuration hbaseConf = 
HBaseConnection.getCurrentHBaseConfiguration();
 
         FsShell shell = new FsShell(hbaseConf);
@@ -515,18 +521,17 @@ public class SparkCubing extends AbstractApplication {
 
         String[] newArgs = new String[2];
         newArgs[0] = hfileLocation;
-        newArgs[1] = cubeSegment.getStorageLocationIdentifier();
+        newArgs[1] = segCopy.getStorageLocationIdentifier();
 
         int ret = ToolRunner.run(new LoadIncrementalHFiles(hbaseConf), 
newArgs);
         System.out.println("incremental load result:" + ret);
 
-        cubeSegment.setStatus(SegmentStatusEnum.READY);
         try {
-            CubeUpdate cubeBuilder = new CubeUpdate(cubeInstance);
-            cubeInstance.setStatus(RealizationStatusEnum.READY);
-            cubeSegment.setStatus(SegmentStatusEnum.READY);
-            cubeBuilder.setToUpdateSegs(cubeSegment);
-            CubeManager.getInstance(kylinConfig).updateCube(cubeBuilder);
+            CubeUpdate update = new CubeUpdate(cubeCopy);
+            cubeCopy.setStatus(RealizationStatusEnum.READY);
+            segCopy.setStatus(SegmentStatusEnum.READY);
+            update.setToUpdateSegs(segCopy);
+            CubeManager.getInstance(kylinConfig).updateCube(update);
         } catch (IOException e) {
             throw new RuntimeException("Failed to deal with the request: " + 
e.getLocalizedMessage());
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/03e6b8c5/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
----------------------------------------------------------------------
diff --git 
a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java 
b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
index 42fc124..18a07cf 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
@@ -44,7 +44,6 @@ import org.apache.kylin.cube.CubeDescManager;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.CubeUpdate;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.EngineFactory;
 import org.apache.kylin.engine.mr.CubingJob;
@@ -328,10 +327,7 @@ public class BuildCubeWithEngine {
 
     private void clearSegment(String cubeName) throws Exception {
         CubeInstance cube = cubeManager.getCube(cubeName);
-        // remove all existing segments
-        CubeUpdate cubeBuilder = new CubeUpdate(cube);
-        cubeBuilder.setToRemoveSegs(cube.getSegments().toArray(new 
CubeSegment[cube.getSegments().size()]));
-        cubeManager.updateCube(cubeBuilder);
+        cubeManager.updateCubeDropSegments(cube, cube.getSegments());
     }
 
     private Boolean mergeSegment(String cubeName, long startDate, long 
endDate) throws Exception {

http://git-wip-us.apache.org/repos/asf/kylin/blob/03e6b8c5/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
----------------------------------------------------------------------
diff --git 
a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java 
b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
index d338332..bf52b97 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
@@ -47,7 +47,6 @@ import org.apache.kylin.common.util.HBaseMetadataTestCase;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.CubeUpdate;
 import org.apache.kylin.engine.EngineFactory;
 import org.apache.kylin.job.DeployUtil;
 import org.apache.kylin.job.engine.JobEngineConfig;
@@ -160,10 +159,7 @@ public class BuildCubeWithStream {
 
     protected void clearSegment(String cubeName) throws Exception {
         CubeInstance cube = cubeManager.getCube(cubeName);
-        // remove all existing segments
-        CubeUpdate cubeBuilder = new CubeUpdate(cube);
-        cubeBuilder.setToRemoveSegs(cube.getSegments().toArray(new 
CubeSegment[cube.getSegments().size()]));
-        cubeManager.updateCube(cubeBuilder);
+        cubeManager.updateCubeDropSegments(cube, cube.getSegments());
     }
 
     public void build() throws Exception {

http://git-wip-us.apache.org/repos/asf/kylin/blob/03e6b8c5/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
----------------------------------------------------------------------
diff --git 
a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java 
b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
index f04f825..a02715e 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -176,9 +176,8 @@ public class CubeService extends BasicService implements 
InitializingBean {
         String owner = 
SecurityContextHolder.getContext().getAuthentication().getName();
         cube.setOwner(owner);
 
-        CubeUpdate cubeBuilder = new 
CubeUpdate(cube).setOwner(owner).setCost(cost);
-
-        return getCubeManager().updateCube(cubeBuilder);
+        CubeUpdate update = new 
CubeUpdate(cube.latestCopyForWrite()).setOwner(owner).setCost(cost);
+        return getCubeManager().updateCube(update);
     }
 
     @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN
@@ -359,16 +358,7 @@ public class CubeService extends BasicService implements 
InitializingBean {
             throw new 
BadRequestException(String.format(msg.getDISABLE_NOT_READY_CUBE(), cubeName, 
ostatus));
         }
 
-        cube.setStatus(RealizationStatusEnum.DISABLED);
-
-        try {
-            CubeUpdate cubeBuilder = new CubeUpdate(cube);
-            cubeBuilder.setStatus(RealizationStatusEnum.DISABLED);
-            return getCubeManager().updateCube(cubeBuilder);
-        } catch (IOException e) {
-            cube.setStatus(ostatus);
-            throw e;
-        }
+        return getCubeManager().updateCubeStatus(cube, 
RealizationStatusEnum.DISABLED);
     }
 
     public void checkEnableCubeCondition(CubeInstance cube) {
@@ -399,15 +389,7 @@ public class CubeService extends BasicService implements 
InitializingBean {
      * @throws IOException
      */
     public CubeInstance enableCube(CubeInstance cube) throws IOException {
-        RealizationStatusEnum ostatus = cube.getStatus();
-        try {
-            CubeUpdate cubeBuilder = new CubeUpdate(cube);
-            cubeBuilder.setStatus(RealizationStatusEnum.READY);
-            return getCubeManager().updateCube(cubeBuilder);
-        } catch (IOException e) {
-            cube.setStatus(ostatus);
-            throw e;
-        }
+        return getCubeManager().updateCubeStatus(cube, 
RealizationStatusEnum.READY);
     }
 
     public MetricsResponse calculateMetrics(MetricsRequest request) {
@@ -509,9 +491,7 @@ public class CubeService extends BasicService implements 
InitializingBean {
             throw new 
BadRequestException(String.format(msg.getDELETE_NOT_READY_SEG(), segmentName));
         }
 
-        CubeUpdate update = new CubeUpdate(cube);
-        update.setToRemoveSegs(new CubeSegment[] { toDelete });
-        return CubeManager.getInstance(getConfig()).updateCube(update);
+        return 
CubeManager.getInstance(getConfig()).updateCubeDropSegments(cube, toDelete);
     }
 
     protected void releaseAllJobs(CubeInstance cube) {
@@ -532,10 +512,7 @@ public class CubeService extends BasicService implements 
InitializingBean {
      */
     private void releaseAllSegments(CubeInstance cube) throws IOException {
         releaseAllJobs(cube);
-
-        CubeUpdate update = new CubeUpdate(cube);
-        update.setToRemoveSegs(cube.getSegments().toArray(new 
CubeSegment[cube.getSegments().size()]));
-        CubeManager.getInstance(getConfig()).updateCube(update);
+        CubeManager.getInstance(getConfig()).updateCubeDropSegments(cube, 
cube.getSegments());
     }
 
     public void updateOnNewSegmentReady(String cubeName) {
@@ -579,10 +556,8 @@ public class CubeService extends BasicService implements 
InitializingBean {
             }
 
             if (toRemoveSegs.size() > 0) {
-                CubeUpdate cubeBuilder = new CubeUpdate(cube);
-                cubeBuilder.setToRemoveSegs(toRemoveSegs.toArray(new 
CubeSegment[toRemoveSegs.size()]));
                 try {
-                    this.getCubeManager().updateCube(cubeBuilder);
+                    getCubeManager().updateCubeDropSegments(cube, 
toRemoveSegs);
                 } catch (IOException e) {
                     logger.error("Failed to remove old segment from cube " + 
cubeName, e);
                 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/03e6b8c5/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
----------------------------------------------------------------------
diff --git 
a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java 
b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index 5648c08..1dfa1de 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -37,7 +37,6 @@ import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.CubeUpdate;
 import org.apache.kylin.cube.model.CubeBuildTypeEnum;
 import org.apache.kylin.engine.EngineFactory;
 import org.apache.kylin.engine.mr.CubingJob;
@@ -256,10 +255,8 @@ public class JobService extends BasicService implements 
InitializingBean {
                 logger.error("Job submission might failed for NEW segment {}, 
will clean the NEW segment from cube",
                         newSeg.getName());
                 try {
-                    // Remove this segments
-                    CubeUpdate cubeBuilder = new CubeUpdate(cube);
-                    cubeBuilder.setToRemoveSegs(newSeg);
-                    getCubeManager().updateCube(cubeBuilder);
+                    // Remove this segment
+                    getCubeManager().updateCubeDropSegments(cube, newSeg);
                 } catch (Exception ee) {
                     // swallow the exception
                     logger.error("Clean New segment failed, ignoring it", e);
@@ -347,10 +344,8 @@ public class JobService extends BasicService implements 
InitializingBean {
         for (String segmentId : StringUtils.split(segmentIds)) {
             final CubeSegment segment = cubeInstance.getSegmentById(segmentId);
             if (segment != null && (segment.getStatus() == 
SegmentStatusEnum.NEW || segment.getTSRange().end.v == 0)) {
-                // Remove this segments
-                CubeUpdate cubeBuilder = new CubeUpdate(cubeInstance);
-                cubeBuilder.setToRemoveSegs(segment);
-                getCubeManager().updateCube(cubeBuilder);
+                // Remove this segment
+                getCubeManager().updateCubeDropSegments(cubeInstance, segment);
             }
         }
         getExecutableManager().discardJob(job.getId());

Reply via email to