http://git-wip-us.apache.org/repos/asf/kylin/blob/03e6b8c5/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java
index e3a7586..37bf8ff 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java
@@ -50,12 +50,12 @@ public class MergeOffsetStep extends AbstractExecutable {
     @Override
     protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
         final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
-        final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
+        final CubeInstance cubeCopy = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams())).latestCopyForWrite();
         final String segmentId = CubingExecutableUtil.getSegmentId(this.getParams());
-        final CubeSegment segment = cube.getSegmentById(segmentId);
+        final CubeSegment segCopy = cubeCopy.getSegmentById(segmentId);
 
-        Preconditions.checkNotNull(segment, "Cube segment '" + segmentId + "' not found.");
-        Segments<CubeSegment> mergingSegs = cube.getMergingSegments(segment);
+        Preconditions.checkNotNull(segCopy, "Cube segment '" + segmentId + "' not found.");
+        Segments<CubeSegment> mergingSegs = cubeCopy.getMergingSegments(segCopy);
 
         Preconditions.checkArgument(mergingSegs.size() > 0, "Merging segment not exist.");
 
@@ -63,16 +63,16 @@ public class MergeOffsetStep extends AbstractExecutable {
         final CubeSegment first = mergingSegs.get(0);
         final CubeSegment last = mergingSegs.get(mergingSegs.size() - 1);
 
-        segment.setSegRange(new SegmentRange(first.getSegRange().start, last.getSegRange().end));
-        segment.setSourcePartitionOffsetStart(first.getSourcePartitionOffsetStart());
-        segment.setSourcePartitionOffsetEnd(last.getSourcePartitionOffsetEnd());
+        segCopy.setSegRange(new SegmentRange(first.getSegRange().start, last.getSegRange().end));
+        segCopy.setSourcePartitionOffsetStart(first.getSourcePartitionOffsetStart());
+        segCopy.setSourcePartitionOffsetEnd(last.getSourcePartitionOffsetEnd());
 
-        segment.setTSRange(new TSRange(mergingSegs.getTSStart(), mergingSegs.getTSEnd()));
+        segCopy.setTSRange(new TSRange(mergingSegs.getTSStart(), mergingSegs.getTSEnd()));
 
-        CubeUpdate cubeBuilder = new CubeUpdate(cube);
-        cubeBuilder.setToUpdateSegs(segment);
+        CubeUpdate update = new CubeUpdate(cubeCopy);
+        update.setToUpdateSegs(segCopy);
         try {
-            cubeManager.updateCube(cubeBuilder);
+            cubeManager.updateCube(update);
             return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
         } catch (IOException e) {
             logger.error("fail to update cube segment offset", e);

Reply via email to