http://git-wip-us.apache.org/repos/asf/kylin/blob/03e6b8c5/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java
index e3a7586..37bf8ff 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java
@@ -50,12 +50,12 @@ public class MergeOffsetStep extends AbstractExecutable {
@Override
protected ExecuteResult doWork(ExecutableContext context) throws
ExecuteException {
final CubeManager cubeManager =
CubeManager.getInstance(context.getConfig());
- final CubeInstance cube =
cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
+ final CubeInstance cubeCopy =
cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams())).latestCopyForWrite();
final String segmentId =
CubingExecutableUtil.getSegmentId(this.getParams());
- final CubeSegment segment = cube.getSegmentById(segmentId);
+ final CubeSegment segCopy = cubeCopy.getSegmentById(segmentId);
- Preconditions.checkNotNull(segment, "Cube segment '" + segmentId + "'
not found.");
- Segments<CubeSegment> mergingSegs = cube.getMergingSegments(segment);
+ Preconditions.checkNotNull(segCopy, "Cube segment '" + segmentId + "'
not found.");
+ Segments<CubeSegment> mergingSegs =
cubeCopy.getMergingSegments(segCopy);
Preconditions.checkArgument(mergingSegs.size() > 0, "Merging segment
not exist.");
@@ -63,16 +63,16 @@ public class MergeOffsetStep extends AbstractExecutable {
final CubeSegment first = mergingSegs.get(0);
final CubeSegment last = mergingSegs.get(mergingSegs.size() - 1);
- segment.setSegRange(new SegmentRange(first.getSegRange().start,
last.getSegRange().end));
-
segment.setSourcePartitionOffsetStart(first.getSourcePartitionOffsetStart());
-
segment.setSourcePartitionOffsetEnd(last.getSourcePartitionOffsetEnd());
+ segCopy.setSegRange(new SegmentRange(first.getSegRange().start,
last.getSegRange().end));
+
segCopy.setSourcePartitionOffsetStart(first.getSourcePartitionOffsetStart());
+
segCopy.setSourcePartitionOffsetEnd(last.getSourcePartitionOffsetEnd());
- segment.setTSRange(new TSRange(mergingSegs.getTSStart(),
mergingSegs.getTSEnd()));
+ segCopy.setTSRange(new TSRange(mergingSegs.getTSStart(),
mergingSegs.getTSEnd()));
- CubeUpdate cubeBuilder = new CubeUpdate(cube);
- cubeBuilder.setToUpdateSegs(segment);
+ CubeUpdate update = new CubeUpdate(cubeCopy);
+ update.setToUpdateSegs(segCopy);
try {
- cubeManager.updateCube(cubeBuilder);
+ cubeManager.updateCube(update);
return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
} catch (IOException e) {
logger.error("fail to update cube segment offset", e);