This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit e8ad658fcd43c650a7545c0d6946fdcd114460dc
Author: XiaoxiangYu <hit_la...@126.com>
AuthorDate: Sat Sep 21 17:03:45 2019 +0800

    KYLIN-4167 Refactor streaming coordinator Phase1
---
 build/conf/kylin-server-log4j.properties           |  10 +
 .../org/apache/kylin/common/KylinConfigBase.java   |   4 +
 .../kylin/job/execution/AbstractExecutable.java    |   2 +-
 .../optrule/visitor/TimezoneRewriteVisitor.java    |   5 +-
 .../rest/service/StreamingCoordinatorService.java  |  27 +-
 .../stream/rpc/HttpStreamDataSearchClient.java     |   5 +-
 .../kylin/stream/coordinator/Coordinator.java      |   3 +-
 .../client/CoordinatorClientFactory.java           |  17 +-
 .../coordinator/coordinate/BuildJobSubmitter.java  | 459 +++++++++++++++
 .../coordinate/ReceiverClusterManager.java         | 651 +++++++++++++++++++++
 .../coordinate/SegmentJobBuildInfo.java            |  73 +++
 .../coordinate/StreamingCoordinator.java           | 643 ++++++++++++++++++++
 .../annotations/NonSideEffect.java}                |  43 +-
 .../annotations/NotAtomicAndNotIdempotent.java     |  45 ++
 .../annotations/NotAtomicIdempotent.java           |  42 ++
 .../ClusterDoctor.java}                            |  27 +-
 .../ClusterStateChecker.java}                      |  32 +-
 .../exception/ClusterStateException.java           |   6 +-
 .../coordinator/exception/CoordinateException.java |   3 +
 .../exception/NotLeadCoordinatorException.java     |   2 +
 .../coordinator/exception/StoreException.java      |   2 +
 .../src/main/resources/log4j.properties            |  18 +-
 .../coordinate/BuildJobSubmitterTest.java          | 194 ++++++
 .../coordinator/coordinate/StreamingTestBase.java  | 335 +++++++++++
 .../kylin/stream/server/StreamingServer.java       |  10 +-
 25 files changed, 2567 insertions(+), 91 deletions(-)

diff --git a/build/conf/kylin-server-log4j.properties 
b/build/conf/kylin-server-log4j.properties
index aba7001..0b3cef9 100644
--- a/build/conf/kylin-server-log4j.properties
+++ b/build/conf/kylin-server-log4j.properties
@@ -30,3 +30,13 @@ log4j.rootLogger=INFO,file
 log4j.logger.org.apache.kylin=DEBUG
 log4j.logger.org.springframework=WARN
 log4j.logger.org.springframework.security=INFO
+
+# Realtime OLAP config
+log4j.appender.realtime=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.realtime.layout=org.apache.log4j.PatternLayout
+log4j.appender.realtime.File=${catalina.home}/../logs/kylin_coordinator.log
+log4j.appender.realtime.layout.ConversionPattern=%d{ISO8601} %-5p [%t] 
%c{2}:%L : %m%n
+log4j.appender.realtime.Append=true
+log4j.appender.realtime.MaxFileSize=268435456
+log4j.appender.realtime.MaxBackupIndex=10
+log4j.logger.org.apache.kylin.stream.*=DEBUG, realtime
\ No newline at end of file
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java 
b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index a0a1d38..6f73024 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -2216,6 +2216,10 @@ public abstract class KylinConfigBase implements 
Serializable {
         return 
Boolean.parseBoolean(getOptional("kylin.stream.stand-alone.mode", "false"));
     }
 
+    public boolean isNewCoordinatorEnabled() {
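+        // Descriptive note (accurate to this commit): when true (the default), the refactored StreamingCoordinator is used instead of the deprecated Coordinator.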
+        return 
Boolean.parseBoolean(getOptional("kylin.stream.new.coordinator-enabled", 
"true"));
+    }
+
     public String getLocalStorageImpl() {
         return getOptional("kylin.stream.settled.storage", null);
     }
diff --git 
a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java 
b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index 51d9bf7..20d62f9 100644
--- 
a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ 
b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -265,7 +265,7 @@ public abstract class AbstractExecutable implements 
Executable, Idempotent {
     }
 
     @Override
-    public final String getId() {
+    public String getId() {
         return this.id;
     }
 
diff --git 
a/query/src/main/java/org/apache/kylin/query/optrule/visitor/TimezoneRewriteVisitor.java
 
b/query/src/main/java/org/apache/kylin/query/optrule/visitor/TimezoneRewriteVisitor.java
index cb75b34..731ee00 100644
--- 
a/query/src/main/java/org/apache/kylin/query/optrule/visitor/TimezoneRewriteVisitor.java
+++ 
b/query/src/main/java/org/apache/kylin/query/optrule/visitor/TimezoneRewriteVisitor.java
@@ -41,6 +41,7 @@ import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Locale;
 
 public class TimezoneRewriteVisitor extends RexVisitorImpl<RexNode> {
     public static final Logger logger = 
LoggerFactory.getLogger(TimezoneRewriteVisitor.class);
@@ -69,8 +70,8 @@ public class TimezoneRewriteVisitor extends 
RexVisitorImpl<RexNode> {
                         // this will affect code gen
                         int minusHours = 0;
                         String afetrModify = LocalDateTime
-                                .parse(toBeModify, 
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")).minusHours(minusHours)
-                                
.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
+                                .parse(toBeModify, 
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss", 
Locale.ROOT)).minusHours(minusHours)
+                                
.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss", Locale.ROOT));
                         logger.info("{}  ->  {}", toBeModify, afetrModify);
                         RexLiteral newliteral = 
RexLiteral.fromJdbcString(literal.getType(), literal.getTypeName(),
                                 afetrModify);
diff --git 
a/server-base/src/main/java/org/apache/kylin/rest/service/StreamingCoordinatorService.java
 
b/server-base/src/main/java/org/apache/kylin/rest/service/StreamingCoordinatorService.java
index b4f223f..7ac7610 100644
--- 
a/server-base/src/main/java/org/apache/kylin/rest/service/StreamingCoordinatorService.java
+++ 
b/server-base/src/main/java/org/apache/kylin/rest/service/StreamingCoordinatorService.java
@@ -22,10 +22,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.stream.coordinator.Coordinator;
 import org.apache.kylin.stream.coordinator.StreamMetadataStore;
 import org.apache.kylin.stream.coordinator.StreamMetadataStoreFactory;
+import org.apache.kylin.stream.coordinator.client.CoordinatorClient;
+import org.apache.kylin.stream.coordinator.coordinate.StreamingCoordinator;
 import org.apache.kylin.stream.core.model.CubeAssignment;
 import org.apache.kylin.stream.core.model.ReplicaSet;
 import org.apache.kylin.stream.core.model.Node;
@@ -42,12 +45,17 @@ public class StreamingCoordinatorService extends 
BasicService {
 
     private StreamMetadataStore streamMetadataStore;
 
-    private Coordinator streamingCoordinator;
+    private CoordinatorClient streamingCoordinator;
 
-    public StreamingCoordinatorService(){
+    public StreamingCoordinatorService() {
         streamMetadataStore = 
StreamMetadataStoreFactory.getStreamMetaDataStore();
-        //TODO coordinator operation should go to the only one lead coordinator
-        streamingCoordinator = Coordinator.getInstance();
+        if (KylinConfig.getInstanceFromEnv().isNewCoordinatorEnabled()) {
+            logger.info("Use new version coordinator.");
+            streamingCoordinator = StreamingCoordinator.getInstance();
+        } else {
+            logger.info("Use old version coordinator.");
+            streamingCoordinator = Coordinator.getInstance();
+        }
     }
 
     public synchronized Map<Integer, Map<String, List<Partition>>> 
reBalanceRecommend() {
@@ -55,7 +63,7 @@ public class StreamingCoordinatorService extends BasicService 
{
     }
 
     public synchronized void reBalance(Map<Integer, Map<String, 
List<Partition>>> reBalancePlan) {
-         streamingCoordinator.reBalance(reBalancePlan);
+        streamingCoordinator.reBalance(reBalancePlan);
     }
 
     public void assignCube(String cubeName) {
@@ -94,24 +102,23 @@ public class StreamingCoordinatorService extends 
BasicService {
         streamingCoordinator.replicaSetLeaderChange(replicaSetID, newLeader);
     }
 
-    public void createReplicaSet(ReplicaSet rs){
+    public void createReplicaSet(ReplicaSet rs) {
         streamingCoordinator.createReplicaSet(rs);
     }
 
-    public void removeReplicaSet(int rsID){
+    public void removeReplicaSet(int rsID) {
         streamingCoordinator.removeReplicaSet(rsID);
     }
 
-    public void addNodeToReplicaSet(Integer replicaSetID, String nodeID){
+    public void addNodeToReplicaSet(Integer replicaSetID, String nodeID) {
         streamingCoordinator.addNodeToReplicaSet(replicaSetID, nodeID);
     }
 
-    public void removeNodeFromReplicaSet(Integer replicaSetID, String nodeID){
+    public void removeNodeFromReplicaSet(Integer replicaSetID, String nodeID) {
         streamingCoordinator.removeNodeFromReplicaSet(replicaSetID, nodeID);
     }
 
     public void onSegmentRemoteStoreComplete(String cubeName, Pair<Long, Long> 
segment, Node receiver) {
         streamingCoordinator.segmentRemoteStoreComplete(receiver, cubeName, 
segment);
     }
-
 }
diff --git 
a/storage-stream/src/main/java/org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.java
 
b/storage-stream/src/main/java/org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.java
index a189845..bc075ce 100644
--- 
a/storage-stream/src/main/java/org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.java
+++ 
b/storage-stream/src/main/java/org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.java
@@ -128,8 +128,7 @@ public class HttpStreamDataSearchClient implements 
IStreamDataSearchClient {
     public Iterator<ITuple> search(DataRequest dataRequest, CubeInstance cube, 
StreamingTupleConverter tupleConverter,
             RecordsSerializer recordsSerializer, ReplicaSet rs, TupleInfo 
tupleInfo) throws Exception {
         List<Node> receivers = Lists.newArrayList(rs.getNodes());
-        Node leader = rs.getLeader();
-        Node queryReceiver = findBestReceiverServeQuery(receivers, leader, 
cube.getName());
+        Node queryReceiver = findBestReceiverServeQuery(receivers, 
cube.getName());
         IOException exception;
         try {
             return doSearch(dataRequest, cube, tupleConverter, 
recordsSerializer, queryReceiver, tupleInfo);
@@ -156,7 +155,7 @@ public class HttpStreamDataSearchClient implements 
IStreamDataSearchClient {
         throw exception;
     }
 
-    private Node findBestReceiverServeQuery(List<Node> receivers, Node lead, 
String cubeName) {
+    private Node findBestReceiverServeQuery(List<Node> receivers, String 
cubeName) {
         // stick to one receiver according to cube name
         int receiversSize = receivers.size();
         int receiverNo = Math.abs(cubeName.hashCode()) % receiversSize;
diff --git 
a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java
 
b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java
index 7037d13..77f1268 100644
--- 
a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java
+++ 
b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java
@@ -121,6 +121,7 @@ import javax.annotation.Nullable;
  * request, others process will become standby/candidate, so single point of 
failure will be eliminated.
  * </pre>
  */
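+// Superseded by org.apache.kylin.stream.coordinator.coordinate.StreamingCoordinator when kylin.stream.new.coordinator-enabled is true.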
+@Deprecated
 public class Coordinator implements CoordinatorClient {
     private static final Logger logger = 
LoggerFactory.getLogger(Coordinator.class);
     private static final int DEFAULT_PORT = 7070;
@@ -1109,7 +1110,7 @@ public class Coordinator implements CoordinatorClient {
             logger.info("submit streaming segment build, cube:{} segment:{}", 
cubeName, segmentName);
             CubeSegment newSeg = getCubeManager().appendSegment(cubeInstance,
                     new TSRange(segmentRange.getFirst(), 
segmentRange.getSecond()));
-            DefaultChainedExecutable executable = new 
StreamingCubingEngine().createStreamingCubingBuilder(newSeg,
+            DefaultChainedExecutable executable = new 
StreamingCubingEngine().createStreamingCubingJob(newSeg,
                     "SYSTEM");
             getExecutableManager().addJob(executable);
             CubingJob cubingJob = (CubingJob) executable;
diff --git 
a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/client/CoordinatorClientFactory.java
 
b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/client/CoordinatorClientFactory.java
index ef6fdbe..6c8fa16 100644
--- 
a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/client/CoordinatorClientFactory.java
+++ 
b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/client/CoordinatorClientFactory.java
@@ -21,8 +21,10 @@ package org.apache.kylin.stream.coordinator.client;
 import java.net.InetAddress;
 import java.net.NetworkInterface;
 
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.stream.coordinator.Coordinator;
 import org.apache.kylin.stream.coordinator.StreamMetadataStore;
+import org.apache.kylin.stream.coordinator.coordinate.StreamingCoordinator;
 import org.apache.kylin.stream.core.model.Node;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -30,9 +32,18 @@ import org.slf4j.LoggerFactory;
 public class CoordinatorClientFactory {
     private static final Logger logger = 
LoggerFactory.getLogger(CoordinatorClientFactory.class);
 
+    private CoordinatorClientFactory() {
+    }
+
     public static CoordinatorClient 
createCoordinatorClient(StreamMetadataStore streamMetadataStore) {
         if (isCoordinatorCoLocate(streamMetadataStore)) {
-            return Coordinator.getInstance();
+            if (KylinConfig.getInstanceFromEnv().isNewCoordinatorEnabled()) {
+                logger.info("Use new version coordinator.");
+                return StreamingCoordinator.getInstance();
+            } else {
+                logger.info("Use old version coordinator.");
+                return Coordinator.getInstance();
+            }
         } else {
             return new HttpCoordinatorClient(streamMetadataStore);
         }
@@ -43,12 +54,12 @@ public class CoordinatorClientFactory {
             Node coordinatorNode = streamMetadataStore.getCoordinatorNode();
             if (coordinatorNode == null) {
                 logger.warn("no coordinator node registered");
-                return true;
+                return false;
             }
             InetAddress inetAddress = 
InetAddress.getByName(coordinatorNode.getHost());
             return NetworkInterface.getByInetAddress(inetAddress) != null;
         } catch (Exception e) {
-            logger.error("error when ");
+            logger.error("Error when checking network interface.", e);
         }
         return true;
     }
diff --git 
a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/BuildJobSubmitter.java
 
b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/BuildJobSubmitter.java
new file mode 100644
index 0000000..cf3771b
--- /dev/null
+++ 
b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/BuildJobSubmitter.java
@@ -0,0 +1,459 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.stream.coordinator.coordinate;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.mr.StreamingCubingEngine;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.apache.kylin.metadata.model.SegmentRange;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.stream.coordinator.StreamingCubeInfo;
+import 
org.apache.kylin.stream.coordinator.coordinate.annotations.NonSideEffect;
+import 
org.apache.kylin.stream.coordinator.coordinate.annotations.NotAtomicIdempotent;
+import org.apache.kylin.stream.core.model.CubeAssignment;
+import org.apache.kylin.stream.core.model.SegmentBuildState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+/**
+ * <pre>
+ * The main responsibilities of BuildJobSubmitter include:
+ *  1. Find candidate segments which are ready to have a build job submitted
+ *  2. Trace the status of each candidate segment's build job and promote the segment once it has met the requirements
+ *
+ * The candidate segments are those segments that can be seen/perceived by the coordinator;
+ *  a candidate segment falls into one of the following states/queues:
+ *  1. segment whose data are uploaded PARTLY
+ *  2. segment whose data are uploaded completely and is WAITING to build
+ *  3. segment in BUILDING state; the job's state should be one of (NEW/RUNNING/ERROR/DISCARD)
+ *  4. segment which was built successfully and waits to be delivered to the historical part (and to be deleted from the realtime part)
+ *  5. segment in the historical part (HBase Ready Segment)
+ *
+ * By design, a segment should move to the next queue sequentially (it must not jump the queue); do not break this.
+ * </pre>
+ */
+public class BuildJobSubmitter implements Runnable {
+    private static final Logger logger = 
LoggerFactory.getLogger(BuildJobSubmitter.class);
+
+    private ConcurrentMap<String, ConcurrentSkipListSet<SegmentJobBuildInfo>> 
segmentBuildJobCheckList = Maps
+            .newConcurrentMap();
+    private Set<String> cubeCheckList = new ConcurrentSkipListSet<>();
+    private StreamingCoordinator coordinator;
+
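+    // Counts scheduler runs; roughly every 100 runs doRun() forces a full traversal of all streaming cubes as a safety net.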
+    private long checkTimes = 0;
+
+    public BuildJobSubmitter(StreamingCoordinator coordinator) {
+        this.coordinator = coordinator;
+    }
+
+    void restore() {
+        logger.info("Restore job submitter");
+        List<String> cubes = coordinator.getStreamMetadataStore().getCubes();
+        for (String cube : cubes) {
+            List<SegmentBuildState> segmentBuildStates = 
coordinator.getStreamMetadataStore()
+                    .getSegmentBuildStates(cube);
+            Collections.sort(segmentBuildStates);
+            for (SegmentBuildState segmentBuildState : segmentBuildStates) {
+                if (segmentBuildState.isInBuilding()) {
+                    SegmentJobBuildInfo jobBuildInfo = new 
SegmentJobBuildInfo(cube, segmentBuildState.getSegmentName(),
+                            segmentBuildState.getState().getJobId());
+                    this.addToJobTrackList(jobBuildInfo);
+                }
+            }
+        }
+    }
+
+    void addToJobTrackList(SegmentJobBuildInfo segmentBuildJob) {
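+        // Get or create the per-cube job set; putIfAbsent keeps the set created by a concurrent caller if one raced ahead.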
+        ConcurrentSkipListSet<SegmentJobBuildInfo> buildInfos = 
segmentBuildJobCheckList.get(segmentBuildJob.cubeName);
+        if (buildInfos == null) {
+            buildInfos = new ConcurrentSkipListSet<>();
+            ConcurrentSkipListSet<SegmentJobBuildInfo> previousValue = 
segmentBuildJobCheckList
+                    .putIfAbsent(segmentBuildJob.cubeName, buildInfos);
+            if (previousValue != null) {
+                buildInfos = previousValue;
+            }
+        }
+        buildInfos.add(segmentBuildJob);
+    }
+
+    void addToCheckList(String cubeName) {
+        cubeCheckList.add(cubeName);
+    }
+
+    void clearCheckList(String cubeName) {
+        cubeCheckList.remove(cubeName);
+        segmentBuildJobCheckList.remove(cubeName);
+    }
+
+    @Override
+    public void run() {
+        try {
+            if (coordinator.isLead()) {
+                doRun();
+            }
+        } catch (Exception e) {
+            logger.error("Unexpected error", e);
+        }
+    }
+
+    void doRun() {
+        checkTimes++;
+        
logger.info("\n----------------------------------------------------------- {}", 
checkTimes);
+        List<SegmentJobBuildInfo> successJobs = traceEarliestSegmentBuildJob();
+
+        for (SegmentJobBuildInfo successJob : successJobs) {
+            ConcurrentSkipListSet<SegmentJobBuildInfo> submittedBuildJobs = 
segmentBuildJobCheckList
+                    .get(successJob.cubeName);
+            submittedBuildJobs.remove(successJob);
+        }
+
+        findSegmentReadyToBuild();
+
+        if (checkTimes % 100 == 1) {
+            logger.info("Force traverse all cubes periodically.");
+            for (StreamingCubeInfo cubeInfo : 
coordinator.getEnableStreamingCubes()) {
+                List<String> segmentList = 
checkSegmentBuidJobFromMetadata(cubeInfo.getCubeName());
+                for (String segmentName : segmentList) {
+                    submitSegmentBuildJob(cubeInfo.getCubeName(), segmentName);
+                }
+            }
+        }
+    }
+
+    /**
+     * <pre>
+     * Trace the state of the build job for the earliest (NOT ALL) segment of each streaming cube, and
+     *  1. try to promote it into a Ready HBase Segment if the job succeeded
+     *  2. try to resume the build job if the job is in error state
+     * </pre>
+     *
+     * @return all build jobs which have succeeded
+     */
+    @NonSideEffect
+    List<SegmentJobBuildInfo> traceEarliestSegmentBuildJob() {
+        List<SegmentJobBuildInfo> successJobs = Lists.newArrayList();
+        for (ConcurrentSkipListSet<SegmentJobBuildInfo> buildInfos : 
segmentBuildJobCheckList.values()) {
+            if (buildInfos.isEmpty()) {
+                continue;
+            }
+
+            // find the earliest segment build job and try to promote
+            SegmentJobBuildInfo segmentBuildJob = buildInfos.first();
+            logger.debug("Check the cube:{} segment:{} build status.", 
segmentBuildJob.cubeName,
+                    segmentBuildJob.segmentName);
+            try {
+                CubingJob cubingJob = (CubingJob) 
coordinator.getExecutableManager().getJob(segmentBuildJob.jobID);
+                if (cubingJob == null) {
+                    logger.error("Cannot find metadata of current job.");
+                    continue;
+                }
+                ExecutableState jobState = cubingJob.getStatus();
+                if (ExecutableState.SUCCEED.equals(jobState)) {
+                    CubeManager cubeManager = coordinator.getCubeManager();
+                    CubeInstance cubeInstance = 
cubeManager.getCube(segmentBuildJob.cubeName).latestCopyForWrite();
+                    CubeSegment cubeSegment = 
cubeInstance.getSegment(segmentBuildJob.segmentName, null);
+                    logger.info("The cube:{} segment:{} is ready to be 
promoted.", segmentBuildJob.cubeName,
+                            segmentBuildJob.segmentName);
+                    
coordinator.getClusterManager().segmentBuildComplete(cubingJob, cubeInstance, 
cubeSegment,
+                            segmentBuildJob);
+                    addToCheckList(cubeInstance.getName());
+                    successJobs.add(segmentBuildJob);
+                } else if (ExecutableState.ERROR.equals(jobState)) {
+                    if (segmentBuildJob.retryCnt < 5) {
+                        logger.info("Job:{} is error, resume the job.", 
segmentBuildJob);
+                        
coordinator.getExecutableManager().resumeJob(segmentBuildJob.jobID);
+                        segmentBuildJob.retryCnt++;
+                    } else {
+                        logger.warn("Job:{} is in error state and exceeded the max retry count. Kylin admin could resume it or discard it"
+                                + " (to let a new building job be submitted).", segmentBuildJob);
+                    }
+                } else {
+                    logger.debug("Current job state {}", jobState);
+                }
+            } catch (Exception e) {
+                logger.error("Error when checking streaming segment job build state:" + segmentBuildJob, e);
+            }
+        }
+        return successJobs;
+    }
+
+    @NonSideEffect
+    void findSegmentReadyToBuild() {
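+        // For each cube on the check list, submit build jobs for every ready segment; the cube is removed from the list only when all submissions succeeded.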
+        Iterator<String> iterator = cubeCheckList.iterator();
+        while (iterator.hasNext()) {
+            String cubeName = iterator.next();
+            List<String> segmentList = 
checkSegmentBuidJobFromMetadata(cubeName);
+            boolean allSubmited = true;
+            for (String segmentName : segmentList) {
+                boolean ok = submitSegmentBuildJob(cubeName, segmentName);
+                allSubmited = allSubmited && ok;
+                if (!ok) {
+                    logger.debug("Failed to submit building job.");
+                }
+            }
+            if (allSubmited) {
+                iterator.remove();
+                logger.debug("Removed {} from check list.", cubeName);
+            }
+        }
+    }
+
+    // 
==========================================================================================
+    // 
==========================================================================================
+
+    /**
+     * @return list of segments for which a segment build job could be submitted
+     */
+    @NonSideEffect
+    List<String> checkSegmentBuidJobFromMetadata(String cubeName) {
+        List<String> result = Lists.newArrayList();
+        CubeInstance cubeInstance = 
coordinator.getCubeManager().getCube(cubeName);
+        CubeSegment latestHistoryReadySegment = 
cubeInstance.getLatestReadySegment();
+
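+        // Segments that start before the end of the latest READY segment already belong to the historical (HBase) part and must not be rebuilt.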
+        long minSegmentStart = -1;
+        if (latestHistoryReadySegment != null) {
+            minSegmentStart = latestHistoryReadySegment.getTSRange().end.v;
+        }
+        int allowMaxBuildingSegments = 
cubeInstance.getConfig().getMaxBuildingSegments();
+
+        CubeAssignment assignments = 
coordinator.getStreamMetadataStore().getAssignmentsByCube(cubeName);
+        Set<Integer> cubeAssignedReplicaSets = assignments.getReplicaSetIDs();
+
+        List<SegmentBuildState> segmentStates = 
coordinator.getStreamMetadataStore().getSegmentBuildStates(cubeName);
+        int inBuildingSegments = cubeInstance.getBuildingSegments().size();
+        int leftQuota = allowMaxBuildingSegments - inBuildingSegments;
+
+        // Sort it so we can iterate segments from the earlier one to the newer one
+        Collections.sort(segmentStates);
+
+        for (int i = 0; i < segmentStates.size(); i++) {
+
+            if (leftQuota <= 0) {
+                logger.info("No left quota to build segments for cube:{}", 
cubeName);
+                break;
+            }
+
+            SegmentBuildState segmentState = segmentStates.get(i);
+            Pair<Long, Long> segmentRange = 
CubeSegment.parseSegmentName(segmentState.getSegmentName());
+
+            // If an existing historical segment covers this range, a new realtime segment must not overwrite it (that would be dangerous),
+            // so we just delete the entry to ignore the segment which should not exist
+            if (segmentRange.getFirst() < minSegmentStart) {
+                logger.warn(
+                        "The cube segment state is not correct because it belongs to the historical part, cube:{} segment:{}, clear it.",
+                        cubeName, segmentState.getSegmentName());
+                
coordinator.getStreamMetadataStore().removeSegmentBuildState(cubeName, 
segmentState.getSegmentName());
+                continue;
+            }
+
+            // We already have a building job for current segment
+            if (segmentState.isInBuilding()) {
+                boolean needRebuild = checkSegmentBuildingJob(segmentState, 
cubeName, cubeInstance);
+                if (!needRebuild)
+                    continue;
+            } else if (segmentState.isInWaiting()) {
+                // Either the data has not been uploaded to remote storage completely, or the job was discarded.
+                // In both cases a building job should be submitted, so just fall through.
+            }
+
+            boolean readyToBuild = checkSegmentIsReadyToBuild(segmentStates, 
i, cubeAssignedReplicaSets);
+            if (!readyToBuild) {
+                logger.debug("Segment {} {} is not ready to submit a building 
job.", cubeName, segmentState);
+                // use break instead of continue here, because segments must move to the next queue sequentially (no jumping the queue)
+                break;
+            } else {
+                result.add(segmentState.getSegmentName());
+                leftQuota--;
+            }
+        }
+        if (logger.isDebugEnabled() && !result.isEmpty()) {
+            logger.debug("Candidate {} : {}.", cubeName, String.join(", ", result));
+        }
+        return result;
+    }
+
+    /**
+     * Submit a build job for a streaming segment
+     *
+     * @return true if the submission succeeded, else false
+     */
+    @NotAtomicIdempotent
+    boolean submitSegmentBuildJob(String cubeName, String segmentName) {
+        logger.info("Try submit streaming segment build job, cube:{} 
segment:{}", cubeName, segmentName);
+        CubeInstance cubeInstance = 
coordinator.getCubeManager().getCube(cubeName);
+        try {
+            // Step 1. create a new segment if it does not exist
+            CubeSegment newSeg = null;
+            Pair<Long, Long> segmentRange = 
CubeSegment.parseSegmentName(segmentName);
+            boolean segmentExists = false;
+            for (CubeSegment segment : cubeInstance.getSegments()) {
+                SegmentRange.TSRange tsRange = segment.getTSRange();
+                if (tsRange.start.v.equals(segmentRange.getFirst()) && 
segmentRange.getSecond().equals(tsRange.end.v)) {
+                    segmentExists = true;
+                    newSeg = segment;
+                }
+            }
+
+            if (!segmentExists) {
+                logger.debug("Create segment for {} {} .", cubeName, 
segmentName);
+                newSeg = 
coordinator.getCubeManager().appendSegment(cubeInstance,
+                        new SegmentRange.TSRange(segmentRange.getFirst(), 
segmentRange.getSecond()));
+            } else {
+                logger.info("Segment {} exists.", segmentName);
+            }
+
+            // Step 2. create and submit new build job
+            DefaultChainedExecutable executable = 
getStreamingCubingJob(newSeg);
+            coordinator.getExecutableManager().addJob(executable);
+            String jobId = executable.getId();
+            newSeg.setLastBuildJobID(jobId);
+
+            // Step 3. add it to job trigger list
+            SegmentJobBuildInfo segmentJobBuildInfo = new 
SegmentJobBuildInfo(cubeName, segmentName, jobId);
+            addToJobTrackList(segmentJobBuildInfo);
+
+            // Step 4. add the job to stream metadata in case the current node dies
+            SegmentBuildState.BuildState state = new 
SegmentBuildState.BuildState();
+            state.setBuildStartTime(System.currentTimeMillis());
+            state.setState(SegmentBuildState.BuildState.State.BUILDING);
+            state.setJobId(jobId);
+            logger.debug("Commit building job {} for {} {} .", jobId, 
cubeName, segmentName);
+            
coordinator.getStreamMetadataStore().updateSegmentBuildState(cubeName, 
segmentName, state);
+            return true;
+        } catch (Exception e) {
+            logger.error("Streaming job submit fail, cubeName:" + cubeName + " 
segment:" + segmentName, e);
+            return false;
+        }
+    }
+
+    /**
+     * Check a segment which is in building state
+     *
+     * @return true if we need to resubmit a new build job, else false
+     */
+    boolean checkSegmentBuildingJob(SegmentBuildState segmentState, String 
cubeName, CubeInstance cubeInstance) {
+        String jobId = segmentState.getState().getJobId();
+        logger.debug("There is segment in building, cube:{} segment:{} 
jobId:{}", cubeName,
+                segmentState.getSegmentName(), jobId);
+        long buildStartTime = segmentState.getState().getBuildStartTime();
+        if (buildStartTime != 0 && jobId != null) {
+            long buildDuration = System.currentTimeMillis() - buildStartTime;
+
+            // Check build state after 15 minutes
+            if (buildDuration < 15 * 60 * 1000) {
+                return false;
+            }
+            CubingJob cubingJob = (CubingJob) 
coordinator.getExecutableManager().getJob(jobId);
+            Preconditions.checkNotNull(cubingJob, "CubingJob should not be 
null.");
+            ExecutableState jobState = cubingJob.getStatus();
+
+            // If the job has already succeeded and the HBase segment is in ready state, remove the build state
+            if (ExecutableState.SUCCEED.equals(jobState)) {
+                CubeSegment cubeSegment = 
cubeInstance.getSegment(segmentState.getSegmentName(), null);
+                if (cubeSegment != null && SegmentStatusEnum.READY == 
cubeSegment.getStatus()) {
+                    logger.info("Job:{} is already succeed, and segment:{} is 
ready, remove segment build state", jobId,
+                            segmentState.getSegmentName());
+                    
coordinator.getStreamMetadataStore().removeSegmentBuildState(cubeName,
+                            segmentState.getSegmentName());
+                }
+                return false;
+            }
+
+            // If a job is in error state, just retry it
+            if (ExecutableState.ERROR.equals(jobState)) {
+                logger.info("Job:{} is error, resume the job.", jobId);
+                coordinator.getExecutableManager().resumeJob(jobId);
+                return false;
+            }
+
+            // If a job is discarded, we will try to resubmit it later.
+            if (ExecutableState.DISCARDED.equals(jobState)) {
+                logger.info("Job:{} is discarded, resubmit it later.", jobId);
+                return true;
+            } else {
+                    logger.info("Job:{} is running, job state: {}.", jobId, jobState);
+            }
+        } else {
+            logger.info("Unknown state {}", segmentState);
+        }
+        return false;
+    }
+
+    /**
+     * <pre>
+     *     When all replica sets have uploaded their local segment cache to deep storage (that is to say, all required data is uploaded), we can mark
+     *     this segment as ready to submit a MapReduce job to build it into HBase.
+     *
+     *     Note the special situation: when some replica set did not upload any data for some segment duration because no Kafka events
+     *     arrived, we still check the newer segment durations; if some newer segment has data uploaded by the currently missing replica set,
+     *     we mark the local segment cache as uploaded for that replica set for the current segment.
+     *     This workaround prevents the job-submit queue from being blocked by a topic partition that has no data.
+     * </pre>
+     *
+     * @return true if the current segment is ready to submit a build job, else false
+     */
+    boolean checkSegmentIsReadyToBuild(List<SegmentBuildState> 
allSegmentStates, int checkedSegmentIdx,
+            Set<Integer> cubeAssignedReplicaSets) {
+        SegmentBuildState checkedSegmentState = 
allSegmentStates.get(checkedSegmentIdx);
+        Set<Integer> notCompleteReplicaSets = Sets
+                .newHashSet(Sets.difference(cubeAssignedReplicaSets, 
checkedSegmentState.getCompleteReplicaSets()));
+        if (notCompleteReplicaSets.isEmpty()) {
+            return true;
+        } else {
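+            // Some assigned replica sets have not reported an upload for this segment; if such a replica set
+            // completed a later segment, it simply had no events in this range, so treat it as complete here.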
+            for (int i = checkedSegmentIdx + 1; i < allSegmentStates.size(); 
i++) {
+                SegmentBuildState segmentBuildState = allSegmentStates.get(i);
+                Set<Integer> completeReplicaSetsForNext = 
segmentBuildState.getCompleteReplicaSets();
+                Iterator<Integer> notCompleteRSItr = 
notCompleteReplicaSets.iterator();
+                while (notCompleteRSItr.hasNext()) {
+                    Integer rsID = notCompleteRSItr.next();
+                    if (completeReplicaSetsForNext.contains(rsID)) {
+                        logger.info(
+                                "the replica set:{} doesn't have data for segment:{}, but has data for the later segment:{}",
+                                rsID, checkedSegmentState.getSegmentName(), 
segmentBuildState.getSegmentName());
+                        notCompleteRSItr.remove();
+                    }
+                }
+            }
+            return notCompleteReplicaSets.isEmpty();
+        }
+    }
+
+    public Set<String> getCubeCheckList() {
+        return cubeCheckList;
+    }
+
+    public DefaultChainedExecutable getStreamingCubingJob(CubeSegment segment) {
+        return new StreamingCubingEngine().createStreamingCubingJob(segment, 
"SYSTEM");
+    }
+}
diff --git 
a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/ReceiverClusterManager.java
 
b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/ReceiverClusterManager.java
new file mode 100644
index 0000000..81b2e8c
--- /dev/null
+++ 
b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/ReceiverClusterManager.java
@@ -0,0 +1,651 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.stream.coordinator.coordinate;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Lists;
+import com.google.common.collect.MapDifference;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.Segments;
+import org.apache.kylin.stream.coordinator.assign.AssignmentsCache;
+import 
org.apache.kylin.stream.coordinator.coordinate.annotations.NotAtomicIdempotent;
+import 
org.apache.kylin.stream.coordinator.coordinate.annotations.NotAtomicAndNotIdempotent;
+import org.apache.kylin.stream.coordinator.exception.ClusterStateException;
+import org.apache.kylin.stream.core.consumer.ConsumerStartProtocol;
+import org.apache.kylin.stream.core.model.AssignRequest;
+import org.apache.kylin.stream.core.model.ConsumerStatsResponse;
+import org.apache.kylin.stream.core.model.CubeAssignment;
+import org.apache.kylin.stream.core.model.Node;
+import org.apache.kylin.stream.core.model.PauseConsumersRequest;
+import org.apache.kylin.stream.core.model.ReplicaSet;
+import org.apache.kylin.stream.core.model.ResumeConsumerRequest;
+import org.apache.kylin.stream.core.model.StartConsumersRequest;
+import org.apache.kylin.stream.core.model.StopConsumersRequest;
+import org.apache.kylin.stream.core.model.stats.ReceiverCubeStats;
+import org.apache.kylin.stream.core.source.ISourcePosition;
+import org.apache.kylin.stream.core.source.ISourcePositionHandler;
+import org.apache.kylin.stream.core.source.IStreamingSource;
+import org.apache.kylin.stream.core.source.Partition;
+import org.apache.kylin.stream.core.source.StreamingSourceFactory;
+import org.apache.kylin.stream.core.util.HDFSUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * <pre>
+ * This class manages operations that involve multiple streaming receivers. They are often not atomic and may or may not be idempotent.
+ *
+ * In a multi-step transaction, the following points should be thought through twice:
+ *  1. should we fail fast or continue when an exception is thrown
+ *  2. should the API (remote call) be synchronous or asynchronous
+ *  3. when the transaction fails, will the rollback always succeed
+ *  4. the transaction should be idempotent so that when it fails, it can be fixed by retry
+ * </pre>
+ */
+public class ReceiverClusterManager {
+    private static final Logger logger = 
LoggerFactory.getLogger(ReceiverClusterManager.class);
+
+    ReceiverClusterManager(StreamingCoordinator coordinator) {
+        this.coordinator = coordinator;
+    }
+
+    public StreamingCoordinator getCoordinator() {
+        return coordinator;
+    }
+
+    StreamingCoordinator coordinator;
+
+    // 
================================================================================
+    // ======================== Rebalance related operation 
===========================
+
+    @NotAtomicAndNotIdempotent
+    void doReBalance(List<CubeAssignment> previousAssignments, 
List<CubeAssignment> newAssignments) {
+        Map<String, CubeAssignment> previousCubeAssignMap = Maps.newHashMap();
+        Map<String, CubeAssignment> newCubeAssignMap = Maps.newHashMap();
+        for (CubeAssignment cubeAssignment : previousAssignments) {
+            previousCubeAssignMap.put(cubeAssignment.getCubeName(), 
cubeAssignment);
+        }
+        for (CubeAssignment cubeAssignment : newAssignments) {
+            newCubeAssignMap.put(cubeAssignment.getCubeName(), cubeAssignment);
+        }
+        try {
+            Set<String> preCubes = previousCubeAssignMap.keySet();
+            Set<String> newCubes = newCubeAssignMap.keySet();
+            if (!preCubes.equals(newCubes)) {
+                logger.error("previous assignment cubes:{}, new assignment 
cubes:{}", preCubes, newCubes);
+                throw new IllegalStateException("previous assignment cubes do not match new assignment cubes");
+            }
+
+            MapDifference<String, CubeAssignment> diff = 
Maps.difference(previousCubeAssignMap, newCubeAssignMap);
+            Map<String, MapDifference.ValueDifference<CubeAssignment>> 
changedAssignments = diff.entriesDiffering();
+
+            for (Map.Entry<String, 
MapDifference.ValueDifference<CubeAssignment>> changedAssignmentEntry : 
changedAssignments
+                    .entrySet()) {
+                String cubeName = changedAssignmentEntry.getKey();
+                MapDifference.ValueDifference<CubeAssignment> cubeAssignDiff = 
changedAssignmentEntry.getValue();
+                reassignCubeImpl(cubeName, cubeAssignDiff.leftValue(), 
cubeAssignDiff.rightValue());
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @NotAtomicAndNotIdempotent
+    void reassignCubeImpl(String cubeName, CubeAssignment preAssignments, 
CubeAssignment newAssignments) {
+        logger.info("start cube reBalance, cube:{}, previous assignments:{}, 
new assignments:{}", cubeName,
+                preAssignments, newAssignments);
+        if (newAssignments.equals(preAssignments)) {
+            logger.info("the new assignment is the same as the previous 
assignment, do nothing for this reassignment");
+            return;
+        }
+        CubeInstance cubeInstance = 
getCoordinator().getCubeManager().getCube(cubeName);
+        doReassignWithoutCommit(cubeInstance, preAssignments, newAssignments);
+
+        // add empty partitions to the removed replica sets, which means there is still data in the replica set, but no new data will be consumed.
+        MapDifference<Integer, List<Partition>> assignDiff = 
Maps.difference(preAssignments.getAssignments(),
+                newAssignments.getAssignments());
+        Map<Integer, List<Partition>> removedAssign = 
assignDiff.entriesOnlyOnLeft();
+        for (Integer removedReplicaSet : removedAssign.keySet()) {
+            newAssignments.addAssignment(removedReplicaSet, Lists.<Partition> 
newArrayList());
+        }
+
+        logger.info("Commit reassign {} transaction.", cubeName);
+        
getCoordinator().getStreamMetadataStore().saveNewCubeAssignment(newAssignments);
+        AssignmentsCache.getInstance().clearCubeCache(cubeName);
+    }
+
+    /**
+     * <pre>
+     * A reassign action is a process which moves some consumption tasks from one replica set to a new replica set.
+     *
+     * It is necessary in cases such as:
+     *  - a new topic partition was added
+     *  - the workload is not balanced between different replica sets
+     *  - some nodes have to be taken offline so their consumption tasks have to be transferred
+     * </pre>
+     *
+     * @param preAssignments current assignment
+     * @param newAssignments the assignment we want to assign
+     */
+    @NotAtomicAndNotIdempotent
+    void doReassignWithoutCommit(CubeInstance cubeInstance, CubeAssignment 
preAssignments,
+            CubeAssignment newAssignments) {
+        String cubeName = preAssignments.getCubeName();
+
+        // Step 0. Prepare and check
+        IStreamingSource streamingSource = 
StreamingSourceFactory.getStreamingSource(cubeInstance);
+        MapDifference<Integer, List<Partition>> assignDiff = 
Maps.difference(preAssignments.getAssignments(),
+                newAssignments.getAssignments());
+        Map<Integer, List<Partition>> sameAssign = 
assignDiff.entriesInCommon();
+        Map<Integer, List<Partition>> newAssign = 
assignDiff.entriesOnlyOnRight();
+        Map<Integer, List<Partition>> removedAssign = 
assignDiff.entriesOnlyOnLeft();
+
+        List<ISourcePosition> allPositions = Lists.newArrayList();
+        List<ReplicaSet> successSyncReplicaSet = Lists.newArrayList();
+        ISourcePosition consumePosition;
+
+        // TODO checking the health state of related receivers before taking real action would reduce the chance of facing an inconsistent state
+
+        // Step 1. Stop and sync all replica sets in preAssignment
+        try {
+            for (Map.Entry<Integer, List<Partition>> assignmentEntry : 
preAssignments.getAssignments().entrySet()) {
+                Integer replicaSetID = assignmentEntry.getKey();
+                if (sameAssign.containsKey(replicaSetID)) {
+                    logger.info("the assignment is not changed for cube:{}, 
replicaSet:{}", cubeName, replicaSetID);
+                    continue;
+                }
+                ReplicaSet rs = 
getCoordinator().getStreamMetadataStore().getReplicaSet(replicaSetID);
+                ISourcePosition position = 
syncAndStopConsumersInRs(streamingSource, cubeName, rs);
+                allPositions.add(position);
+                successSyncReplicaSet.add(rs);
+            }
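+            // Merge the stop positions reported by the old replica sets; KEEP_LARGE keeps the furthest position, so the new assignees resume from where the old ones stopped.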
+            consumePosition = 
streamingSource.getSourcePositionHandler().mergePositions(allPositions,
+                    ISourcePositionHandler.MergeStrategy.KEEP_LARGE);
+            logger.info("the consumer position for cube:{} is:{}", cubeName, 
consumePosition);
+        } catch (Exception e) {
+            logger.error("fail to sync assign replicaSet for cube:" + 
cubeName, e);
+            Set<Integer> needRollback = 
successSyncReplicaSet.stream().map(ReplicaSet::getReplicaSetID)
+                    .collect(Collectors.toSet());
+            for (ReplicaSet rs : successSyncReplicaSet) {
+                StartConsumersRequest request = new StartConsumersRequest();
+                request.setCube(cubeName);
+                try {
+                    startConsumersInReplicaSet(rs, request);
+                    needRollback.remove(rs.getReplicaSetID());
+                } catch (IOException e1) {
+                    logger.error("fail to start consumers for cube:" + 
cubeName + " replicaSet:" + rs.getReplicaSetID(),
+                            e1);
+                }
+            }
+            if (needRollback.isEmpty()) {
+                throw new ClusterStateException(cubeName, 
ClusterStateException.ClusterState.ROLLBACK_SUCCESS,
+                        ClusterStateException.TransactionStep.STOP_AND_SNYC, 
"", e);
+            } else {
+                StringBuilder str = new StringBuilder();
+                try {
+                    str.append("Fail 
restart:").append(JsonUtil.writeValueAsString(needRollback));
+                } catch (JsonProcessingException jpe) {
+                    logger.error("", jpe);
+                }
+                throw new ClusterStateException(cubeName, 
ClusterStateException.ClusterState.ROLLBACK_FAILED,
+                        ClusterStateException.TransactionStep.STOP_AND_SNYC, 
str.toString(), e);
+            }
+        }
+
+        List<ReplicaSet> successAssigned = Lists.newArrayList();
+        List<ReplicaSet> successStarted = Lists.newArrayList();
+        Set<Node> failedConvertToImmutableNodes = new HashSet<>();
+
+        try {
+            // Step 2. Assign consumption tasks to newAssignment without starting consumption
+            for (Map.Entry<Integer, List<Partition>> cubeAssignmentEntry : 
newAssignments.getAssignments().entrySet()) {
+                Integer replicaSetID = cubeAssignmentEntry.getKey();
+                if (sameAssign.containsKey(replicaSetID)) {
+                    continue;
+                }
+
+                ReplicaSet rs = 
getCoordinator().getStreamMetadataStore().getReplicaSet(replicaSetID);
+                logger.info("assign cube:{} to replicaSet:{}", cubeName, 
replicaSetID);
+                assignCubeToReplicaSet(rs, cubeName, 
cubeAssignmentEntry.getValue(), false, true);
+                successAssigned.add(rs);
+            }
+
+            // Step 3. Start consumption tasks for newAssignment
+            for (Map.Entry<Integer, List<Partition>> cubeAssignmentEntry : 
newAssignments.getAssignments().entrySet()) {
+                Integer replicaSetID = cubeAssignmentEntry.getKey();
+                if (sameAssign.containsKey(replicaSetID)) {
+                    continue;
+                }
+
+                ConsumerStartProtocol consumerStartProtocol = new 
ConsumerStartProtocol(
+                        
streamingSource.getSourcePositionHandler().serializePosition(consumePosition.advance()));
+
+                ReplicaSet rs = 
getCoordinator().getStreamMetadataStore().getReplicaSet(replicaSetID);
+                StartConsumersRequest startRequest = new 
StartConsumersRequest();
+                startRequest.setCube(cubeName);
+                startRequest.setStartProtocol(consumerStartProtocol);
+                logger.info("start consumers for cube:{}, replicaSet:{}, 
startRequest:{}", cubeName, replicaSetID,
+                        startRequest);
+                startConsumersInReplicaSet(rs, startRequest);
+                successStarted.add(rs);
+            }
+
+            // Step 4. Ask the removed replica sets to force their local segments into the immutable state
+            for (Map.Entry<Integer, List<Partition>> removeAssignmentEntry : 
removedAssign.entrySet()) {
+                Integer replicaSetID = removeAssignmentEntry.getKey();
+                logger.info("make cube immutable for cube:{}, replicaSet{}", 
cubeName, replicaSetID);
+                ReplicaSet rs = 
getCoordinator().getStreamMetadataStore().getReplicaSet(replicaSetID);
+                List<Node> failedNodes = makeCubeImmutableInReplicaSet(rs, 
cubeName);
+                failedConvertToImmutableNodes.addAll(failedNodes);
+            }
+            if (!failedConvertToImmutableNodes.isEmpty()) {
+                throw new IOException("Failed to convert to immutable state. 
");
+            }
+
+            logger.info("Finish cube reassign for cube:{} .", cubeName);
+        } catch (IOException e) {
+            logger.error("Fail to start consumers for cube:" + cubeName, e);
+            // roll back the replica sets that were successfully started
+            Set<Integer> rollbackStarted = 
successStarted.stream().map(ReplicaSet::getReplicaSetID)
+                    .collect(Collectors.toSet());
+            for (ReplicaSet rs : successStarted) {
+                try {
+                    StopConsumersRequest stopRequest = new 
StopConsumersRequest();
+                    stopRequest.setCube(cubeName);
+                    // for new group assignment, need to stop the consumers 
and remove the cube data
+                    if (newAssign.containsKey(rs.getReplicaSetID())) {
+                        stopRequest.setRemoveData(true);
+                    }
+                    stopConsumersInReplicaSet(rs, stopRequest);
+                    rollbackStarted.remove(rs.getReplicaSetID());
+                } catch (IOException e1) {
+                    logger.error("fail to stop consumers for cube:" + cubeName 
+ " replicaSet:" + rs.getReplicaSetID(),
+                            e1);
+                }
+            }
+
+            // roll back the replica sets that were successfully assigned
+            Set<Integer> rollbackAssigned = 
successAssigned.stream().map(ReplicaSet::getReplicaSetID)
+                    .collect(Collectors.toSet());
+            for (ReplicaSet rs : successAssigned) {
+                try {
+                    List<Partition> partitions = 
preAssignments.getPartitionsByReplicaSetID(rs.getReplicaSetID());
+                    assignCubeToReplicaSet(rs, cubeName, partitions, true, 
true);
+                    rollbackAssigned.remove(rs.getReplicaSetID());
+                } catch (IOException e1) {
+                    logger.error("fail to start consumers for cube:" + 
cubeName + " replicaSet:" + rs.getReplicaSetID(),
+                            e1);
+                }
+            }
+
+            Set<Node> failedReceiver = new 
HashSet<>(failedConvertToImmutableNodes);
+            for (Node node : failedConvertToImmutableNodes) {
+                try {
+                    getCoordinator().makeCubeImmutableForReceiver(node, 
cubeName);
+                    failedReceiver.remove(node);
+                } catch (IOException ioe) {
+                    logger.error("fail to make cube immutable for cube:" + 
cubeName + " to " + node, ioe);
+                }
+            }
+
+            // summary after reassign action
+            StringBuilder str = new StringBuilder();
+            try {
+                
str.append("FailStarted:").append(JsonUtil.writeValueAsString(rollbackStarted)).append(";");
+                
str.append("FailAssigned:").append(JsonUtil.writeValueAsString(rollbackAssigned)).append(";");
+                
str.append("FailRemotedPresisted:").append(JsonUtil.writeValueAsString(failedReceiver));
+            } catch (JsonProcessingException jpe) {
+                logger.error("", jpe);
+            }
+
+            String failedInfo = str.toString();
+
+            if (!rollbackStarted.isEmpty()) {
+                throw new ClusterStateException(cubeName, 
ClusterStateException.ClusterState.ROLLBACK_FAILED,
+                        ClusterStateException.TransactionStep.START_NEW, 
failedInfo, e);
+            } else if (!rollbackAssigned.isEmpty()) {
+                throw new ClusterStateException(cubeName, 
ClusterStateException.ClusterState.ROLLBACK_FAILED,
+                        ClusterStateException.TransactionStep.ASSIGN_NEW, 
failedInfo, e);
+            } else if (!failedReceiver.isEmpty()) {
+                throw new ClusterStateException(cubeName, 
ClusterStateException.ClusterState.ROLLBACK_FAILED,
+                        ClusterStateException.TransactionStep.MAKE_IMMUTABLE, 
failedInfo, e);
+            } else {
+                throw new ClusterStateException(cubeName, 
ClusterStateException.ClusterState.ROLLBACK_SUCCESS,
+                        ClusterStateException.TransactionStep.ASSIGN_NEW, 
failedInfo, e);
+            }
+        }
+
+    }
+
+    // 
=========================================================================================
+    // ======================= Operation related to single replica set   
=======================
+
+    /**
+     * Sync/align consumers in the replica set so that all consumers in the group consume to the same position.
+     *
+     * @return the consume position to which all receivers have aligned
+     */
+    @NotAtomicAndNotIdempotent
+    ISourcePosition syncAndStopConsumersInRs(IStreamingSource streamingSource, 
String cubeName, ReplicaSet replicaSet)
+            throws IOException {
+        if (replicaSet.getNodes().size() > 1) { // when the replica set has more than one node, force a sync of the group
+            logger.info("sync consume for cube:{}, replicaSet:{}", cubeName, 
replicaSet.getReplicaSetID());
+
+            PauseConsumersRequest suspendRequest = new PauseConsumersRequest();
+            suspendRequest.setCube(cubeName);
+            List<ConsumerStatsResponse> allReceiverConsumeState = 
pauseConsumersInReplicaSet(replicaSet,
+                    suspendRequest);
+
+            List<ISourcePosition> consumePositionList = 
Lists.transform(allReceiverConsumeState,
+                    new Function<ConsumerStatsResponse, ISourcePosition>() {
+                        @Nullable
+                        @Override
+                        public ISourcePosition apply(@Nullable 
ConsumerStatsResponse input) {
+                            return 
streamingSource.getSourcePositionHandler().parsePosition(input.getConsumePosition());
+                        }
+                    });
+            ISourcePosition consumePosition = 
streamingSource.getSourcePositionHandler()
+                    .mergePositions(consumePositionList, 
ISourcePositionHandler.MergeStrategy.KEEP_LARGE);
+            ResumeConsumerRequest resumeRequest = new ResumeConsumerRequest();
+            resumeRequest.setCube(cubeName);
+            resumeRequest
+                    
.setResumeToPosition(streamingSource.getSourcePositionHandler().serializePosition(consumePosition));
+
+            // assume that the resume will always succeed when the replica set 
can be paused successfully
+            resumeConsumersInReplicaSet(replicaSet, resumeRequest);
+            return consumePosition;
+        } else if (replicaSet.getNodes().size() == 1) {
+            Node receiver = replicaSet.getNodes().iterator().next();
+            StopConsumersRequest request = new StopConsumersRequest();
+            request.setCube(cubeName);
+            logger.info("stop consumers for cube:{}, receiver:{}", cubeName, 
receiver);
+            List<ConsumerStatsResponse> stopResponse = 
stopConsumersInReplicaSet(replicaSet, request);
+            return 
streamingSource.getSourcePositionHandler().parsePosition(stopResponse.get(0).getConsumePosition());
+        } else {
+            return null;
+        }
+    }
+
+    @NotAtomicAndNotIdempotent
+    void startConsumersInReplicaSet(ReplicaSet rs, final StartConsumersRequest 
request) throws IOException {
+        for (final Node node : rs.getNodes()) {
+            getCoordinator().startConsumersForReceiver(node, request);
+        }
+    }
+
+    @NotAtomicAndNotIdempotent
+    List<Node> makeCubeImmutableInReplicaSet(ReplicaSet rs, String cubeName) {
+        List<Node> failedNodes = new ArrayList<>();
+        for (final Node node : rs.getNodes()) {
+            try {
+                getCoordinator().makeCubeImmutableForReceiver(node, cubeName);
+            } catch (IOException ioe) {
+                logger.error(String.format(Locale.ROOT, "Convert %s to 
immutable for node %s failed.", cubeName,
+                        node.toNormalizeString()), ioe);
+                failedNodes.add(node);
+            }
+        }
+        return failedNodes;
+    }
+
+    @NotAtomicAndNotIdempotent
+    List<ConsumerStatsResponse> resumeConsumersInReplicaSet(ReplicaSet rs, 
final ResumeConsumerRequest request)
+            throws IOException {
+        List<ConsumerStatsResponse> consumerStats = Lists.newArrayList();
+        for (final Node node : rs.getNodes()) {
+            
consumerStats.add(getCoordinator().resumeConsumersForReceiver(node, request));
+        }
+        return consumerStats;
+    }
+
+    @NotAtomicIdempotent
+    List<ConsumerStatsResponse> stopConsumersInReplicaSet(ReplicaSet rs, final 
StopConsumersRequest request)
+            throws IOException {
+        List<ConsumerStatsResponse> consumerStats = Lists.newArrayList();
+        for (final Node node : rs.getNodes()) {
+            consumerStats.add(getCoordinator().stopConsumersForReceiver(node, 
request));
+        }
+        return consumerStats;
+    }
+
+    @NotAtomicIdempotent
+    List<ConsumerStatsResponse> pauseConsumersInReplicaSet(ReplicaSet rs, 
final PauseConsumersRequest request)
+            throws IOException {
+        List<ConsumerStatsResponse> consumerStats = Lists.newArrayList();
+        List<Node> successReceivers = Lists.newArrayList();
+        try {
+            for (final Node node : rs.getNodes()) {
+                
consumerStats.add(getCoordinator().pauseConsumersForReceiver(node, request));
+                successReceivers.add(node);
+            }
+        } catch (IOException ioe) {
+            logger.info("Roll back pause consumers for receivers: {}", 
successReceivers);
+            ResumeConsumerRequest resumeRequest = new ResumeConsumerRequest();
+            resumeRequest.setCube(request.getCube());
+            for (Node receiver : successReceivers) {
+                getCoordinator().resumeConsumersForReceiver(receiver, 
resumeRequest);
+            }
+            throw ioe;
+        }
+        return consumerStats;
+    }
+
+    /**
+     * Assign the consumption task for specific topic partitions to the current replica set.
+     *
+     * @param partitions topic partitions which the replica set should consume
+     * @param startConsumer whether the receivers should start consumption at once
+     * @param mustAllSucceed if true, ensure that all receivers have been correctly notified; if false,
+     *                       ensure that at least one receiver has been correctly notified
+     *
+     * @throws IOException thrown when the assign action failed
+     */
+    @NotAtomicIdempotent
+    void assignCubeToReplicaSet(ReplicaSet rs, String cubeName, 
List<Partition> partitions, boolean startConsumer,
+            boolean mustAllSucceed) throws IOException {
+        boolean hasNodeAssigned = false;
+        IOException exception = null;
+        AssignRequest assignRequest = new AssignRequest();
+        assignRequest.setCubeName(cubeName);
+        assignRequest.setPartitions(partitions);
+        assignRequest.setStartConsumers(startConsumer);
+        for (final Node node : rs.getNodes()) {
+            try {
+                getCoordinator().assignToReceiver(node, assignRequest);
+                hasNodeAssigned = true;
+            } catch (IOException e) {
+                if (mustAllSucceed) {
+                    throw e;
+                }
+                exception = e;
+                logger.error("Failed to start consumers of cube:" + cubeName + " for node:" + node.toString(), e);
+            }
+        }
+        if (!hasNodeAssigned && exception != null) {
+            throw exception;
+        }
+    }
+
+    /**
+     * When a segment build job succeeds, some follow-up work is needed to deliver the segment to the historical part.
+     *
+     * @return true if the promotion succeeded, otherwise false
+     */
+    @NotAtomicIdempotent
+    protected boolean segmentBuildComplete(CubingJob cubingJob, CubeInstance 
cubeInstance, CubeSegment cubeSegment,
+            SegmentJobBuildInfo segmentBuildInfo) throws IOException {
+        String cubeName = segmentBuildInfo.cubeName;
+        String segmentName = segmentBuildInfo.segmentName;
+
+        // Step 1. check whether the segment is ready to be promoted into an HBase ready segment, and promote it
+        if (!checkPreviousSegmentReady(cubeSegment)) {
+            logger.warn("Segment:{}'s previous segment is not ready, will not 
set the segment to ready.", cubeSegment);
+            return false;
+        }
+
+        if (!SegmentStatusEnum.READY.equals(cubeSegment.getStatus())) {
+            promoteNewSegment(cubingJob, cubeInstance, cubeSegment);
+            logger.debug("Promote {} succeed.", segmentName);
+        }
+
+        // Step 2. delete local segment files on the receiver side because they are no longer needed
+        CubeAssignment assignments = 
getCoordinator().getStreamMetadataStore().getAssignmentsByCube(cubeName);
+        for (int replicaSetID : assignments.getReplicaSetIDs()) {
+
+            // Step 2.1 normal case
+            ReplicaSet rs = 
getCoordinator().getStreamMetadataStore().getReplicaSet(replicaSetID);
+            for (Node node : rs.getNodes()) {
+                try {
+                    getCoordinator().notifyReceiverBuildSuccess(node, 
cubeName, segmentName);
+                } catch (IOException e) {
+                    logger.error("error when remove cube segment for 
receiver:" + node, e);
+                }
+            }
+
+            // Step 2.2 special case
+            // A replica set that has no partitions should be the "Removed Rs" of the latest reassign action.
+            // We check whether any local segment belonging to the current cube still exists; if nothing is left,
+            // the "Removed Rs" will be removed from the assignment in StreamMetadata.
+            if 
(assignments.getPartitionsByReplicaSetID(replicaSetID).isEmpty()) {
+                logger.info(
+                        "No partition is assigned to the replicaSet:{}, check whether there are local segments on the rs.",
+                        replicaSetID);
+                Node leader = rs.getLeader();
+                try {
+                    ReceiverCubeStats receiverCubeStats = 
getCoordinator().getReceiverAdminClient()
+                            .getReceiverCubeStats(leader, cubeName);
+                    Set<String> segments = 
receiverCubeStats.getSegmentStatsMap().keySet();
+                    if (segments.isEmpty()) {
+                        logger.info("no local segments exist for 
replicaSet:{}, cube:{}, update assignments.",
+                                replicaSetID, cubeName);
+                        assignments.removeAssignment(replicaSetID);
+                        
getCoordinator().getStreamMetadataStore().saveNewCubeAssignment(assignments);
+                    }
+                } catch (IOException e) {
+                    logger.error("error when get receiver cube stats from:" + 
leader, e);
+                }
+            }
+        }
+
+        // Step 3. remove entry in StreamMetadata
+        
getCoordinator().getStreamMetadataStore().removeSegmentBuildState(cubeName, 
segmentName);
+
+        // Step 4. delete the columnar segment cache in HDFS because it is no longer needed
+        logger.info("Try to remove the hdfs files for cube:{} segment:{}", 
cubeName, segmentName);
+        removeHDFSFiles(cubeName, segmentName);
+        return true;
+    }
+
+    /**
+     * Promote a segment from the realtime part into the historical part.
+     */
+    void promoteNewSegment(CubingJob cubingJob, CubeInstance cubeInstance, 
CubeSegment cubeSegment) throws IOException {
+        logger.debug("Try to transfer the state of segment {} to ready.", cubeSegment.getName());
+        long sourceCount = cubingJob.findSourceRecordCount();
+        long sourceSizeBytes = cubingJob.findSourceSizeBytes();
+        long cubeSizeBytes = cubingJob.findCubeSizeBytes();
+        Map<Integer, String> sourceCheckpoint = 
getCoordinator().getStreamMetadataStore()
+                .getSourceCheckpoint(cubeInstance.getName(), 
cubeSegment.getName());
+
+        ISourcePositionHandler positionOperator = 
StreamingSourceFactory.getStreamingSource(cubeInstance)
+                .getSourcePositionHandler();
+        Collection<ISourcePosition> sourcePositions = 
Collections2.transform(sourceCheckpoint.values(),
+                new Function<String, ISourcePosition>() {
+                    @Nullable
+                    @Override
+                    public ISourcePosition apply(@Nullable String input) {
+                        return positionOperator.parsePosition(input);
+                    }
+                });
+        ISourcePosition sourcePosition = 
positionOperator.mergePositions(sourcePositions,
+                ISourcePositionHandler.MergeStrategy.KEEP_SMALL);
+        cubeSegment.setLastBuildJobID(cubingJob.getId());
+        cubeSegment.setLastBuildTime(System.currentTimeMillis());
+        cubeSegment.setSizeKB(cubeSizeBytes / 1024);
+        cubeSegment.setInputRecords(sourceCount);
+        cubeSegment.setInputRecordsSize(sourceSizeBytes);
+        
cubeSegment.setStreamSourceCheckpoint(positionOperator.serializePosition(sourcePosition));
+        
getCoordinator().getCubeManager().promoteNewlyBuiltSegments(cubeInstance, 
cubeSegment);
+    }
+
+    /**
+     * <pre>
+     *  We promote segments to HBase Ready Segments (the historical part) in a sequential way. So here we check
+     *  whether the latest hbase segment of the current cube meets two requirements:
+     *     - it is in ready state
+     *     - it connects to the current segment exactly (no gap or overlap)
+     *  If both requirements are met, the current segment can be promoted to an HBase Ready Segment.
+     *  </pre>
+     */
+    boolean checkPreviousSegmentReady(CubeSegment currSegment) {
+        long currSegmentStart = currSegment.getTSRange().start.v;
+        CubeInstance cubeInstance = currSegment.getCubeInstance();
+        Segments<CubeSegment> segments = cubeInstance.getSegments();
+        long previousSegmentEnd = -1;
+        for (CubeSegment segment : segments) {
+            long segmentEnd = segment.getTSRange().end.v;
+            if (segmentEnd <= currSegmentStart && segmentEnd > 
previousSegmentEnd) {
+                previousSegmentEnd = segmentEnd;
+            }
+        }
+
+        if (previousSegmentEnd == -1) {
+            return true;
+        }
+
+        for (CubeSegment segment : segments) {
+            long segmentEnd = segment.getTSRange().end.v;
+            if (segmentEnd == previousSegmentEnd && 
SegmentStatusEnum.READY.equals(segment.getStatus())) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private void removeHDFSFiles(String cubeName, String segmentName) {
+        String segmentHDFSPath = 
HDFSUtil.getStreamingSegmentFilePath(cubeName, segmentName);
+        try {
+            FileSystem fs = HadoopUtil.getFileSystem(segmentHDFSPath);
+            logger.info("Deleting segment data in HDFS {}", segmentHDFSPath);
+            fs.delete(new Path(segmentHDFSPath), true);
+        } catch (Exception e) {
+            logger.error("Error when removing hdfs file, hdfs path:{}", segmentHDFSPath, e);
+        }
+    }
+}
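
Note: the reassign transaction above reports failures through ClusterStateException, whose getTransactionStep() and getInconsistentPart() accessors appear later in this patch. A minimal, illustrative caller-side sketch of how that exception might be handled; the surrounding variables (clusterManager, logger) are assumptions for the example, not part of this commit:

    try {
        clusterManager.reassignCubeImpl(cubeName, preAssignments, newAssignments);
    } catch (ClusterStateException cse) {
        // getTransactionStep() tells which step failed (START_NEW, ASSIGN_NEW or MAKE_IMMUTABLE);
        // getInconsistentPart() carries the serialized FailStarted/FailAssigned/FailRemotePersisted summary
        logger.error("Reassign of cube {} failed at step {}, details: {}",
                cubeName, cse.getTransactionStep(), cse.getInconsistentPart(), cse);
        // a ROLLBACK_FAILED state suggests the cluster may need ClusterDoctor or manual repair
    }
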
diff --git 
a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/SegmentJobBuildInfo.java
 
b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/SegmentJobBuildInfo.java
new file mode 100644
index 0000000..13f72e6
--- /dev/null
+++ 
b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/SegmentJobBuildInfo.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.stream.coordinator.coordinate;
+
+/**
+ * A piece of information which records a streaming segment build job.
+ */
+public class SegmentJobBuildInfo implements Comparable<SegmentJobBuildInfo> {
+    public final String cubeName;
+    public final String segmentName;
+    public final String jobID;
+    public int retryCnt = 0;
+
+    public SegmentJobBuildInfo(String cubeName, String segmentName, String 
jobID) {
+        this.cubeName = cubeName;
+        this.segmentName = segmentName;
+        this.jobID = jobID;
+    }
+
+    @Override
+    public String toString() {
+        return "SegmentJobBuildInfo{" + "cubeName='" + cubeName + '\'' + ", 
segmentName='" + segmentName + '\''
+                + ", jobID='" + jobID + '\'' + ", retryCnt=" + retryCnt + '}';
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        SegmentJobBuildInfo that = (SegmentJobBuildInfo) o;
+
+        if (cubeName != null ? !cubeName.equals(that.cubeName) : that.cubeName 
!= null)
+            return false;
+        if (segmentName != null ? !segmentName.equals(that.segmentName) : 
that.segmentName != null)
+            return false;
+        return jobID != null ? jobID.equals(that.jobID) : that.jobID == null;
+
+    }
+
+    @Override
+    public int hashCode() {
+        int result = cubeName != null ? cubeName.hashCode() : 0;
+        result = 31 * result + (segmentName != null ? segmentName.hashCode() : 
0);
+        result = 31 * result + (jobID != null ? jobID.hashCode() : 0);
+        return result;
+    }
+
+    @Override
+    public int compareTo(SegmentJobBuildInfo o) {
+        if (!cubeName.equals(o.cubeName)) {
+            return cubeName.compareTo(o.cubeName);
+        }
+        return segmentName.compareTo(o.segmentName);
+    }
+}
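
For reference, SegmentJobBuildInfo sorts by cube name first and then by segment name, so a check list ordered with this class groups jobs per cube. A small usage sketch with made-up values, assuming the usual java.util imports:

    List<SegmentJobBuildInfo> jobs = new ArrayList<>();
    jobs.add(new SegmentJobBuildInfo("cube_b", "20190921000000_20190921010000", "job-2"));
    jobs.add(new SegmentJobBuildInfo("cube_a", "20190921010000_20190921020000", "job-3"));
    jobs.add(new SegmentJobBuildInfo("cube_a", "20190921000000_20190921010000", "job-1"));
    Collections.sort(jobs);
    // resulting order: job-1 (cube_a, earlier segment), job-3 (cube_a), job-2 (cube_b)
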
diff --git 
a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/StreamingCoordinator.java
 
b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/StreamingCoordinator.java
new file mode 100644
index 0000000..875a4d7
--- /dev/null
+++ 
b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/StreamingCoordinator.java
@@ -0,0 +1,643 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.stream.coordinator.coordinate;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.leader.LeaderSelector;
+import 
org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.common.util.ServerMode;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.job.execution.ExecutableManager;
+import org.apache.kylin.metadata.realization.RealizationStatusEnum;
+import org.apache.kylin.stream.coordinator.StreamMetadataStore;
+import org.apache.kylin.stream.coordinator.StreamMetadataStoreFactory;
+import org.apache.kylin.stream.coordinator.StreamingCubeInfo;
+import org.apache.kylin.stream.coordinator.StreamingUtils;
+import org.apache.kylin.stream.coordinator.assign.Assigner;
+import org.apache.kylin.stream.coordinator.assign.AssignmentUtil;
+import org.apache.kylin.stream.coordinator.assign.AssignmentsCache;
+import 
org.apache.kylin.stream.coordinator.assign.CubePartitionRoundRobinAssigner;
+import org.apache.kylin.stream.coordinator.assign.DefaultAssigner;
+import org.apache.kylin.stream.coordinator.client.CoordinatorClient;
+import 
org.apache.kylin.stream.coordinator.coordinate.annotations.NotAtomicAndNotIdempotent;
+import 
org.apache.kylin.stream.coordinator.coordinate.annotations.NotAtomicIdempotent;
+import org.apache.kylin.stream.coordinator.doctor.ClusterDoctor;
+import org.apache.kylin.stream.coordinator.exception.CoordinateException;
+import 
org.apache.kylin.stream.coordinator.exception.NotLeadCoordinatorException;
+import org.apache.kylin.stream.coordinator.exception.StoreException;
+import org.apache.kylin.stream.core.client.HttpReceiverAdminClient;
+import org.apache.kylin.stream.core.client.ReceiverAdminClient;
+import org.apache.kylin.stream.core.model.AssignRequest;
+import org.apache.kylin.stream.core.model.ConsumerStatsResponse;
+import org.apache.kylin.stream.core.model.CubeAssignment;
+import org.apache.kylin.stream.core.model.Node;
+import org.apache.kylin.stream.core.model.PauseConsumersRequest;
+import org.apache.kylin.stream.core.model.ReplicaSet;
+import org.apache.kylin.stream.core.model.ResumeConsumerRequest;
+import org.apache.kylin.stream.core.model.StartConsumersRequest;
+import org.apache.kylin.stream.core.model.StopConsumersRequest;
+import org.apache.kylin.stream.core.model.StreamingCubeConsumeState;
+import org.apache.kylin.stream.core.model.UnAssignRequest;
+import org.apache.kylin.stream.core.source.IStreamingSource;
+import org.apache.kylin.stream.core.source.Partition;
+import org.apache.kylin.stream.core.source.StreamingSourceFactory;
+import org.apache.kylin.stream.core.source.StreamingTableSourceInfo;
+import org.apache.kylin.stream.core.util.HDFSUtil;
+import org.apache.kylin.stream.core.util.NamedThreadFactory;
+import org.apache.kylin.stream.core.util.NodeUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+/**
+ * <pre>
+ * Each Kylin streaming cluster has at least one coordinator process. The coordinator
+ * server works as the master node of the streaming cluster and handles consume task assignment,
+ * membership and streaming cube state management.
+ *
+ * When the cluster has several coordinator processes, only the leader answers coordinator clients'
+ * requests; the other followers stay in standby, so the single point of failure is eliminated.
+ * </pre>
+ */
+public class StreamingCoordinator implements CoordinatorClient {
+    private static final Logger logger = 
LoggerFactory.getLogger(StreamingCoordinator.class);
+    private static final int DEFAULT_PORT = 7070;
+    private static volatile StreamingCoordinator instance = null;
+
+    private StreamMetadataStore streamMetadataStore;
+    private Assigner assigner;
+    private ReceiverAdminClient receiverAdminClient;
+    private CuratorFramework zkClient;
+    private CoordinatorLeaderSelector selector;
+    private ReceiverClusterManager clusterManager;
+    private volatile boolean isLead = false;
+
+    private ScheduledExecutorService streamingJobSubmitExecutor;
+    private ScheduledExecutorService clusterStateCheckExecutor;
+    private ClusterDoctor clusterDoctor;
+    private BuildJobSubmitter buildJobSubmitter;
+
+    private StreamingCoordinator() {
+        this.streamMetadataStore = 
StreamMetadataStoreFactory.getStreamMetaDataStore();
+        clusterManager = new ReceiverClusterManager(this);
+        this.receiverAdminClient = new HttpReceiverAdminClient();
+        this.assigner = getAssigner();
+        this.zkClient = StreamingUtils.getZookeeperClient();
+        this.selector = new CoordinatorLeaderSelector();
+        this.buildJobSubmitter = new BuildJobSubmitter(this);
+        this.clusterDoctor = new ClusterDoctor();
+        if (ServerMode.SERVER_MODE.canServeStreamingCoordinator()) {
+            this.streamingJobSubmitExecutor = 
Executors.newScheduledThreadPool(1,
+                    new NamedThreadFactory("streaming_job_submitter"));
+            this.clusterStateCheckExecutor = 
Executors.newScheduledThreadPool(1,
+                    new NamedThreadFactory("cluster_state_checker"));
+            start();
+        }
+    }
+
+    public static StreamingCoordinator getInstance() {
+        if (instance == null) {
+            synchronized (StreamingCoordinator.class) {
+                if (instance == null) {
+                    instance = new StreamingCoordinator();
+                }
+            }
+        }
+        return instance;
+    }
+
+    private void start() {
+        selector.start();
+        streamingJobSubmitExecutor.scheduleAtFixedRate(buildJobSubmitter, 0, 
2, TimeUnit.MINUTES);
+        clusterStateCheckExecutor.scheduleAtFixedRate(clusterDoctor, 5, 10, 
TimeUnit.MINUTES);
+    }
+
+
+    /**
+     * Assign the streaming cube to replica sets. The replica sets are calculated by the Assigner.
+     *
+     * @throws CoordinateException when the assign action failed
+     */
+    @Override
+    @NotAtomicIdempotent
+    public synchronized void assignCube(String cubeName) {
+        checkLead();
+        streamMetadataStore.addStreamingCube(cubeName);
+        StreamingCubeInfo cube = getStreamCubeInfo(cubeName);
+        CubeAssignment existAssignment = 
streamMetadataStore.getAssignmentsByCube(cube.getCubeName());
+        if (existAssignment != null) {
+            logger.warn("Cube {} is already assigned.", cube.getCubeName());
+            return;
+        }
+        List<ReplicaSet> replicaSets = streamMetadataStore.getReplicaSets();
+        if (replicaSets == null || replicaSets.isEmpty()) {
+            throw new IllegalStateException("No replicaSet is configured in 
system");
+        }
+        CubeAssignment assignment = assigner.assign(cube, replicaSets, 
streamMetadataStore.getAllCubeAssignments());
+        doAssignCube(cubeName, assignment);
+    }
+
+    /**
+     * The unassign action will remove data which does not belong to the historical part
+     * and stop consumption on all assigned receivers.
+     *
+     * @throws CoordinateException when the unassign action failed
+     */
+    @Override
+    @NotAtomicIdempotent
+    public void unAssignCube(String cubeName) {
+        checkLead();
+        CubeAssignment assignment = 
streamMetadataStore.getAssignmentsByCube(cubeName);
+        if (assignment == null) {
+            return;
+        }
+        List<Node> unAssignedFailReceivers = Lists.newArrayList();
+        try {
+            logger.info("Send unAssign cube:{} request to receivers", 
cubeName);
+            for (Integer replicaSetID : assignment.getReplicaSetIDs()) {
+                ReplicaSet rs = 
streamMetadataStore.getReplicaSet(replicaSetID);
+                UnAssignRequest request = new UnAssignRequest();
+                request.setCube(cubeName);
+                for (Node receiver : rs.getNodes()) {
+                    try {
+                        unAssignToReceiver(receiver, request);
+                    } catch (Exception e) {
+                        logger.error("Exception thrown when unassigning receiver", e);
+                        unAssignedFailReceivers.add(receiver);
+                    }
+                }
+            }
+            logger.debug("Remove temp hdfs files for {}", cubeName);
+            removeCubeHDFSFiles(cubeName);
+            logger.debug("Clear cube info from job check list");
+            buildJobSubmitter.clearCheckList(cubeName);
+            logger.debug("Commit unassign {} transaction.", cubeName);
+            streamMetadataStore.removeStreamingCube(cubeName);
+            AssignmentsCache.getInstance().clearCubeCache(cubeName);
+        } catch (Exception e) {
+            throw new CoordinateException(e);
+        }
+        if (!unAssignedFailReceivers.isEmpty()) {
+            String msg = "unAssign failed for receivers:" + String.join(",",
+                    
unAssignedFailReceivers.stream().map(Node::toString).collect(Collectors.toList()));
+            throw new CoordinateException(msg);
+        }
+    }
+
+    /**
+     * Change the assignment of the cube to the given assignments.
+     */
+    @Override
+    @NotAtomicAndNotIdempotent
+    public synchronized void reAssignCube(String cubeName, CubeAssignment 
assignments) {
+        checkLead();
+        CubeAssignment preAssignments = 
streamMetadataStore.getAssignmentsByCube(cubeName);
+        if (preAssignments == null) {
+            logger.info("No previous cube assignment exists, use the new assignment:{}", assignments);
+            doAssignCube(cubeName, assignments);
+        } else {
+            clusterManager.reassignCubeImpl(cubeName, preAssignments, 
assignments);
+        }
+    }
+
+    @Override
+    public void segmentRemoteStoreComplete(Node receiver, String cubeName, 
Pair<Long, Long> segment) {
+        checkLead();
+        logger.info("Segment remote store complete signal received for 
cube:{}, segment:{}", cubeName, segment);
+        buildJobSubmitter.addToCheckList(cubeName);
+    }
+
+    @Override
+    @NotAtomicIdempotent
+    public void pauseConsumers(String cubeName) {
+        checkLead();
+        CubeAssignment assignment = 
streamMetadataStore.getAssignmentsByCube(cubeName);
+        PauseConsumersRequest request = new PauseConsumersRequest();
+        request.setCube(cubeName);
+        try {
+            for (Integer rsID : assignment.getReplicaSetIDs()) {
+                ReplicaSet rs = streamMetadataStore.getReplicaSet(rsID);
+                clusterManager.pauseConsumersInReplicaSet(rs, request);
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        logger.debug("Committing pauseConsumers {} transaction.", cubeName);
+        streamMetadataStore.saveStreamingCubeConsumeState(cubeName, 
StreamingCubeConsumeState.PAUSED);
+        logger.debug("Committed pauseConsumers {} transaction.", cubeName);
+
+    }
+
+    @Override
+    @NotAtomicIdempotent
+    public void resumeConsumers(String cubeName) {
+        checkLead();
+        CubeAssignment assignment = 
streamMetadataStore.getAssignmentsByCube(cubeName);
+        ResumeConsumerRequest request = new ResumeConsumerRequest();
+        request.setCube(cubeName);
+        try {
+            for (Integer rsID : assignment.getReplicaSetIDs()) {
+                ReplicaSet rs = streamMetadataStore.getReplicaSet(rsID);
+                clusterManager.resumeConsumersInReplicaSet(rs, request);
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        logger.debug("Committing resumeConsumers {} transaction.", cubeName);
+        streamMetadataStore.saveStreamingCubeConsumeState(cubeName, 
StreamingCubeConsumeState.RUNNING);
+        logger.debug("Committed resumeConsumers {} transaction.", cubeName);
+    }
+
+    @Override
+    public void replicaSetLeaderChange(int replicaSetID, Node newLeader) {
+        checkLead();
+        Map<String, List<Partition>> assignment = 
streamMetadataStore.getAssignmentsByReplicaSet(replicaSetID);
+        if (assignment == null || assignment.isEmpty()) {
+            return;
+        }
+        // clear assign cache for this group
+        for (String cubeName : assignment.keySet()) {
+            AssignmentsCache.getInstance().clearCubeCache(cubeName);
+        }
+    }
+
+    private void doAssignCube(String cubeName, CubeAssignment assignment) {
+        Set<ReplicaSet> successRS = Sets.newHashSet();
+        try {
+            for (Integer rsID : assignment.getReplicaSetIDs()) {
+                ReplicaSet rs = streamMetadataStore.getReplicaSet(rsID);
+                clusterManager.assignCubeToReplicaSet(rs, cubeName, 
assignment.getPartitionsByReplicaSetID(rsID), true,
+                        false);
+                successRS.add(rs);
+            }
+            logger.debug("Committing assign {} transaction.", cubeName);
+            streamMetadataStore.saveNewCubeAssignment(assignment);
+            logger.debug("Committed assign {} transaction.", cubeName);
+        } catch (Exception e) {
+            // roll back the success group assignment
+            for (ReplicaSet rs : successRS) {
+
+                UnAssignRequest request = new UnAssignRequest();
+                request.setCube(cubeName);
+                unAssignFromReplicaSet(rs, request);
+            }
+            throw new CoordinateException(e);
+        }
+    }
+
+    @Override
+    public Map<Integer, Map<String, List<Partition>>> reBalanceRecommend() {
+        checkLead();
+        return reBalancePlan(getEnableStreamingCubes(), 
streamMetadataStore.getReplicaSets());
+    }
+
+    /**
+     * Rebalance the cubes and partitions across replica sets.
+     * @param newAssignmentsPlan Map<ReplicaSetID, Map<CubeName, List<Partition>>>
+     */
+    @Override
+    @NotAtomicAndNotIdempotent
+    public synchronized void reBalance(Map<Integer, Map<String, 
List<Partition>>> newAssignmentsPlan) {
+        checkLead();
+        List<CubeAssignment> currCubeAssignments = 
streamMetadataStore.getAllCubeAssignments();
+        List<CubeAssignment> newCubeAssignments = 
AssignmentUtil.convertReplicaSetAssign2CubeAssign(newAssignmentsPlan);
+        clusterManager.doReBalance(currCubeAssignments, newCubeAssignments);
+    }
+
+    Map<Integer, Map<String, List<Partition>>> 
reBalancePlan(List<StreamingCubeInfo> allCubes,
+            List<ReplicaSet> allReplicaSets) {
+        List<CubeAssignment> currCubeAssignments = 
streamMetadataStore.getAllCubeAssignments();
+        return assigner.reBalancePlan(allReplicaSets, allCubes, 
currCubeAssignments);
+    }
+
+    public synchronized void createReplicaSet(ReplicaSet rs) {
+        int replicaSetID = streamMetadataStore.createReplicaSet(rs);
+        try {
+            for (Node receiver : rs.getNodes()) {
+                addReceiverToReplicaSet(receiver, replicaSetID);
+            }
+        } catch (IOException e) {
+            logger.warn("Create replica set failed.", e);
+        }
+    }
+
+    public synchronized void removeReplicaSet(int rsID) {
+        ReplicaSet rs = streamMetadataStore.getReplicaSet(rsID);
+        if (rs == null) {
+            return;
+        }
+        if (rs.getNodes() != null && !rs.getNodes().isEmpty()) {
+            throw new CoordinateException("Cannot remove rs, because there are 
nodes in it.");
+        }
+        Map<String, List<Partition>> assignment = 
streamMetadataStore.getAssignmentsByReplicaSet(rsID);
+        if (assignment != null && !assignment.isEmpty()) {
+            throw new CoordinateException("Cannot remove rs, because there are 
assignments.");
+        }
+        streamMetadataStore.removeReplicaSet(rsID);
+    }
+
+    public synchronized void addNodeToReplicaSet(Integer replicaSetID, String 
nodeID) {
+        ReplicaSet rs = streamMetadataStore.getReplicaSet(replicaSetID);
+        Node receiver = Node.fromNormalizeString(nodeID);
+        List<ReplicaSet> allReplicaSet = streamMetadataStore.getReplicaSets();
+        for (ReplicaSet other : allReplicaSet) {
+            if (other.getReplicaSetID() != replicaSetID) {
+                if (other.getNodes().contains(receiver)) {
+                    logger.error("Error adding Node {} to replicaSet {}, it already exists in replicaSet {}", nodeID,
+                            replicaSetID, other.getReplicaSetID());
+                    throw new IllegalStateException("Node exists in 
ReplicaSet!");
+                }
+            }
+        }
+        rs.addNode(receiver);
+        streamMetadataStore.updateReplicaSet(rs);
+        try {
+            Map<String, List<Partition>> assignment = 
streamMetadataStore.getAssignmentsByReplicaSet(replicaSetID);
+            if (assignment == null || assignment.isEmpty()) {
+                return;
+            }
+            addReceiverToReplicaSet(receiver, replicaSetID);
+            // clear assign cache for this group
+            for (String cubeName : assignment.keySet()) {
+                AssignmentsCache.getInstance().clearCubeCache(cubeName);
+            }
+        } catch (IOException e) {
+            logger.warn("Failed to add receiver to replicaSet.", e);
+        }
+    }
+
+    public synchronized void removeNodeFromReplicaSet(Integer replicaSetID, 
String nodeID) {
+        ReplicaSet rs = streamMetadataStore.getReplicaSet(replicaSetID);
+        Node receiver = Node.fromNormalizeString(nodeID);
+        rs.removeNode(receiver);
+        streamMetadataStore.updateReplicaSet(rs);
+        try {
+            Map<String, List<Partition>> assignment = 
streamMetadataStore.getAssignmentsByReplicaSet(replicaSetID);
+            removeReceiverFromReplicaSet(receiver);
+            // clear assign cache for this group
+            if (assignment != null) {
+                for (String cubeName : assignment.keySet()) {
+                    AssignmentsCache.getInstance().clearCubeCache(cubeName);
+                }
+            }
+        } catch (IOException e) {
+            logger.warn("Remove node from replicaSet failed.", e);
+        }
+    }
+
+    @SuppressWarnings("unused")
+    public void stopConsumers(String cubeName) {
+        CubeAssignment assignment = 
streamMetadataStore.getAssignmentsByCube(cubeName);
+        StopConsumersRequest request = new StopConsumersRequest();
+        request.setCube(cubeName);
+        try {
+            for (Integer rsID : assignment.getReplicaSetIDs()) {
+                ReplicaSet rs = streamMetadataStore.getReplicaSet(rsID);
+                clusterManager.stopConsumersInReplicaSet(rs, request);
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void unAssignFromReplicaSet(final ReplicaSet rs, final 
UnAssignRequest unAssignRequest) {
+        for (Node receiver : rs.getNodes()) {
+            try {
+                unAssignToReceiver(receiver, unAssignRequest);
+            } catch (IOException e) {
+                logger.error("Error when roll back assignment", e);
+            }
+        }
+    }
+
+    // 
================================================================================
+    // ========================== Receiver related operation 
==========================
+
+    protected void assignToReceiver(final Node receiver, final AssignRequest 
request) throws IOException {
+        receiverAdminClient.assign(receiver, request);
+    }
+
+    private void unAssignToReceiver(final Node receiver, final UnAssignRequest 
request) throws IOException {
+        receiverAdminClient.unAssign(receiver, request);
+    }
+
+    private void addReceiverToReplicaSet(final Node receiver, final int 
replicaSetID) throws IOException {
+        receiverAdminClient.addToReplicaSet(receiver, replicaSetID);
+    }
+
+    private void removeReceiverFromReplicaSet(final Node receiver) throws 
IOException {
+        receiverAdminClient.removeFromReplicaSet(receiver);
+    }
+
+    protected void startConsumersForReceiver(final Node receiver, final 
StartConsumersRequest request)
+            throws IOException {
+        receiverAdminClient.startConsumers(receiver, request);
+    }
+
+    protected ConsumerStatsResponse stopConsumersForReceiver(final Node 
receiver, final StopConsumersRequest request)
+            throws IOException {
+        return receiverAdminClient.stopConsumers(receiver, request);
+    }
+
+    protected ConsumerStatsResponse pauseConsumersForReceiver(final Node 
receiver, final PauseConsumersRequest request)
+            throws IOException {
+        return receiverAdminClient.pauseConsumers(receiver, request);
+    }
+
+    protected ConsumerStatsResponse resumeConsumersForReceiver(final Node 
receiver, final ResumeConsumerRequest request)
+            throws IOException {
+        return receiverAdminClient.resumeConsumers(receiver, request);
+    }
+
+    protected void makeCubeImmutableForReceiver(final Node receiver, final 
String cubeName) throws IOException {
+        receiverAdminClient.makeCubeImmutable(receiver, cubeName);
+    }
+
+    void notifyReceiverBuildSuccess(final Node receiver, final String 
cubeName, final String segmentName)
+            throws IOException {
+        receiverAdminClient.segmentBuildComplete(receiver, cubeName, 
segmentName);
+    }
+
+    // 
================================================================================
+    // ============================ Utility method 
====================================
+
+    public ExecutableManager getExecutableManager() {
+        return ExecutableManager.getInstance(getConfig());
+    }
+
+    public CubeManager getCubeManager() {
+        return CubeManager.getInstance(getConfig());
+    }
+
+    public KylinConfig getConfig() {
+        return KylinConfig.getInstanceFromEnv();
+    }
+
+    List<StreamingCubeInfo> getEnableStreamingCubes() {
+        List<StreamingCubeInfo> allCubes = getStreamingCubes();
+        List<StreamingCubeInfo> result = Lists.newArrayList();
+        for (StreamingCubeInfo cube : allCubes) {
+            CubeInstance cubeInstance = 
getCubeManager().getCube(cube.getCubeName());
+            if (cubeInstance.getStatus() == RealizationStatusEnum.READY) {
+                result.add(cube);
+            }
+        }
+        return result;
+    }
+
+    List<StreamingCubeInfo> getStreamingCubes() {
+        List<String> cubes = streamMetadataStore.getCubes();
+        List<StreamingCubeInfo> result = Lists.newArrayList();
+        for (String cubeName : cubes) {
+            StreamingCubeInfo cubeInfo = getStreamCubeInfo(cubeName);
+            if (cubeInfo != null) {
+                result.add(cubeInfo);
+            }
+        }
+        return result;
+    }
+
+    public StreamingCubeInfo getStreamCubeInfo(String cubeName) {
+        CubeInstance cube = getCubeManager().getCube(cubeName);
+        if (cube == null) {
+            return null;
+        }
+        // count of consumers should be estimated by kylin admin and set in 
cube level
+        int numOfConsumerTasks = 
cube.getConfig().getStreamingCubeConsumerTasksNum();
+        IStreamingSource streamingSource = 
StreamingSourceFactory.getStreamingSource(cube);
+        StreamingTableSourceInfo tableSourceInfo = 
streamingSource.load(cubeName);
+        return new StreamingCubeInfo(cubeName, tableSourceInfo, 
numOfConsumerTasks);
+    }
+
+    public void removeCubeHDFSFiles(String cubeName) {
+        String segmentHDFSPath = HDFSUtil.getStreamingCubeFilePath(cubeName);
+        try {
+            FileSystem fs = HadoopUtil.getFileSystem(segmentHDFSPath);
+            fs.delete(new Path(segmentHDFSPath), true);
+        } catch (Exception e) {
+            logger.error("Error when removing hdfs file, hdfs path:{}", segmentHDFSPath, e);
+        }
+    }
+
+    private void checkLead() {
+        if (!isLead) {
+            Node coordinatorLeader;
+            try {
+                coordinatorLeader = streamMetadataStore.getCoordinatorNode();
+            } catch (StoreException store) {
+                throw new NotLeadCoordinatorException("Lead coordinator can not be found.", store);
+            }
+            throw new NotLeadCoordinatorException(
+                    "Current coordinator is not lead, please check host " + 
coordinatorLeader);
+        }
+    }
+
+    private Assigner getAssigner() {
+        String assignerName = getConfig().getStreamingAssigner();
+        Assigner oneAssigner;
+        logger.debug("Using assigner {}", assignerName);
+        switch (assignerName) {
+        case "DefaultAssigner":
+            oneAssigner = new DefaultAssigner();
+            break;
+        case "CubePartitionRoundRobinAssigner":
+            oneAssigner = new CubePartitionRoundRobinAssigner();
+            break;
+        default:
+            oneAssigner = new DefaultAssigner();
+        }
+        return oneAssigner;
+    }
+
+    public ReceiverClusterManager getClusterManager() {
+        return clusterManager;
+    }
+
+    public boolean isLead() {
+        return isLead;
+    }
+
+    public StreamMetadataStore getStreamMetadataStore() {
+        return streamMetadataStore;
+    }
+
+    public ReceiverAdminClient getReceiverAdminClient() {
+        return receiverAdminClient;
+    }
+
+    private class CoordinatorLeaderSelector extends 
LeaderSelectorListenerAdapter implements Closeable {
+        private LeaderSelector leaderSelector;
+
+        public CoordinatorLeaderSelector() {
+            String path = StreamingUtils.COORDINATOR_LEAD;
+            leaderSelector = new LeaderSelector(zkClient, path, this);
+            leaderSelector.autoRequeue();
+        }
+
+        @Override
+        public void close() {
+            leaderSelector.close();
+        }
+
+        public void start() {
+            leaderSelector.start();
+        }
+
+        @Override
+        public void takeLeadership(CuratorFramework client) throws Exception {
+            logger.info("Current node became the lead coordinator.");
+            
streamMetadataStore.setCoordinatorNode(NodeUtil.getCurrentNode(DEFAULT_PORT));
+            isLead = true;
+            // restore the build job check list after taking leadership
+            buildJobSubmitter.restore();
+            while (true) {
+                try {
+                    Thread.sleep(5 * 60 * 1000);
+                } catch (InterruptedException exception) {
+                    Thread.currentThread().interrupt();
+                    break;
+                }
+                if (!leaderSelector.hasLeadership()) {
+                    break;
+                }
+            }
+            logger.info("Become the follower coordinator.");
+            isLead = false;
+        }
+    }
+}
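
A minimal usage sketch of the coordinator singleton from a serving node; the cube name and logger below are assumptions for illustration. Every mutating call goes through checkLead() first, so on a standby coordinator the call fails fast:

    StreamingCoordinator coordinator = StreamingCoordinator.getInstance();
    try {
        coordinator.assignCube("example_streaming_cube");
    } catch (NotLeadCoordinatorException e) {
        // this node is a follower; the request should be forwarded to the lead coordinator
        logger.warn("Not the lead coordinator, please retry against the lead node.", e);
    }
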
diff --git 
a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/exception/NotLeadCoordinatorException.java
 
b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/annotations/NonSideEffect.java
similarity index 53%
copy from 
stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/exception/NotLeadCoordinatorException.java
copy to 
stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/annotations/NonSideEffect.java
index dfb42e8..0d63ac7 100644
--- 
a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/exception/NotLeadCoordinatorException.java
+++ 
b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/annotations/NonSideEffect.java
@@ -14,27 +14,28 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
+package org.apache.kylin.stream.coordinator.coordinate.annotations;
 
-package org.apache.kylin.stream.coordinator.exception;
-
-import org.apache.kylin.stream.core.exception.StreamingException;
-
-public class NotLeadCoordinatorException extends StreamingException {
-    public NotLeadCoordinatorException() {
-        super();
-    }
-
-    public NotLeadCoordinatorException(String s) {
-        super(s);
-    }
-
-    public NotLeadCoordinatorException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    public NotLeadCoordinatorException(Throwable cause) {
-        super(cause);
-    }
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Inherited;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
 
+/**
+ * <pre>
+ *  This annotation is a marker for developers.
+ *  It indicates that the annotated method may fail in some steps,
+ *  but there is no need to retry because a failure has no bad influence on other parts.
+ * </pre>
+ *
+ * @see NotAtomicAndNotIdempotent
+ */
+@Documented
+@Retention(RetentionPolicy.SOURCE)
+@Target(ElementType.METHOD)
+@Inherited
+public @interface NonSideEffect {
 }
diff --git 
a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/annotations/NotAtomicAndNotIdempotent.java
 
b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/annotations/NotAtomicAndNotIdempotent.java
new file mode 100644
index 0000000..e14d7d2
--- /dev/null
+++ 
b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/annotations/NotAtomicAndNotIdempotent.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.stream.coordinator.coordinate.annotations;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Inherited;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * <pre>
+ *  This annotation is a marker for developers.
+ *  It indicates that the annotated method is a transaction which may break in an intermediate state,
+ *  with no guarantee that the rollback will succeed.
+ *  That is to say, it may break in some situations and then require a manual repair,
+ *  which is a warning sign and not user friendly.
+ *
+ *  For the worst situation, ClusterDoctor should fix the inconsistency as a manual repair.
+ * </pre>
+ *
+ * @see NotAtomicIdempotent
+ * @see org.apache.kylin.stream.coordinator.doctor.ClusterDoctor
+ */
+@Documented
+@Retention(RetentionPolicy.SOURCE)
+@Target(ElementType.METHOD)
+@Inherited
+public @interface NotAtomicAndNotIdempotent {
+}
diff --git 
a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/annotations/NotAtomicIdempotent.java
 
b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/annotations/NotAtomicIdempotent.java
new file mode 100644
index 0000000..224ef70
--- /dev/null
+++ 
b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/annotations/NotAtomicIdempotent.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.stream.coordinator.coordinate.annotations;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Inherited;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * <pre>
+ *  This annotation is a marker for developers.
+ *  It indicates that the annotated method is not atomic and may break at an intermediate step,
+ *  because an exception may be thrown during read/write actions on remote resources,
+ *  leaving the system in a non-consistent state.
+ *  However, the caller can retry it safely to achieve final consistency.
+ * </pre>
+ *
+ * @see NotAtomicAndNotIdempotent
+ */
+@Documented
+@Retention(RetentionPolicy.SOURCE)
+@Target(ElementType.METHOD)
+@Inherited
+public @interface NotAtomicIdempotent {
+}
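
These three markers have SOURCE retention, so they only document transactional behavior for reviewers and have no runtime effect. A hedged sketch of how one of them reads on a method; the method and its per-node call below are invented for illustration, not part of this patch:

    @NotAtomicIdempotent
    void notifyAllReceivers(ReplicaSet rs, String cubeName) throws IOException {
        // may stop halfway if one receiver call throws, but each notification is
        // idempotent, so the caller can simply retry the whole loop
        for (Node node : rs.getNodes()) {
            notifyReceiver(node, cubeName); // hypothetical per-node RPC
        }
    }
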
diff --git 
a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/exception/StoreException.java
 
b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/doctor/ClusterDoctor.java
similarity index 67%
copy from 
stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/exception/StoreException.java
copy to 
stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/doctor/ClusterDoctor.java
index a439511..f06e7ef 100644
--- 
a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/exception/StoreException.java
+++ 
b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/doctor/ClusterDoctor.java
@@ -14,23 +14,20 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
-package org.apache.kylin.stream.coordinator.exception;
+package org.apache.kylin.stream.coordinator.doctor;
 
-public class StoreException extends RuntimeException {
-
-    private static final long serialVersionUID = -9149609663117728575L;
-
-    public StoreException() {
-        super();
-    }
-
-    public StoreException(Throwable t) {
-        super(t);
-    }
+/**
+ * Repair inconsistent cluster state according to the result of ClusterStateChecker.
+ *
+ * @see 
org.apache.kylin.stream.coordinator.coordinate.annotations.NotAtomicAndNotIdempotent
+ * @see ClusterStateChecker
+ */
+public class ClusterDoctor implements Runnable {
 
-    public StoreException(String message, Throwable t) {
-        super(message, t);
+    @Override
+    public void run() {
+        // TO BE IMPLEMENTED
     }
 }
diff --git 
a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/exception/StoreException.java
 
b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/doctor/ClusterStateChecker.java
similarity index 61%
copy from 
stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/exception/StoreException.java
copy to 
stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/doctor/ClusterStateChecker.java
index a439511..2c4fce2 100644
--- 
a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/exception/StoreException.java
+++ 
b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/doctor/ClusterStateChecker.java
@@ -14,23 +14,21 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
-package org.apache.kylin.stream.coordinator.exception;
+package org.apache.kylin.stream.coordinator.doctor;
 
-public class StoreException extends RuntimeException {
-
-    private static final long serialVersionUID = -9149609663117728575L;
-
-    public StoreException() {
-        super();
-    }
-
-    public StoreException(Throwable t) {
-        super(t);
-    }
-
-    public StoreException(String message, Throwable t) {
-        super(message, t);
-    }
+/**
+ * <pre>
+ * Basic steps of this class:
+ *  1. stop the coordinator to avoid underlying concurrency issues
+ *  2. check all receiver clusters for inconsistent state
+ *  3. send a summary to the Kylin admin via mail
+ *  4. if needed, call ClusterDoctor to repair the inconsistencies
+ * </pre>
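+ *
+ * A hypothetical sketch of that flow (this class is still a stub in this patch, and every
+ * name below is illustrative only):
+ * <pre>{@code
+ *   void checkOnce() {
+ *       coordinator.pause();                                // 1. stop coordinator
+ *       List<String> problems = inspectReceiverClusters();  // 2. collect inconsistent state
+ *       mailService.notifyAdmin(summarize(problems));       // 3. mail summary to Kylin admin
+ *       if (!problems.isEmpty()) {
+ *           new ClusterDoctor().run();                      // 4. repair inconsistencies
+ *       }
+ *   }
+ * }</pre>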
+ * @see org.apache.kylin.stream.coordinator.coordinate.annotations.NotAtomicAndNotIdempotent
+ * @see ClusterDoctor
+ */
+public class ClusterStateChecker {
+    // TO BE IMPLEMENTED
 }
diff --git 
a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/exception/ClusterStateException.java
 
b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/exception/ClusterStateException.java
index 40231e9..105344a 100644
--- 
a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/exception/ClusterStateException.java
+++ 
b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/exception/ClusterStateException.java
@@ -18,10 +18,13 @@
 
 package org.apache.kylin.stream.coordinator.exception;
 
+import org.apache.kylin.stream.core.exception.StreamingException;
+
 import java.util.Locale;
 
-public class ClusterStateException extends CoordinateException {
+public class ClusterStateException extends StreamingException {
 
+    @SuppressWarnings("unused")
     private final String cubeName;
     private final ClusterState clusterState;
     private final TransactionStep transactionStep;
@@ -35,6 +38,7 @@ public class ClusterStateException extends 
CoordinateException {
         return transactionStep;
     }
 
+    @SuppressWarnings("unused")
     public String getInconsistentPart() {
         return inconsistentPart;
     }
diff --git 
a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/exception/CoordinateException.java
 
b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/exception/CoordinateException.java
index 415933e..9b28355 100644
--- 
a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/exception/CoordinateException.java
+++ 
b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/exception/CoordinateException.java
@@ -21,6 +21,8 @@ package org.apache.kylin.stream.coordinator.exception;
 import org.apache.kylin.stream.core.exception.StreamingException;
 
 public class CoordinateException extends StreamingException {
+
+    @SuppressWarnings("unused")
     public CoordinateException() {
         super();
     }
@@ -29,6 +31,7 @@ public class CoordinateException extends StreamingException {
         super(s);
     }
 
+    @SuppressWarnings("unused")
     public CoordinateException(String message, Throwable cause) {
         super(message, cause);
     }
diff --git 
a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/exception/NotLeadCoordinatorException.java
 
b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/exception/NotLeadCoordinatorException.java
index dfb42e8..8ba032a 100644
--- 
a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/exception/NotLeadCoordinatorException.java
+++ 
b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/exception/NotLeadCoordinatorException.java
@@ -21,6 +21,7 @@ package org.apache.kylin.stream.coordinator.exception;
 import org.apache.kylin.stream.core.exception.StreamingException;
 
 public class NotLeadCoordinatorException extends StreamingException {
+    @SuppressWarnings("unused")
     public NotLeadCoordinatorException() {
         super();
     }
@@ -33,6 +34,7 @@ public class NotLeadCoordinatorException extends 
StreamingException {
         super(message, cause);
     }
 
+    @SuppressWarnings("unused")
     public NotLeadCoordinatorException(Throwable cause) {
         super(cause);
     }
diff --git 
a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/exception/StoreException.java
 
b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/exception/StoreException.java
index a439511..a38aa91 100644
--- 
a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/exception/StoreException.java
+++ 
b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/exception/StoreException.java
@@ -22,6 +22,7 @@ public class StoreException extends RuntimeException {
 
     private static final long serialVersionUID = -9149609663117728575L;
 
+    @SuppressWarnings("unused")
     public StoreException() {
         super();
     }
@@ -30,6 +31,7 @@ public class StoreException extends RuntimeException {
         super(t);
     }
 
+    @SuppressWarnings("unused")
     public StoreException(String message, Throwable t) {
         super(message, t);
     }
diff --git a/build/conf/kylin-server-log4j.properties 
b/stream-coordinator/src/main/resources/log4j.properties
similarity index 67%
copy from build/conf/kylin-server-log4j.properties
copy to stream-coordinator/src/main/resources/log4j.properties
index aba7001..abea109 100644
--- a/build/conf/kylin-server-log4j.properties
+++ b/stream-coordinator/src/main/resources/log4j.properties
@@ -15,18 +15,10 @@
 # limitations under the License.
 #
 
-
-#define appenders
-log4j.appender.file=org.apache.log4j.RollingFileAppender
-log4j.appender.file.layout=org.apache.log4j.PatternLayout
-log4j.appender.file.File=${catalina.home}/../logs/kylin.log
+log4j.rootLogger=WARN,stdout
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.file.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2}:%L : %m%n
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2}:%L : %m%n
-log4j.appender.file.Append=true
-log4j.appender.file.MaxFileSize=268435456
-log4j.appender.file.MaxBackupIndex=10
-
-#overall config
-log4j.rootLogger=INFO,file
+log4j.logger.org.apache.hadoop=ERROR
 log4j.logger.org.apache.kylin=DEBUG
-log4j.logger.org.springframework=WARN
-log4j.logger.org.springframework.security=INFO
+log4j.logger.org.springframework=WARN
\ No newline at end of file
diff --git 
a/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/coordinate/BuildJobSubmitterTest.java
 
b/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/coordinate/BuildJobSubmitterTest.java
new file mode 100644
index 0000000..95a0f05
--- /dev/null
+++ 
b/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/coordinate/BuildJobSubmitterTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.stream.coordinator.coordinate;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.job.execution.ExecutableManager;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.apache.kylin.metadata.model.SegmentRange;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.Segments;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.hamcrest.core.IsCollectionContaining.hasItem;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class BuildJobSubmitterTest extends StreamingTestBase {
+
+    // shared stub objects
+    CubeManager cubeManager;
+    StreamingCoordinator streamingCoordinator;
+    ReceiverClusterManager clusterManager;
+    ExecutableManager executableManager;
+    KylinConfig config = stubKylinConfig();
+
+    void beforeTestTraceEarliestSegmentBuildJob() {
+        // prepare dependency
+        CubeSegment cubeSegment = stubCubSegment(SegmentStatusEnum.NEW, 100L, 
200L);
+        CubeInstance cubeInstance = stubCubeInstance(cubeSegment);
+
+        cubeManager = stubCubeManager(cubeInstance, false);
+        config = stubKylinConfig();
+
+        Map<String, CubingJob> cubingJobMap = new HashMap<>();
+        cubingJobMap.put(mockBuildJob1, 
stubCubingJob(ExecutableState.SUCCEED));
+        cubingJobMap.put(mockBuildJob2, 
stubCubingJob(ExecutableState.DISCARDED));
+        cubingJobMap.put(mockBuildJob3, stubCubingJob(ExecutableState.ERROR));
+        executableManager = stubExecutableManager(cubingJobMap);
+        streamingCoordinator = stubStreamingCoordinator(config, cubeManager, 
executableManager);
+        clusterManager = stubReceiverClusterManager(streamingCoordinator);
+        
when(streamingCoordinator.getClusterManager()).thenReturn(clusterManager);
+    }
+
+    @Test
+    public void testTraceEarliestSegmentBuildJob() {
+        beforeTestTraceEarliestSegmentBuildJob();
+        BuildJobSubmitter buildJobSubmitter = new 
BuildJobSubmitter(streamingCoordinator);
+        buildJobSubmitter.restore();
+        List<SegmentJobBuildInfo> jobList = 
buildJobSubmitter.traceEarliestSegmentBuildJob();
+        assertEquals(1, jobList.size());
+        assertThat(jobList.stream().map(x -> 
x.jobID).collect(Collectors.toSet()), hasItem(mockBuildJob1));
+        assertEquals(1, buildJobSubmitter.getCubeCheckList().size());
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testTraceEarliestSegmentBuildJob2() throws IOException {
+        beforeTestTraceEarliestSegmentBuildJob();
+        when(clusterManager.segmentBuildComplete(isA(CubingJob.class), 
isA(CubeInstance.class), isA(CubeSegment.class),
+                isA(SegmentJobBuildInfo.class))).thenThrow(IOException.class);
+        BuildJobSubmitter buildJobSubmitter = new 
BuildJobSubmitter(streamingCoordinator);
+        buildJobSubmitter.restore();
+        List<SegmentJobBuildInfo> jobList = 
buildJobSubmitter.traceEarliestSegmentBuildJob();
+        assertEquals(0, jobList.size());
+        assertEquals(0, buildJobSubmitter.getCubeCheckList().size());
+    }
+
+    void prepareTestCheckSegmentBuidJobFromMetadata() {
+        CubeSegment cubeSegment = stubCubSegment(SegmentStatusEnum.NEW, 100L, 
200L);
+        CubeInstance cubeInstance = stubCubeInstance(cubeSegment);
+        config = stubKylinConfig();
+        when(cubeInstance.getConfig()).thenReturn(config);
+
+        cubeManager = stubCubeManager(cubeInstance, false);
+
+        Map<String, CubingJob> cubingJobMap = new HashMap<>();
+        cubingJobMap.put(mockBuildJob1, 
stubCubingJob(ExecutableState.SUCCEED));
+        cubingJobMap.put(mockBuildJob2, 
stubCubingJob(ExecutableState.DISCARDED));
+        cubingJobMap.put(mockBuildJob3, 
stubCubingJob(ExecutableState.DISCARDED));
+        cubingJobMap.put(mockBuildJob4, stubCubingJob(ExecutableState.ERROR));
+
+        executableManager = stubExecutableManager(cubingJobMap);
+        streamingCoordinator = stubStreamingCoordinator(config, cubeManager, 
executableManager);
+        clusterManager = stubReceiverClusterManager(streamingCoordinator);
+        
when(streamingCoordinator.getClusterManager()).thenReturn(clusterManager);
+    }
+
+    @Test
+    public void testCheckSegmentBuidJobFromMetadata() {
+        prepareTestCheckSegmentBuidJobFromMetadata();
+        BuildJobSubmitter buildJobSubmitter = new 
BuildJobSubmitter(streamingCoordinator);
+        buildJobSubmitter.restore();
+        List<String> segmentReadyList = 
buildJobSubmitter.checkSegmentBuidJobFromMetadata(cubeName2);
+        assertEquals(1, segmentReadyList.size());
+
+        segmentReadyList = 
buildJobSubmitter.checkSegmentBuidJobFromMetadata(cubeName3);
+        assertEquals(1, segmentReadyList.size());
+    }
+
+    @Test
+    public void testCheckSegmentBuidJobFromMetadata1() {
+        prepareTestCheckSegmentBuidJobFromMetadata();
+        BuildJobSubmitter buildJobSubmitter = new 
BuildJobSubmitter(streamingCoordinator);
+        buildJobSubmitter.restore();
+
+        List<String> segmentReadyList = 
buildJobSubmitter.checkSegmentBuidJobFromMetadata(cubeName4);
+        verify(executableManager, times(1)).resumeJob(eq(mockBuildJob4));
+        assertEquals(0, segmentReadyList.size());
+    }
+
+    @Test
+    public void testSubmitSegmentBuildJob() throws IOException {
+        CubeSegment cubeSegment1 = stubCubSegment(SegmentStatusEnum.NEW, 100L, 
200L);
+        CubeSegment cubeSegment2 = stubCubSegment(SegmentStatusEnum.NEW, 
1559390400000L, 1559394000000L);
+
+        CubeInstance cubeInstance = stubCubeInstance(cubeSegment1);
+        @SuppressWarnings("unchecked")
+        Iterator<CubeSegment> cubeSegmentIterable = mock(Iterator.class);
+        when(cubeSegmentIterable.hasNext()).thenReturn(true, false);
+        @SuppressWarnings("unchecked")
+        Segments<CubeSegment> segmentSegments = mock(Segments.class, 
RETURNS_DEEP_STUBS);
+        when(cubeSegmentIterable.next()).thenReturn(cubeSegment1, 
cubeSegment2);
+        when(segmentSegments.iterator()).thenReturn(cubeSegmentIterable);
+        when(cubeInstance.getSegments()).thenReturn(segmentSegments);
+
+        config = stubKylinConfig();
+        when(cubeInstance.getConfig()).thenReturn(config);
+
+        cubeManager = stubCubeManager(cubeInstance, false);
+
+        Map<String, CubingJob> cubingJobMap = new HashMap<>();
+        cubingJobMap.put(mockBuildJob1, 
stubCubingJob(ExecutableState.SUCCEED));
+        cubingJobMap.put(mockBuildJob2, 
stubCubingJob(ExecutableState.DISCARDED));
+        cubingJobMap.put(mockBuildJob3, 
stubCubingJob(ExecutableState.DISCARDED));
+        cubingJobMap.put(mockBuildJob4, stubCubingJob(ExecutableState.ERROR));
+
+        executableManager = stubExecutableManager(cubingJobMap);
+        streamingCoordinator = stubStreamingCoordinator(config, cubeManager, 
executableManager);
+        clusterManager = stubReceiverClusterManager(streamingCoordinator);
+        
when(streamingCoordinator.getClusterManager()).thenReturn(clusterManager);
+
+        when(cubeManager.appendSegment(any(CubeInstance.class), 
any(SegmentRange.TSRange.class)))
+                .thenReturn(cubeSegment1, cubeSegment2);
+
+        BuildJobSubmitter buildJobSubmitter = new 
BuildJobSubmitter(streamingCoordinator);
+        buildJobSubmitter = spy(buildJobSubmitter);
+
+        DefaultChainedExecutable cubingJob = 
mock(DefaultChainedExecutable.class);
+        when(cubingJob.getId()).thenReturn(mockBuildJob4);
+        
doReturn(cubingJob).when(buildJobSubmitter).getStreamingCubingJob(any(CubeSegment.class));
+
+        buildJobSubmitter.restore();
+        boolean submitSuccess = 
buildJobSubmitter.submitSegmentBuildJob(cubeName1, segment1);
+        assertTrue(submitSuccess);
+    }
+}
\ No newline at end of file
diff --git 
a/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/coordinate/StreamingTestBase.java
 
b/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/coordinate/StreamingTestBase.java
new file mode 100644
index 0000000..5308d79
--- /dev/null
+++ 
b/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/coordinate/StreamingTestBase.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.stream.coordinator.coordinate;
+
+import com.google.common.collect.Lists;
+import org.apache.curator.test.TestingServer;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.common.util.ZKUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.job.execution.ExecutableManager;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.apache.kylin.metadata.model.SegmentRange;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.Segments;
+import org.apache.kylin.stream.coordinator.StreamMetadataStore;
+import org.apache.kylin.stream.coordinator.StreamMetadataStoreFactory;
+import org.apache.kylin.stream.coordinator.StreamingUtils;
+import org.apache.kylin.stream.coordinator.ZookeeperStreamMetadataStore;
+import org.apache.kylin.stream.core.model.CubeAssignment;
+import org.apache.kylin.stream.core.model.Node;
+import org.apache.kylin.stream.core.model.ReplicaSet;
+import org.apache.kylin.stream.core.model.SegmentBuildState;
+import org.apache.kylin.stream.core.source.Partition;
+import org.apache.kylin.stream.source.kafka.KafkaPosition;
+import org.apache.kylin.stream.source.kafka.KafkaPositionHandler;
+import org.junit.After;
+import org.junit.Assume;
+import org.junit.Before;
+import org.mockito.Matchers;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Helper base class that makes creating stubs for streaming coordinator tests easier.
+ */
+public class StreamingTestBase extends LocalFileMetadataTestCase {
+    private static final Logger logger = 
LoggerFactory.getLogger(StreamingTestBase.class);
+    private static final int port = 12181;
+    private static final int retryTimes = 10;
+    private String connectStr;
+
+    /**
+     * Mock ZooKeeper server for streaming metadata.
+     */
+    TestingServer testingServer;
+
+    /**
+     * Real StreamMetadataStore backed by the fake ZooKeeper server.
+     */
+    StreamMetadataStore metadataStore;
+
+    @Before
+    public void init() {
+        logger.debug("Start zk and prepare meatdata.");
+        staticCreateTestMetadata();
+        int realPort = port;
+        for (int i = 0; i <= retryTimes; i++) {
+            try {
+                testingServer = new TestingServer(realPort, false);
+                testingServer.start();
+            } catch (Exception e) { // possibly because the port is already occupied
+                logger.error("Failed start zookeeper server at " + realPort, 
e);
+                realPort++;
+                continue;
+            }
+            break;
+        }
+        Assume.assumeTrue(realPort - port < retryTimes);
+        connectStr = "localhost:" + realPort;
+        System.setProperty("kylin.env.zookeeper-connect-string", connectStr);
+        metadataStore = StreamMetadataStoreFactory.getZKStreamMetaDataStore();
+        initZookeeperMetadataStore();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        logger.debug("Tear down server.");
+        ZKUtil.cleanZkPath(StreamingUtils.STREAM_ZK_ROOT);
+        ((ZookeeperStreamMetadataStore) metadataStore).close();
+        testingServer.stop();
+        System.clearProperty("kylin.env.zookeeper-connect-string");
+    }
+
+    // 
================================================================================
+    // ================================ Init Metadata 
=================================
+
+    // replica set info
+    ReplicaSet rs1 = new ReplicaSet();
+    ReplicaSet rs2 = new ReplicaSet();
+    ReplicaSet rs3 = new ReplicaSet();
+    ReplicaSet rs4 = new ReplicaSet();
+
+    // receiver info
+    Node n1 = new Node("Node-1", 1000);
+    Node n2 = new Node("Node-2", 1000);
+    Node n3 = new Node("Node-3", 1000);
+    Node n4 = new Node("Node-4", 1000);
+    Node n5 = new Node("Node-5", 1000);
+    Node n6 = new Node("Node-6", 1000);
+    Node n7 = new Node("Node-7", 1000);
+    Node n8 = new Node("Node-8", 1000);
+
+    // cube info
+    String cubeName1 = "MockRealtimeCube_1";
+    String cubeName2 = "MockRealtimeCube_2";
+    String cubeName3 = "MockRealtimeCube_3";
+    String cubeName4 = "MockRealtimeCube_4";
+
+    // cube segment info
+    String segment1 = "20190601120000_20190601130000";
+    String segment2 = "20190601130000_20190601140000";
+
+    // topic partition info
+    Partition p1 = new Partition(1);
+    Partition p2 = new Partition(2);
+    Partition p3 = new Partition(3);
+    Partition p4 = new Partition(4);
+    Partition p5 = new Partition(5);
+    Partition p6 = new Partition(6);
+
+    String mockBuildJob1 = "mock_job_00001";
+    String mockBuildJob2 = "mock_job_00002";
+    String mockBuildJob3 = "mock_job_00003";
+    String mockBuildJob4 = "mock_job_00004";
+
+    private void initZookeeperMetadataStore() {
+
+        // add receiver
+        metadataStore.addReceiver(n1);
+        metadataStore.addReceiver(n2);
+        metadataStore.addReceiver(n3);
+        metadataStore.addReceiver(n4);
+        metadataStore.addReceiver(n5);
+        metadataStore.addReceiver(n6);
+        metadataStore.addReceiver(n7);
+        metadataStore.addReceiver(n8);
+
+        // add replica set and allocate receivers to replica set
+        rs1.addNode(n1);
+        rs1.addNode(n2);
+        rs2.addNode(n3);
+        rs2.addNode(n4);
+        rs3.addNode(n5);
+        rs3.addNode(n6);
+        rs4.addNode(n7);
+        rs4.addNode(n8);
+
+        int rsId;
+        rsId = metadataStore.createReplicaSet(rs1);
+        rs1.setReplicaSetID(rsId);
+
+        rsId = metadataStore.createReplicaSet(rs2);
+        rs2.setReplicaSetID(rsId);
+
+        rsId = metadataStore.createReplicaSet(rs3);
+        rs3.setReplicaSetID(rsId);
+
+        rsId = metadataStore.createReplicaSet(rs4);
+        rs4.setReplicaSetID(rsId);
+
+        createCubeMetadata(cubeName1, mockBuildJob1, true);
+        createCubeMetadata(cubeName2, mockBuildJob2, false);
+        createCubeMetadata(cubeName3, mockBuildJob3, true);
+        createCubeMetadata(cubeName4, mockBuildJob4, true);
+    }
+
+    public void createCubeMetadata(String cubeName, String jobID, boolean 
addSegmentBuildState) {
+
+        // add assignment for cube
+        Map<Integer, List<Partition>> preAssignMap = new HashMap<>();
+        preAssignMap.put(rs1.getReplicaSetID(), Lists.newArrayList(p1, p2));
+        preAssignMap.put(rs2.getReplicaSetID(), Lists.newArrayList(p3, p4));
+        preAssignMap.put(rs3.getReplicaSetID(), Lists.newArrayList(p5, p6));
+        CubeAssignment originalAssignment = new CubeAssignment(cubeName, 
preAssignMap);
+        metadataStore.saveNewCubeAssignment(originalAssignment);
+
+        // add remote checkpoint for segment1
+        // check StreamingServer#sendSegmentsToFullBuild
+        KafkaPositionHandler kafkaPositionHandler = new KafkaPositionHandler();
+        Map<Integer, Long> positionMap = new HashMap<>();
+        positionMap.put(p1.getPartitionId(), 10001L);
+        positionMap.put(p2.getPartitionId(), 10002L);
+
+        String positionStr = kafkaPositionHandler.serializePosition(new 
KafkaPosition(positionMap));
+        metadataStore.saveSourceCheckpoint(cubeName, segment1, 
rs1.getReplicaSetID(), positionStr);
+
+        positionMap.clear();
+        positionMap.put(p3.getPartitionId(), 10003L);
+        positionMap.put(p4.getPartitionId(), 10004L);
+        positionStr = kafkaPositionHandler.serializePosition(new 
KafkaPosition(positionMap));
+        metadataStore.saveSourceCheckpoint(cubeName, segment1, 
rs2.getReplicaSetID(), positionStr);
+
+        positionMap.clear();
+        positionMap.put(p5.getPartitionId(), 10005L);
+        positionMap.put(p6.getPartitionId(), 10006L);
+        positionStr = kafkaPositionHandler.serializePosition(new 
KafkaPosition(positionMap));
+        metadataStore.saveSourceCheckpoint(cubeName, segment1, 
rs3.getReplicaSetID(), positionStr);
+
+        positionMap.clear();
+        positionMap.put(p1.getPartitionId(), 20001L);
+        positionMap.put(p2.getPartitionId(), 20002L);
+        positionStr = kafkaPositionHandler.serializePosition(new 
KafkaPosition(positionMap));
+        metadataStore.saveSourceCheckpoint(cubeName, segment2, 
rs1.getReplicaSetID(), positionStr);
+
+        // notify that each replica set has uploaded its data
+        metadataStore.addCompleteReplicaSetForSegmentBuild(cubeName, segment1, 
rs1.getReplicaSetID());
+        metadataStore.addCompleteReplicaSetForSegmentBuild(cubeName, segment1, 
rs2.getReplicaSetID());
+        metadataStore.addCompleteReplicaSetForSegmentBuild(cubeName, segment1, 
rs3.getReplicaSetID());
+        metadataStore.addCompleteReplicaSetForSegmentBuild(cubeName, segment2, 
rs1.getReplicaSetID());
+
+        if (addSegmentBuildState) {
+            // update segment build job info
+            SegmentBuildState.BuildState buildState = new 
SegmentBuildState.BuildState();
+            buildState.setBuildStartTime(System.currentTimeMillis() - 30 * 60 * 1000); // submitted 30 minutes ago
+            buildState.setJobId(jobID);
+            buildState.setState(SegmentBuildState.BuildState.State.BUILDING);
+            metadataStore.updateSegmentBuildState(cubeName, segment1, 
buildState);
+        }
+    }
+    // 
================================================================================
+    // ============================= Prepare stub object 
==============================
+
+    CubingJob stubCubingJob(ExecutableState state) {
+        CubingJob job = mock(CubingJob.class);
+        when(job.getStatus()).thenReturn(state);
+        return job;
+    }
+
+    ReceiverClusterManager stubReceiverClusterManager(StreamingCoordinator 
coordinator) {
+        ReceiverClusterManager clusterManager = 
mock(ReceiverClusterManager.class);
+        when(clusterManager.getCoordinator()).thenReturn(coordinator);
+        return clusterManager;
+        // return new ReceiverClusterManager(coordinator);
+    }
+
+    KylinConfig stubKylinConfig() {
+        KylinConfig kylinConfig = mock(KylinConfig.class);
+        when(kylinConfig.getMaxBuildingSegments()).thenReturn(10);
+        when(kylinConfig.getStreamingAssigner()).thenReturn("DefaultAssigner");
+
+        // ZK part
+        when(kylinConfig.getZKBaseSleepTimeMs()).thenReturn(5000);
+        when(kylinConfig.getZKMaxRetries()).thenReturn(3);
+        when(kylinConfig.getZookeeperConnectString()).thenReturn(connectStr);
+        return kylinConfig;
+    }
+
+    StreamingCoordinator stubStreamingCoordinator(KylinConfig config, 
CubeManager cubeManager,
+            ExecutableManager executableManager) {
+        StreamingCoordinator coordinator = mock(StreamingCoordinator.class);
+        when(coordinator.getConfig()).thenReturn(config);
+        when(coordinator.getCubeManager()).thenReturn(cubeManager);
+        when(coordinator.getExecutableManager()).thenReturn(executableManager);
+        when(coordinator.getStreamMetadataStore()).thenReturn(metadataStore);
+        return coordinator;
+    }
+
+    ExecutableManager stubExecutableManager(Map<String, CubingJob> 
cubingJobMap) {
+        ExecutableManager executableManager = mock(ExecutableManager.class);
+        for (Map.Entry<String, CubingJob> entry : cubingJobMap.entrySet()) {
+            
when(executableManager.getJob(eq(entry.getKey()))).thenReturn(entry.getValue());
+        }
+        return executableManager;
+    }
+
+    CubeManager stubCubeManager(CubeInstance cubeInstance, boolean 
promotedNewSegmentFailed) {
+        CubeManager cubeManager = mock(CubeManager.class);
+        try {
+            when(cubeManager.getCube(anyString())).thenReturn(cubeInstance);
+            if (promotedNewSegmentFailed) {
+                
doThrow(RuntimeException.class).when(cubeManager).promoteNewlyBuiltSegments(any(CubeInstance.class),
+                        any(CubeSegment.class));
+            } else {
+                
doNothing().when(cubeManager).promoteNewlyBuiltSegments(any(CubeInstance.class),
+                        any(CubeSegment.class));
+            }
+        } catch (IOException ioe) {
+            // a ugly workaroud for mock method with declaration of throwing 
checked exception
+        }
+        return cubeManager;
+    }
+
+    CubeInstance stubCubeInstance(CubeSegment cubSegment) {
+        CubeInstance cubeInstance = mock(CubeInstance.class);
+        when(cubeInstance.latestCopyForWrite()).thenReturn(cubeInstance);
+        @SuppressWarnings("unchecked")
+        Segments<CubeSegment> segmentSegments = mock(Segments.class, 
RETURNS_DEEP_STUBS);
+
+        when(segmentSegments.size()).thenReturn(1);
+        when(cubeInstance.getBuildingSegments()).thenReturn(segmentSegments);
+        when(cubeInstance.getName()).thenReturn(cubeName1);
+        when(cubeInstance.getSegment(anyString(), 
Matchers.any())).thenReturn(cubSegment);
+        return cubeInstance;
+    }
+
+    CubeSegment stubCubSegment(SegmentStatusEnum statusEnum, long leftRange, 
long rightRange) {
+        CubeSegment cubeSegment = mock(CubeSegment.class);
+        when(cubeSegment.getTSRange()).thenReturn(new 
SegmentRange.TSRange(leftRange, rightRange));
+        when(cubeSegment.getStatus()).thenReturn(statusEnum);
+        return cubeSegment;
+    }
+}
diff --git 
a/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java
 
b/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java
index 7a16044..b4b4123 100644
--- 
a/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java
+++ 
b/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java
@@ -59,6 +59,7 @@ import 
org.apache.kylin.stream.coordinator.StreamMetadataStoreFactory;
 import org.apache.kylin.stream.coordinator.StreamingUtils;
 import org.apache.kylin.stream.coordinator.client.CoordinatorClient;
 import org.apache.kylin.stream.coordinator.client.HttpCoordinatorClient;
+import 
org.apache.kylin.stream.coordinator.coordinate.annotations.NotAtomicIdempotent;
 import org.apache.kylin.stream.core.consumer.ConsumerStartProtocol;
 import org.apache.kylin.stream.core.consumer.EndPositionStopCondition;
 import org.apache.kylin.stream.core.consumer.IConsumerProvider;
@@ -202,6 +203,7 @@ public class StreamingServer implements 
ReplicaSetLeaderSelector.LeaderChangeLis
         }
     }
 
+    @NotAtomicIdempotent
     private void sendSegmentsToFullBuild(String cubeName, 
StreamingSegmentManager segmentManager,
             Collection<StreamingCubeSegment> segments) throws Exception {
         List<Future<?>> futureList = Lists.newArrayList();
@@ -218,7 +220,7 @@ public class StreamingServer implements 
ReplicaSetLeaderSelector.LeaderChangeLis
         int i = 0;
         for (StreamingCubeSegment segment : segments) {
             futureList.get(i).get();
-            logger.info("save remote store state to metadata store.");
+            logger.info("Save remote store state to metadata store.");
             
streamMetadataStore.addCompleteReplicaSetForSegmentBuild(segment.getCubeName(), 
segment.getSegmentName(),
                     replicaSetID);
 
@@ -228,12 +230,12 @@ public class StreamingServer implements 
ReplicaSetLeaderSelector.LeaderChangeLis
             streamMetadataStore.saveSourceCheckpoint(segment.getCubeName(), 
segment.getSegmentName(), replicaSetID,
                     smallestSourcePosStr);
 
-            logger.info("send notification to coordinator for cube {} segment 
{}.", cubeName, segment.getSegmentName());
+            logger.info("Send notification to coordinator for cube {} segment 
{}.", cubeName, segment.getSegmentName());
             coordinatorClient.segmentRemoteStoreComplete(currentNode, 
segment.getCubeName(),
                     new Pair<>(segment.getDateRangeStart(), 
segment.getDateRangeEnd()));
-            logger.info("send notification success.");
+            logger.info("Send notification success.");
             segment.saveState(StreamingCubeSegment.State.REMOTE_PERSISTED);
-            logger.info("cube {} segment {}  status converted to {}", 
segment.getCubeName(), segment.getSegmentName(),
+            logger.info("Commit cube {} segment {}  status converted to {}.", 
segment.getCubeName(), segment.getSegmentName(),
                     StreamingCubeSegment.State.REMOTE_PERSISTED.name());
             i++;
         }
