This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit a6fc3e7e22bf335f868ad3533aae65b602a9ea94
Author: XiaoxiangYu <hit_la...@126.com>
AuthorDate: Sat Sep 21 16:31:04 2019 +0800

    KYLIN-4167 Clarify overall design for realtime OLAP
---
 .../java/org/apache/kylin/cube/CubeManager.java    |  2 +-
 .../kylin/engine/mr/StreamingCubingEngine.java     |  5 +-
 .../controller/StreamingCoordinatorController.java |  8 ++-
 .../kylin/rest/service/StreamingV2Service.java     | 40 +++++-------
 .../stream/coordinator/StreamMetadataStore.java    | 11 ++--
 .../coordinator/ZookeeperStreamMetadataStore.java  | 66 +++++++++----------
 .../coordinator/client/CoordinatorClient.java      | 15 +++++
 .../stream/core/client/ReceiverAdminClient.java    | 76 ++++++++++++++++++++++
 .../core/consumer/StreamingConsumerChannel.java    | 12 ++++
 .../kylin/stream/core/model/SegmentBuildState.java |  2 +-
 .../stream/core/model/stats/ClusterState.java      |  8 +++
 .../stream/core/model/stats/ReplicaSetState.java   |  3 +
 .../storage/columnar/ColumnarSegmentStore.java     |  2 +-
 .../core/storage/columnar/ColumnarStoreCache.java  | 16 ++++-
 .../kylin/stream/server/StreamingServer.java       |  2 -
 .../server/rest/controller/AdminController.java    | 10 ++-
 16 files changed, 203 insertions(+), 75 deletions(-)

diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java 
b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 7ad4dd3..3650913 100755
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -958,7 +958,7 @@ public class CubeManager implements IRealizationProvider {
                 throw new IllegalStateException(String.format(Locale.ROOT,
                         "For cube %s, segment %s missing LastBuildJobID", 
cubeCopy.toString(), newSegCopy.toString()));
 
-            if (isReady(newSegCopy) == true) {
+            if (isReady(newSegCopy)) {
                 logger.warn("For cube {}, segment {} state should be NEW but 
is READY", cubeCopy, newSegCopy);
             }
 
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/StreamingCubingEngine.java 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/StreamingCubingEngine.java
index ab00344..3dd7859 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/StreamingCubingEngine.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/StreamingCubingEngine.java
@@ -26,7 +26,10 @@ import 
org.apache.kylin.job.execution.DefaultChainedExecutable;
  */
 public class StreamingCubingEngine {
 
-    public DefaultChainedExecutable createStreamingCubingBuilder(CubeSegment 
seg, String submitter) {
+    /**
+     * A factory method used to create a building job whose input data is the 
columnar storage cache uploaded by the receiver
+     */
+    public DefaultChainedExecutable createStreamingCubingJob(CubeSegment seg, 
String submitter) {
         return new StreamingCubingJobBuilder(seg, submitter).build();
     }
 }
diff --git 
a/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingCoordinatorController.java
 
b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingCoordinatorController.java
index afb6a43..e1f737e 100644
--- 
a/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingCoordinatorController.java
+++ 
b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingCoordinatorController.java
@@ -51,8 +51,12 @@ import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 
 /**
- * StreamingCoordinatorController is defined as Restful API entrance for 
stream coordinator.
- *
+ * <pre>
+ * When the current process is not the coordinator leader (such as a coordinator 
follower or a non-coordinator),
+ *  calling an admin operation will lead to a NotLeadCoordinatorException being thrown.
+ * So this class should only be called by streaming receivers (because they 
know who the real coordinator leader is),
+ *  not by Kylin users.
+ * </pre>
  */
 @Controller
 @RequestMapping(value = "/streaming_coordinator")
diff --git 
a/server-base/src/main/java/org/apache/kylin/rest/service/StreamingV2Service.java
 
b/server-base/src/main/java/org/apache/kylin/rest/service/StreamingV2Service.java
index 53bf266..dc6f8d3 100644
--- 
a/server-base/src/main/java/org/apache/kylin/rest/service/StreamingV2Service.java
+++ 
b/server-base/src/main/java/org/apache/kylin/rest/service/StreamingV2Service.java
@@ -24,7 +24,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -34,7 +33,6 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.metadata.model.ISourceAware;
 import org.apache.kylin.metadata.project.ProjectInstance;
@@ -76,9 +74,13 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
+/**
+ * StreamingV2Service will try to forward requests to the coordinator 
leader by HttpClient.
+ */
 @Component("streamingServiceV2")
 public class StreamingV2Service extends BasicService {
     private static final Logger logger = 
LoggerFactory.getLogger(StreamingV2Service.class);
+    private static final String CLUSTER_STATE = "cluster_state";
 
     private StreamMetadataStore streamMetadataStore;
 
@@ -88,7 +90,7 @@ public class StreamingV2Service extends BasicService {
             .expireAfterWrite(10, TimeUnit.SECONDS).build();
 
     private ExecutorService clusterStateExecutor = new ThreadPoolExecutor(0, 
20, 60L, TimeUnit.SECONDS,
-            new LinkedBlockingQueue<Runnable>(), new 
NamedThreadFactory("fetch_receiver_state"));
+            new LinkedBlockingQueue<>(), new 
NamedThreadFactory("fetch_receiver_state"));
 
     public StreamingV2Service() {
         streamMetadataStore = 
StreamMetadataStoreFactory.getStreamMetaDataStore();
@@ -246,13 +248,6 @@ public class StreamingV2Service extends BasicService {
         getCoordinatorClient().resumeConsumers(cube.getName());
     }
 
-    public void onSegmentRemoteStoreComplete(String cubeName, Pair<Long, Long> 
segment, Node receiver) {
-        logger.info(
-                "segment remote store complete signal received for cube:{}, 
segment:{}, try to find proper segment to build",
-                cubeName, segment);
-        getCoordinatorClient().segmentRemoteStoreComplete(receiver, cubeName, 
segment);
-    }
-
     public StreamingSourceConfigManager getStreamingManagerV2() {
         return StreamingSourceConfigManager.getInstance(getConfig());
     }
@@ -284,25 +279,25 @@ public class StreamingV2Service extends BasicService {
     @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN)
     public void createReplicaSet(ReplicaSet rs) {
         getCoordinatorClient().createReplicaSet(rs);
-        clusterStateCache.invalidate("cluster_state");
+        clusterStateCache.invalidate(CLUSTER_STATE);
     }
 
     @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN)
     public void removeReplicaSet(int rsID) {
         getCoordinatorClient().removeReplicaSet(rsID);
-        clusterStateCache.invalidate("cluster_state");
+        clusterStateCache.invalidate(CLUSTER_STATE);
     }
 
     @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN)
     public void addNodeToReplicaSet(Integer replicaSetID, String nodeID) {
         getCoordinatorClient().addNodeToReplicaSet(replicaSetID, nodeID);
-        clusterStateCache.invalidate("cluster_state");
+        clusterStateCache.invalidate(CLUSTER_STATE);
     }
 
     @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN)
     public void removeNodeFromReplicaSet(Integer replicaSetID, String nodeID) {
         getCoordinatorClient().removeNodeFromReplicaSet(replicaSetID, nodeID);
-        clusterStateCache.invalidate("cluster_state");
+        clusterStateCache.invalidate(CLUSTER_STATE);
     }
 
     @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN)
@@ -362,8 +357,12 @@ public class StreamingV2Service extends BasicService {
         return result;
     }
 
+    /**
+     * Fetch and calculate total cluster state.
+     * @return
+     */
     public ClusterState getClusterState() {
-        ClusterState clusterState = 
clusterStateCache.getIfPresent("cluster_state");
+        ClusterState clusterState = 
clusterStateCache.getIfPresent(CLUSTER_STATE);
         if (clusterState != null) {
             return clusterState;
         }
@@ -373,12 +372,7 @@ public class StreamingV2Service extends BasicService {
 
         Map<Node, Future<ReceiverStats>> statsFuturesMap = Maps.newHashMap();
         for (final Node receiver : allReceivers) {
-            Future<ReceiverStats> receiverStatsFuture = 
clusterStateExecutor.submit(new Callable<ReceiverStats>() {
-                @Override
-                public ReceiverStats call() throws Exception {
-                    return receiverAdminClient.getReceiverStats(receiver);
-                }
-            });
+            Future<ReceiverStats> receiverStatsFuture = 
clusterStateExecutor.submit(() -> 
receiverAdminClient.getReceiverStats(receiver));
             statsFuturesMap.put(receiver, receiverStatsFuture);
         }
 
@@ -435,9 +429,7 @@ public class StreamingV2Service extends BasicService {
                 String cubeName = cubeStatsEntry.getKey();
                 ReceiverCubeStats cubeStats = cubeStatsEntry.getValue();
                 Long latestEventTime = cubeLatestEventMap.get(cubeName);
-                if (latestEventTime != null && latestEventTime < 
cubeStats.getLatestEventTime()) {
-                    cubeLatestEventMap.put(cubeName, 
cubeStats.getLatestEventTime());
-                } else if (latestEventTime == null) {
+                if (latestEventTime == null || latestEventTime < 
cubeStats.getLatestEventTime()) {
                     cubeLatestEventMap.put(cubeName, 
cubeStats.getLatestEventTime());
                 }
             }
diff --git 
a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/StreamMetadataStore.java
 
b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/StreamMetadataStore.java
index 68f0cb2..309bee9 100644
--- 
a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/StreamMetadataStore.java
+++ 
b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/StreamMetadataStore.java
@@ -88,11 +88,12 @@ public interface StreamMetadataStore {
     Map<Integer, String> getSourceCheckpoint(String cubeName, String 
segmentName);
 
     /**
-     * add group id to the segment info, indicate that the segment data
-     * has been hand over to the remote store
-     * @param cubeName
-     * @param segmentName
-     * @param rsID
+     * Add the replica set id to the segment info, indicating that the segment data 
belonging to the current replica set
+     * has been handed over to the deep storage.
+     *
+     * This should be an indicator of the integrity of the uploaded segment data.
+     *
+     * @param rsID id of the replica set which has uploaded data to deep storage
      */
     void addCompleteReplicaSetForSegmentBuild(String cubeName, String 
segmentName, int rsID);
 
diff --git 
a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/ZookeeperStreamMetadataStore.java
 
b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/ZookeeperStreamMetadataStore.java
index 2c3acb2..4431b4c 100644
--- 
a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/ZookeeperStreamMetadataStore.java
+++ 
b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/ZookeeperStreamMetadataStore.java
@@ -96,7 +96,7 @@ public class ZookeeperStreamMetadataStore implements 
StreamMetadataStore {
         try {
             client.delete().forPath(ZKPaths.makePath(cubeRoot, cubeName, 
CUBE_ASSIGNMENT));
         } catch (Exception e) {
-            logger.error("error when remove cube assignment", e);
+            logger.error("Error when remove cube assignment " + cubeName, e);
             throw new StoreException(e);
         }
     }
@@ -116,7 +116,7 @@ public class ZookeeperStreamMetadataStore implements 
StreamMetadataStore {
             }
             return cubeAssignmentList;
         } catch (Exception e) {
-            logger.error("error when get assignments");
+            logger.error("Error when get assignments", e);
             throw new StoreException(e);
         }
     }
@@ -127,7 +127,7 @@ public class ZookeeperStreamMetadataStore implements 
StreamMetadataStore {
             List<CubeAssignment> cubeAssignmentList = getAllCubeAssignments();
             return 
AssignmentUtil.convertCubeAssign2ReplicaSetAssign(cubeAssignmentList);
         } catch (Exception e) {
-            logger.error("error when get assignments");
+            logger.error("Error when get assignments", e);
             throw new StoreException(e);
         }
     }
@@ -138,7 +138,7 @@ public class ZookeeperStreamMetadataStore implements 
StreamMetadataStore {
             Map<Integer, Map<String, List<Partition>>> 
replicaSetAssignmentsMap = getAllReplicaSetAssignments();
             return replicaSetAssignmentsMap.get(replicaSetID);
         } catch (Exception e) {
-            logger.error("error when get assignments");
+            logger.error("Error when get assignment for replica set " + 
replicaSetID, e);
             throw new StoreException(e);
         }
     }
@@ -154,7 +154,7 @@ public class ZookeeperStreamMetadataStore implements 
StreamMetadataStore {
             CubeAssignment assignment = 
CubeAssignment.deserializeCubeAssignment(data);
             return assignment;
         } catch (Exception e) {
-            logger.error("error when get cube assignment");
+            logger.error("Error when get cube assignment for " + cubeName, e);
             throw new StoreException(e);
         }
     }
@@ -169,7 +169,7 @@ public class ZookeeperStreamMetadataStore implements 
StreamMetadataStore {
                 result.add(replicaSet);
             }
         } catch (Exception e) {
-            logger.error("error when get replica sets", e);
+            logger.error("Error when get replica sets", e);
             throw new StoreException(e);
         }
         return result;
@@ -187,7 +187,7 @@ public class ZookeeperStreamMetadataStore implements 
StreamMetadataStore {
                 }
             });
         } catch (Exception e) {
-            logger.error("error when get replica sets", e);
+            logger.error("Error when get replica sets", e);
             throw new StoreException(e);
         }
     }
@@ -219,7 +219,7 @@ public class ZookeeperStreamMetadataStore implements 
StreamMetadataStore {
             client.create().creatingParentsIfNeeded().forPath(replicaSetPath, 
serializeReplicaSet(rs));
             return newReplicaSetID;
         } catch (Exception e) {
-            logger.error("error when create replicaSet:" + rs);
+            logger.error("Error when create replicaSet " + rs, e);
             throw new StoreException(e);
         }
     }
@@ -231,7 +231,7 @@ public class ZookeeperStreamMetadataStore implements 
StreamMetadataStore {
             client.setData().forPath(ZKPaths.makePath(replicaSetRoot, 
String.valueOf(rs.getReplicaSetID())),
                     replicaSetData);
         } catch (Exception e) {
-            logger.error("error when update replicaSet:" + 
rs.getReplicaSetID());
+            logger.error("error when update replicaSet " + rs, e);
             throw new StoreException(e);
         }
     }
@@ -242,7 +242,7 @@ public class ZookeeperStreamMetadataStore implements 
StreamMetadataStore {
             byte[] nodeData = client.getData().forPath(coordinatorRoot);
             return JsonUtil.readValue(nodeData, Node.class);
         } catch (Exception e) {
-            logger.error("error when get coordinator", e);
+            logger.error("Error when get coordinator leader", e);
             throw new StoreException(e);
         }
     }
@@ -253,7 +253,7 @@ public class ZookeeperStreamMetadataStore implements 
StreamMetadataStore {
             byte[] coordinatorBytes = JsonUtil.writeValueAsBytes(coordinator);
             client.setData().forPath(coordinatorRoot, coordinatorBytes);
         } catch (Exception e) {
-            logger.error("error when set coordinator", e);
+            logger.error("Error when set coordinator leader to " + 
coordinator, e);
             throw new StoreException(e);
         }
     }
@@ -266,11 +266,11 @@ public class ZookeeperStreamMetadataStore implements 
StreamMetadataStore {
             if (client.checkExists().forPath(path) == null) {
                 client.create().creatingParentsIfNeeded().forPath(path);
             } else {
-                logger.warn("checkpoint path already existed under path {}", 
path);
+                logger.warn("Checkpoint path already existed under path {}, 
overwrite with new one.", path);
             }
             client.setData().forPath(path, Bytes.toBytes(sourceCheckpoint));
         } catch (Exception e) {
-            logger.error("fail to add replicaSet Id to segment build state", 
e);
+            logger.error("Error when save remote checkpoint for " + cubeName + 
" " + segmentName , e);
             throw new StoreException(e);
         }
     }
@@ -295,7 +295,7 @@ public class ZookeeperStreamMetadataStore implements 
StreamMetadataStore {
             }
             return result;
         } catch (Exception e) {
-            logger.error("fail to add replicaSet Id to segment build state", 
e);
+            logger.error("Error to fetch remote checkpoint for " + cubeName + 
" " + segmentName, e);
             throw new StoreException(e);
         }
     }
@@ -317,7 +317,7 @@ public class ZookeeperStreamMetadataStore implements 
StreamMetadataStore {
 
             return result;
         } catch (Exception e) {
-            logger.error("error when get replica set:" + rsID);
+            logger.error("Error when get replica set " + rsID, e);
             throw new StoreException(e);
         }
     }
@@ -327,7 +327,7 @@ public class ZookeeperStreamMetadataStore implements 
StreamMetadataStore {
         try {
             client.delete().forPath(ZKPaths.makePath(replicaSetRoot, 
String.valueOf(rsID)));
         } catch (Exception e) {
-            logger.error("error when remove replica set:" + rsID);
+            logger.error("Error when remove replica set " + rsID, e);
             throw new StoreException(e);
         }
     }
@@ -342,7 +342,7 @@ public class ZookeeperStreamMetadataStore implements 
StreamMetadataStore {
                 result.add(node);
             }
         } catch (Exception e) {
-            logger.error("error when fetch receivers", e);
+            logger.error("Error when fetch receivers", e);
             throw new StoreException(e);
         }
         return result;
@@ -353,7 +353,7 @@ public class ZookeeperStreamMetadataStore implements 
StreamMetadataStore {
         try {
             return client.getChildren().forPath(cubeRoot);
         } catch (Exception e) {
-            logger.error("error when fetch cubes", e);
+            logger.error("Error when fetch cubes", e);
             throw new StoreException(e);
         }
     }
@@ -366,7 +366,7 @@ public class ZookeeperStreamMetadataStore implements 
StreamMetadataStore {
                 client.create().creatingParentsIfNeeded().forPath(path);
             }
         } catch (Exception e) {
-            logger.error("error when add cube", e);
+            logger.error("Error when add cube " + cube, e);
             throw new StoreException(e);
         }
     }
@@ -379,7 +379,7 @@ public class ZookeeperStreamMetadataStore implements 
StreamMetadataStore {
                 
client.delete().deletingChildrenIfNeeded().forPath(ZKPaths.makePath(cubeRoot, 
cube));
             }
         } catch (Exception e) {
-            logger.error("error when remove cube", e);
+            logger.error("Error when remove cube " + cube, e);
             throw new StoreException(e);
         }
     }
@@ -399,7 +399,7 @@ public class ZookeeperStreamMetadataStore implements 
StreamMetadataStore {
                 return StreamingCubeConsumeState.RUNNING;
             }
         } catch (Exception e) {
-            logger.error("error when get streaming cube consume state", e);
+            logger.error("Error when get streaming cube consume state " + 
cube, e);
             throw new StoreException(e);
         }
     }
@@ -414,7 +414,7 @@ public class ZookeeperStreamMetadataStore implements 
StreamMetadataStore {
                 client.create().creatingParentsIfNeeded().forPath(path, 
JsonUtil.writeValueAsBytes(state));
             }
         } catch (Exception e) {
-            logger.error("error when save streaming cube consume state", e);
+            logger.error("Error when save streaming cube consume state " + 
cube + " with " + state, e);
             throw new StoreException(e);
         }
     }
@@ -427,7 +427,7 @@ public class ZookeeperStreamMetadataStore implements 
StreamMetadataStore {
                 
client.create().creatingParentsIfNeeded().forPath(receiverPath);
             }
         } catch (Exception e) {
-            logger.error("error when add new receiver", e);
+            logger.error("Error when add new receiver " + receiver, e);
             throw new StoreException(e);
         }
     }
@@ -440,14 +440,14 @@ public class ZookeeperStreamMetadataStore implements 
StreamMetadataStore {
                 
client.delete().deletingChildrenIfNeeded().forPath(receiverPath);
             }
         } catch (Exception e) {
-            logger.error("error when remove receiver:" + receiver, e);
+            logger.error("Error when remove receiver " + receiver, e);
             throw new StoreException(e);
         }
     }
 
     @Override
     public void saveNewCubeAssignment(CubeAssignment newCubeAssignment) {
-        logger.info("try saving new cube assignment for:" + 
newCubeAssignment.getCubeName());
+        logger.info("Try saving new cube assignment for: {}.", 
newCubeAssignment);
         try {
             String path = 
getCubeAssignmentPath(newCubeAssignment.getCubeName());
             if (client.checkExists().forPath(path) == null) {
@@ -457,7 +457,7 @@ public class ZookeeperStreamMetadataStore implements 
StreamMetadataStore {
                 client.setData().forPath(path, 
CubeAssignment.serializeCubeAssignment(newCubeAssignment));
             }
         } catch (Exception e) {
-            logger.error("fail to save cube assignment", e);
+            logger.error("Fail to save cube assignment", e);
             throw new StoreException(e);
         }
     }
@@ -466,7 +466,7 @@ public class ZookeeperStreamMetadataStore implements 
StreamMetadataStore {
         try {
             client.close();
         } catch (Exception e) {
-            logger.error("exception throws when close assignmentManager", e);
+            logger.error("Exception throws when close assignmentManager", e);
         }
     }
 
@@ -481,7 +481,7 @@ public class ZookeeperStreamMetadataStore implements 
StreamMetadataStore {
                 logger.warn("ReplicaSet id {} existed under path {}", rsID, 
path);
             }
         } catch (Exception e) {
-            logger.error("fail to add replicaSet Id to segment build state", 
e);
+            logger.error("Fail to add replicaSet Id to segment build state for 
" + segmentName + " " + rsID, e);
             throw new StoreException(e);
         }
     }
@@ -493,7 +493,7 @@ public class ZookeeperStreamMetadataStore implements 
StreamMetadataStore {
             String path = ZKPaths.makePath(cubeRoot, cubeName, 
CUBE_BUILD_STATE, segmentName);
             client.setData().forPath(path, Bytes.toBytes(stateStr));
         } catch (Exception e) {
-            logger.error("fail to update segment build state", e);
+            logger.error("Fail to update segment build state for " + 
segmentName + " to " + state, e);
             throw new StoreException(e);
         }
     }
@@ -513,7 +513,7 @@ public class ZookeeperStreamMetadataStore implements 
StreamMetadataStore {
             }
             return result;
         } catch (Exception e) {
-            logger.error("fail to get segment build states", e);
+            logger.error("Fail to get segment build states " + cubeName, e);
             throw new StoreException(e);
         }
     }
@@ -524,7 +524,7 @@ public class ZookeeperStreamMetadataStore implements 
StreamMetadataStore {
             String cubePath = getCubeBuildStatePath(cubeName);
             return doGetSegmentBuildState(cubePath, segmentName);
         } catch (Exception e) {
-            logger.error("fail to get cube segment remote store state", e);
+            logger.error("Fail to get segment build state for " + cubeName + " 
" +segmentName, e);
             throw new StoreException(e);
         }
     }
@@ -555,11 +555,11 @@ public class ZookeeperStreamMetadataStore implements 
StreamMetadataStore {
                 client.delete().deletingChildrenIfNeeded().forPath(path);
                 return true;
             } else {
-                logger.warn("cube segment deep store state does not exisit!, 
path {} ", path);
+                logger.warn("Cube segment deep store state does not exisit!, 
path {} ", path);
                 return false;
             }
         } catch (Exception e) {
-            logger.error("fail to remove cube segment deep store state", e);
+            logger.error("Fail to remove cube segment deep store state " + 
cubeName + " " + segmentName, e);
             throw new StoreException(e);
         }
     }
diff --git 
a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/client/CoordinatorClient.java
 
b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/client/CoordinatorClient.java
index 14d814d..476abfb 100644
--- 
a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/client/CoordinatorClient.java
+++ 
b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/client/CoordinatorClient.java
@@ -28,10 +28,25 @@ import org.apache.kylin.stream.core.model.Node;
 import org.apache.kylin.stream.core.source.Partition;
 
 public interface CoordinatorClient {
+
+    // 
================================================================================
+    // ============================= Receiver side 
====================================
+
+    /**
+     * Notified by a receiver that a part of segment data has been uploaded to 
Deep Storage.
+     * Coordinator will try to find any segment which is ready to build into 
HBase and submit a building job.
+     */
     void segmentRemoteStoreComplete(Node receiverNode, String cubeName, 
Pair<Long, Long> segmentRange);
 
+    /**
+     * Notified by a replica set that its leader has changed to the given node. No 
effect currently.
+     */
     void replicaSetLeaderChange(int replicaSetId, Node newLeader);
 
+
+    // 
================================================================================
+    // =========================== Coordinator side 
===================================
+
     Map<Integer, Map<String, List<Partition>>> reBalanceRecommend();
 
     void reBalance(Map<Integer, Map<String, List<Partition>>> reBalancePlan);
diff --git 
a/stream-core/src/main/java/org/apache/kylin/stream/core/client/ReceiverAdminClient.java
 
b/stream-core/src/main/java/org/apache/kylin/stream/core/client/ReceiverAdminClient.java
index e080af7..b040691 100644
--- 
a/stream-core/src/main/java/org/apache/kylin/stream/core/client/ReceiverAdminClient.java
+++ 
b/stream-core/src/main/java/org/apache/kylin/stream/core/client/ReceiverAdminClient.java
@@ -32,27 +32,103 @@ import org.apache.kylin.stream.core.model.UnAssignRequest;
 import org.apache.kylin.stream.core.model.stats.ReceiverCubeStats;
 import org.apache.kylin.stream.core.model.stats.ReceiverStats;
 
+/**
+ * StreamingCoordinator sends admin requests to a specific receiver
+ * (received by 
org.apache.kylin.stream.server.rest.controller.AdminController).
+ *
+ */
 public interface ReceiverAdminClient {
+
+    /**
+     * Notify the receiver that it has been assigned a consumption task with 
AssignRequest#partitions of the cube.
+     * If AssignRequest#startConsumers is set to true, receiver has to start 
consumption at once.
+     */
     void assign(Node receiver, AssignRequest assignRequest) throws IOException;
 
+    /**
+     * <pre>
+     * Notify the receiver that it has been unassigned from a specific cube.
+     * Receiver will stop consumption and delete all local segment cache.
+     * </pre>
+     */
     void unAssign(Node receiver, UnAssignRequest unAssignRequest) throws 
IOException;
 
+    /**
+     * <pre>
+     * Ask receiver to start consumption (create IStreamingConnector).
+     *
+     * Start position is decided by StartConsumersRequest#startProtocol:
+     * 1. when StartConsumersRequest#startProtocol is null, consume from 
checkpoint:
+     *     1. if a local checkpoint exists, consume from the local checkpoint
+     *     2. if a local checkpoint does not exist, consume from the remote 
checkpoint (see StreamMetadata#getSourceCheckpoint)
+     *     3. if neither exists, the start position is decided by 
KylinConfig#isStreamingConsumeFromLatestOffsets
+     * 2. when StartConsumersRequest#startProtocol is not null:
+     *     1. when startProtocol.getStartPosition() is valid, consume from 
startProtocol.getStartPosition()
+     *     2. when startProtocol.getStartPosition() is not valid, the start 
position is decided by StartProtocol#ConsumerStartMode
+     *
+     * See KafkaSource#createStreamingConnector
+     * </pre>
+     */
     void startConsumers(Node receiver, StartConsumersRequest startRequest) 
throws IOException;
 
+    /**
+     * <pre>
+     * Ask receiver to stop consumption (destroy IStreamingConnector) and 
flush data into disk.
+     * If StopConsumersRequest#removeData set to true, all segment data will 
be deleted.
+     * </pre>
+     */
     ConsumerStatsResponse stopConsumers(Node receiver, StopConsumersRequest 
stopRequest) throws IOException;
 
+    /**
+     * Ask receiver to pause consumption (don't destroy IStreamingConnector).
+     */
     ConsumerStatsResponse pauseConsumers(Node receiver, PauseConsumersRequest 
request) throws IOException;
 
+    /**
+     * <pre>
+     * 1. When ResumeConsumerRequest#resumeToPosition is null, just ask 
receiver to resume consumption.
+     * 2. When ResumeConsumerRequest#resumeToPosition is not null, ask 
receiver to resume to that position and stop consumption,
+     *  so it is something like ReceiverAdminClient#stopConsumers. This case 
is used in reAssign action.
+     *  Please check ReceiverClusterManager#syncAndStopConsumersInRs for 
detail.
+     *
+     *  It is a synchronous method.
+     * </pre>
+     */
     ConsumerStatsResponse resumeConsumers(Node receiver, ResumeConsumerRequest 
request) throws IOException;
 
+    /**
+     * Ask receiver to remove all data related to specific cube in receiver 
side.
+     */
     void removeCubeSegment(Node receiver, String cubeName, String segmentName) 
throws IOException;
 
+    /**
+     * Ask receiver to stop consumption and convert all segments to Immutable.
+     */
     void makeCubeImmutable(Node receiver, String cubeName) throws IOException;
 
+    /**
+     * When a segment has been promoted to an HBase Ready Segment (historical 
part),
+     *  the segment cache in the receiver (realtime part) is useless and needs to be 
deleted.
+     */
     void segmentBuildComplete(Node receiver, String cubeName, String 
segmentName) throws IOException;
 
+    /**
+     * <pre>
+     * Notify the receiver that it has been added into a new replica set; the receiver 
will:
+     *  1. add itself as the replica set's leader candidate
+     *  2. fetch the assignment from metadata and try to start the consumption task
+     * </pre>
+     */
     void addToReplicaSet(Node receiver, int replicaSetID) throws IOException;
 
+    /**
+     * <pre>
+     * Notify the receiver that it has been removed from the replica set; the receiver 
will:
+     *  1. remove the assignment and remove itself as the replica set's leader 
candidate
+     *  2. stop consumption
+     *  3. remove all local segment cache
+     * </pre>
+     */
     void removeFromReplicaSet(Node receiver) throws IOException;
 
     ReceiverStats getReceiverStats(Node receiver) throws IOException;
diff --git 
a/stream-core/src/main/java/org/apache/kylin/stream/core/consumer/StreamingConsumerChannel.java
 
b/stream-core/src/main/java/org/apache/kylin/stream/core/consumer/StreamingConsumerChannel.java
index 1052f5a..256b519 100644
--- 
a/stream-core/src/main/java/org/apache/kylin/stream/core/consumer/StreamingConsumerChannel.java
+++ 
b/stream-core/src/main/java/org/apache/kylin/stream/core/consumer/StreamingConsumerChannel.java
@@ -178,6 +178,9 @@ public class StreamingConsumerChannel implements Runnable {
         }
     }
 
+    /**
+     * Called by another thread.
+     */
     public void stop(long timeoutInMs) {
         this.stopped = true;
         waitConsumerStop(timeoutInMs);
@@ -211,6 +214,9 @@ public class StreamingConsumerChannel implements Runnable {
         }
     }
 
+    /**
+     * Called by another thread.
+     */
     public void pause(boolean wait) {
         this.paused = true;
         if (wait) {
@@ -224,6 +230,9 @@ public class StreamingConsumerChannel implements Runnable {
         }
     }
 
+    /**
+     * Called by another thread.
+     */
     public void resumeToStopCondition(IStopConsumptionCondition 
newStopCondition) {
         this.paused = false;
         if (newStopCondition != IStopConsumptionCondition.NEVER_STOP) {
@@ -239,6 +248,9 @@ public class StreamingConsumerChannel implements Runnable {
         }
     }
 
+    /**
+     * Called by another thread.
+     */
     public void resume() {
         this.paused = false;
     }
diff --git 
a/stream-core/src/main/java/org/apache/kylin/stream/core/model/SegmentBuildState.java
 
b/stream-core/src/main/java/org/apache/kylin/stream/core/model/SegmentBuildState.java
index c217fa1..b28efd9 100644
--- 
a/stream-core/src/main/java/org/apache/kylin/stream/core/model/SegmentBuildState.java
+++ 
b/stream-core/src/main/java/org/apache/kylin/stream/core/model/SegmentBuildState.java
@@ -138,7 +138,7 @@ public class SegmentBuildState implements 
Comparable<SegmentBuildState> {
         }
 
         public enum State {
-            WAIT, BUILDING, COMPLETE
+            WAIT, BUILDING
         }
     }
 }
diff --git 
a/stream-core/src/main/java/org/apache/kylin/stream/core/model/stats/ClusterState.java
 
b/stream-core/src/main/java/org/apache/kylin/stream/core/model/stats/ClusterState.java
index 64fe3ad..9fa277f 100644
--- 
a/stream-core/src/main/java/org/apache/kylin/stream/core/model/stats/ClusterState.java
+++ 
b/stream-core/src/main/java/org/apache/kylin/stream/core/model/stats/ClusterState.java
@@ -32,9 +32,13 @@ public class ClusterState {
     @JsonProperty("rs_states")
     private List<ReplicaSetState> replicaSetStates = Lists.newArrayList();
 
+    /**
+     * Receivers which don't belong to any replica set.
+     */
     @JsonProperty("available_receivers")
     private List<ReceiverState> availableReceivers = Lists.newArrayList();
 
+    @SuppressWarnings("unused")
     public long getLastUpdateTime() {
         return lastUpdateTime;
     }
@@ -43,18 +47,22 @@ public class ClusterState {
         this.lastUpdateTime = lastUpdateTime;
     }
 
+    @SuppressWarnings("unused")
     public List<ReplicaSetState> getReplicaSetStates() {
         return replicaSetStates;
     }
 
+    @SuppressWarnings("unused")
     public void setReplicaSetStates(List<ReplicaSetState> replicaSetStates) {
         this.replicaSetStates = replicaSetStates;
     }
 
+    @SuppressWarnings("unused")
     public List<ReceiverState> getAvailableReceivers() {
         return availableReceivers;
     }
 
+    @SuppressWarnings("unused")
     public void setAvailableReceivers(List<ReceiverState> availableReceivers) {
         this.availableReceivers = availableReceivers;
     }
diff --git 
a/stream-core/src/main/java/org/apache/kylin/stream/core/model/stats/ReplicaSetState.java
 
b/stream-core/src/main/java/org/apache/kylin/stream/core/model/stats/ReplicaSetState.java
index 0324416..933aeae 100644
--- 
a/stream-core/src/main/java/org/apache/kylin/stream/core/model/stats/ReplicaSetState.java
+++ 
b/stream-core/src/main/java/org/apache/kylin/stream/core/model/stats/ReplicaSetState.java
@@ -50,10 +50,12 @@ public class ReplicaSetState {
         this.rsID = rsID;
     }
 
+    @SuppressWarnings("unused")
     public List<ReceiverState> getReceiverStates() {
         return receiverStates;
     }
 
+    @SuppressWarnings("unused")
     public void setReceiverStates(List<ReceiverState> receiverStates) {
         this.receiverStates = receiverStates;
     }
@@ -81,6 +83,7 @@ public class ReplicaSetState {
         receiverStates.add(receiverState);
     }
 
+    @SuppressWarnings("unused")
     public ReceiverState getReceiverState(Node receiver) {
         for (ReceiverState receiverState : receiverStates) {
             if (receiverState.getReceiver().equals(receiver)) {
diff --git 
a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarSegmentStore.java
 
b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarSegmentStore.java
index 14a92f7..5982065 100644
--- 
a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarSegmentStore.java
+++ 
b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarSegmentStore.java
@@ -66,7 +66,7 @@ public class ColumnarSegmentStore implements 
IStreamingSegmentStore {
     private static ExecutorService fragmentMergeExecutor;
     {
         fragmentMergeExecutor = new ThreadPoolExecutor(0, 10, 60L, 
TimeUnit.SECONDS,
-                new LinkedBlockingQueue<Runnable>(), new 
NamedThreadFactory("fragments-merge"));
+                new LinkedBlockingQueue<>(), new 
NamedThreadFactory("fragments-merge"));
     }
 
     private volatile SegmentMemoryStore activeMemoryStore;
diff --git 
a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarStoreCache.java
 
b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarStoreCache.java
index 44f40ad..aa5f827 100644
--- 
a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarStoreCache.java
+++ 
b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarStoreCache.java
@@ -37,6 +37,13 @@ import com.google.common.cache.RemovalListener;
 import com.google.common.cache.RemovalNotification;
 import com.google.common.collect.Maps;
 
+/**
+ * On the streaming receiver side, data is divided into two parts: the memory
+ *  store and fragment files. As the names imply, the memory store is located
+ *  in the JVM heap (actually a SortedMap), while fragment files are usually
+ *  located on disk.
+ *
+ * Since fragment files are often very large, reducing the number of IO
+ *  operations improves performance remarkably. So we cache fragment files in
+ *  off-heap memory as much as possible.
+ */
 public class ColumnarStoreCache {
     private static Logger logger = 
LoggerFactory.getLogger(ColumnarStoreCache.class);
     private static ColumnarStoreCache instance = new ColumnarStoreCache();
@@ -50,7 +57,9 @@ public class ColumnarStoreCache {
 
     private ConcurrentMap<DataSegmentFragment, AtomicLong> refCounters = 
Maps.newConcurrentMap();
     public LoadingCache<DataSegmentFragment, FragmentData> fragmentDataCache = 
CacheBuilder.newBuilder()
-            
.initialCapacity(INIT_CACHE_SIZE).concurrencyLevel(8).maximumSize(CACHE_SIZE)
+            .initialCapacity(INIT_CACHE_SIZE)
+            .concurrencyLevel(8)
+            .maximumSize(CACHE_SIZE)
             .expireAfterAccess(6, TimeUnit.HOURS)
             .removalListener(new RemovalListener<DataSegmentFragment, 
FragmentData>() {
                 @Override
@@ -76,7 +85,8 @@ public class ColumnarStoreCache {
                         logger.debug("no ref counter found for fragment: " + 
fragment);
                     }
                 }
-            }).build(new CacheLoader<DataSegmentFragment, FragmentData>() {
+            })
+            .build(new CacheLoader<DataSegmentFragment, FragmentData>() {
                 @Override
                 public FragmentData load(DataSegmentFragment fragment) throws 
Exception {
                     if (currentBufferedSize.get() >= MAX_BUFFERED_SIZE) {
@@ -133,7 +143,7 @@ public class ColumnarStoreCache {
         if (refCounter != null) {
             refCounter.decrementAndGet();
         } else {
-            logger.warn("ref counter not exist for fragment:" + fragment);
+            logger.warn("Ref counter not exist for fragment:{}", fragment);
         }
     }
 
diff --git 
a/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java
 
b/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java
index 95630c6..7a16044 100644
--- 
a/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java
+++ 
b/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java
@@ -33,7 +33,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.io.FileUtils;
 import org.apache.curator.framework.CuratorFramework;
@@ -238,7 +237,6 @@ public class StreamingServer implements 
ReplicaSetLeaderSelector.LeaderChangeLis
                     StreamingCubeSegment.State.REMOTE_PERSISTED.name());
             i++;
         }
-
     }
 
     private void purgeSegments(String cubeName, 
Collection<StreamingCubeSegment> segments,
diff --git 
a/stream-receiver/src/main/java/org/apache/kylin/stream/server/rest/controller/AdminController.java
 
b/stream-receiver/src/main/java/org/apache/kylin/stream/server/rest/controller/AdminController.java
index b781420..80b1170 100644
--- 
a/stream-receiver/src/main/java/org/apache/kylin/stream/server/rest/controller/AdminController.java
+++ 
b/stream-receiver/src/main/java/org/apache/kylin/stream/server/rest/controller/AdminController.java
@@ -38,6 +38,9 @@ import org.springframework.web.bind.annotation.ResponseBody;
 
 import com.google.common.collect.Lists;
 
+/**
+ * @see org.apache.kylin.stream.core.client.ReceiverAdminClient
+ */
 @Controller
 @RequestMapping(value = "/admin")
 public class AdminController extends BasicController {
@@ -138,8 +141,11 @@ public class AdminController extends BasicController {
     }
 
     /**
-     * re submit segment to hadoop
-     * @param cubeName
+     * <pre>
+     * If some receiver failed to upload its local segment cache to HDFS
+     *  automatically for some reason, the coordinator cannot submit a building
+     *  job because the data is incomplete.
+     * In this case, the Kylin admin may use this API to re-upload the local
+     *  segment cache.
+     * </pre>
      */
     @RequestMapping(value = "/data/{cubeName}/{segmentName}/reSubmit", method 
= RequestMethod.PUT, produces = { "application/json" })
     @ResponseBody

Reply via email to