This is an automated email from the ASF dual-hosted git repository. nic pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kylin.git
commit a6fc3e7e22bf335f868ad3533aae65b602a9ea94 Author: XiaoxiangYu <hit_la...@126.com> AuthorDate: Sat Sep 21 16:31:04 2019 +0800 KYLIN-4167 Clarify overall design for realtime OLAP --- .../java/org/apache/kylin/cube/CubeManager.java | 2 +- .../kylin/engine/mr/StreamingCubingEngine.java | 5 +- .../controller/StreamingCoordinatorController.java | 8 ++- .../kylin/rest/service/StreamingV2Service.java | 40 +++++------- .../stream/coordinator/StreamMetadataStore.java | 11 ++-- .../coordinator/ZookeeperStreamMetadataStore.java | 66 +++++++++---------- .../coordinator/client/CoordinatorClient.java | 15 +++++ .../stream/core/client/ReceiverAdminClient.java | 76 ++++++++++++++++++++++ .../core/consumer/StreamingConsumerChannel.java | 12 ++++ .../kylin/stream/core/model/SegmentBuildState.java | 2 +- .../stream/core/model/stats/ClusterState.java | 8 +++ .../stream/core/model/stats/ReplicaSetState.java | 3 + .../storage/columnar/ColumnarSegmentStore.java | 2 +- .../core/storage/columnar/ColumnarStoreCache.java | 16 ++++- .../kylin/stream/server/StreamingServer.java | 2 - .../server/rest/controller/AdminController.java | 10 ++- 16 files changed, 203 insertions(+), 75 deletions(-) diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java index 7ad4dd3..3650913 100755 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java @@ -958,7 +958,7 @@ public class CubeManager implements IRealizationProvider { throw new IllegalStateException(String.format(Locale.ROOT, "For cube %s, segment %s missing LastBuildJobID", cubeCopy.toString(), newSegCopy.toString())); - if (isReady(newSegCopy) == true) { + if (isReady(newSegCopy)) { logger.warn("For cube {}, segment {} state should be NEW but is READY", cubeCopy, newSegCopy); } diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/StreamingCubingEngine.java 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/StreamingCubingEngine.java index ab00344..3dd7859 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/StreamingCubingEngine.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/StreamingCubingEngine.java @@ -26,7 +26,10 @@ import org.apache.kylin.job.execution.DefaultChainedExecutable; */ public class StreamingCubingEngine { - public DefaultChainedExecutable createStreamingCubingBuilder(CubeSegment seg, String submitter) { + /** + * A factory used to create a building job whose input data is the columnar storage cache uploaded by the receiver + */ + public DefaultChainedExecutable createStreamingCubingJob(CubeSegment seg, String submitter) { return new StreamingCubingJobBuilder(seg, submitter).build(); } } diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingCoordinatorController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingCoordinatorController.java index afb6a43..e1f737e 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingCoordinatorController.java +++ b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingCoordinatorController.java @@ -51,8 +51,12 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; /** - * StreamingCoordinatorController is defined as Restful API entrance for stream coordinator. - * + * <pre> + * When current process is not the coordinator leader (such as coordinator follower or non coordinator), + * calling an admin operation will lead to a NotLeadCoordinatorException being thrown. + * So this class should only be called by streaming receivers (because they know who is the real coordinator leader), + * not by Kylin users. 
+ * </pre> */ @Controller @RequestMapping(value = "/streaming_coordinator") diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/StreamingV2Service.java b/server-base/src/main/java/org/apache/kylin/rest/service/StreamingV2Service.java index 53bf266..dc6f8d3 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/StreamingV2Service.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/StreamingV2Service.java @@ -24,7 +24,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -34,7 +33,6 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.metadata.model.ISourceAware; import org.apache.kylin.metadata.project.ProjectInstance; @@ -76,9 +74,13 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +/** + * StreamingCoordinatorService will try to forward request to corrdinator leader by HttpClient. 
+ */ @Component("streamingServiceV2") public class StreamingV2Service extends BasicService { private static final Logger logger = LoggerFactory.getLogger(StreamingV2Service.class); + private static final String CLUSTER_STATE = "cluster_state"; private StreamMetadataStore streamMetadataStore; @@ -88,7 +90,7 @@ public class StreamingV2Service extends BasicService { .expireAfterWrite(10, TimeUnit.SECONDS).build(); private ExecutorService clusterStateExecutor = new ThreadPoolExecutor(0, 20, 60L, TimeUnit.SECONDS, - new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("fetch_receiver_state")); + new LinkedBlockingQueue<>(), new NamedThreadFactory("fetch_receiver_state")); public StreamingV2Service() { streamMetadataStore = StreamMetadataStoreFactory.getStreamMetaDataStore(); @@ -246,13 +248,6 @@ public class StreamingV2Service extends BasicService { getCoordinatorClient().resumeConsumers(cube.getName()); } - public void onSegmentRemoteStoreComplete(String cubeName, Pair<Long, Long> segment, Node receiver) { - logger.info( - "segment remote store complete signal received for cube:{}, segment:{}, try to find proper segment to build", - cubeName, segment); - getCoordinatorClient().segmentRemoteStoreComplete(receiver, cubeName, segment); - } - public StreamingSourceConfigManager getStreamingManagerV2() { return StreamingSourceConfigManager.getInstance(getConfig()); } @@ -284,25 +279,25 @@ public class StreamingV2Service extends BasicService { @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN) public void createReplicaSet(ReplicaSet rs) { getCoordinatorClient().createReplicaSet(rs); - clusterStateCache.invalidate("cluster_state"); + clusterStateCache.invalidate(CLUSTER_STATE); } @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN) public void removeReplicaSet(int rsID) { getCoordinatorClient().removeReplicaSet(rsID); - clusterStateCache.invalidate("cluster_state"); + clusterStateCache.invalidate(CLUSTER_STATE); } @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN) public void 
addNodeToReplicaSet(Integer replicaSetID, String nodeID) { getCoordinatorClient().addNodeToReplicaSet(replicaSetID, nodeID); - clusterStateCache.invalidate("cluster_state"); + clusterStateCache.invalidate(CLUSTER_STATE); } @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN) public void removeNodeFromReplicaSet(Integer replicaSetID, String nodeID) { getCoordinatorClient().removeNodeFromReplicaSet(replicaSetID, nodeID); - clusterStateCache.invalidate("cluster_state"); + clusterStateCache.invalidate(CLUSTER_STATE); } @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN) @@ -362,8 +357,12 @@ public class StreamingV2Service extends BasicService { return result; } + /** + * Fetch and calculate total cluster state. + * @return + */ public ClusterState getClusterState() { - ClusterState clusterState = clusterStateCache.getIfPresent("cluster_state"); + ClusterState clusterState = clusterStateCache.getIfPresent(CLUSTER_STATE); if (clusterState != null) { return clusterState; } @@ -373,12 +372,7 @@ public class StreamingV2Service extends BasicService { Map<Node, Future<ReceiverStats>> statsFuturesMap = Maps.newHashMap(); for (final Node receiver : allReceivers) { - Future<ReceiverStats> receiverStatsFuture = clusterStateExecutor.submit(new Callable<ReceiverStats>() { - @Override - public ReceiverStats call() throws Exception { - return receiverAdminClient.getReceiverStats(receiver); - } - }); + Future<ReceiverStats> receiverStatsFuture = clusterStateExecutor.submit(() -> receiverAdminClient.getReceiverStats(receiver)); statsFuturesMap.put(receiver, receiverStatsFuture); } @@ -435,9 +429,7 @@ public class StreamingV2Service extends BasicService { String cubeName = cubeStatsEntry.getKey(); ReceiverCubeStats cubeStats = cubeStatsEntry.getValue(); Long latestEventTime = cubeLatestEventMap.get(cubeName); - if (latestEventTime != null && latestEventTime < cubeStats.getLatestEventTime()) { - cubeLatestEventMap.put(cubeName, cubeStats.getLatestEventTime()); - } else if (latestEventTime == null) { 
+ if (latestEventTime == null || latestEventTime < cubeStats.getLatestEventTime()) { cubeLatestEventMap.put(cubeName, cubeStats.getLatestEventTime()); } } diff --git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/StreamMetadataStore.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/StreamMetadataStore.java index 68f0cb2..309bee9 100644 --- a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/StreamMetadataStore.java +++ b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/StreamMetadataStore.java @@ -88,11 +88,12 @@ public interface StreamMetadataStore { Map<Integer, String> getSourceCheckpoint(String cubeName, String segmentName); /** - * add group id to the segment info, indicate that the segment data - * has been hand over to the remote store - * @param cubeName - * @param segmentName - * @param rsID + * Add replica set id to the segment info, indicate that the segment data belong to current replica set + * has been hand over to the deep storage. + * + * This should be a indicator of integrity of uploaded segment data. 
+ * + * @param rsID id of replica set which has upload data to deep storage */ void addCompleteReplicaSetForSegmentBuild(String cubeName, String segmentName, int rsID); diff --git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/ZookeeperStreamMetadataStore.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/ZookeeperStreamMetadataStore.java index 2c3acb2..4431b4c 100644 --- a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/ZookeeperStreamMetadataStore.java +++ b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/ZookeeperStreamMetadataStore.java @@ -96,7 +96,7 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore { try { client.delete().forPath(ZKPaths.makePath(cubeRoot, cubeName, CUBE_ASSIGNMENT)); } catch (Exception e) { - logger.error("error when remove cube assignment", e); + logger.error("Error when remove cube assignment " + cubeName, e); throw new StoreException(e); } } @@ -116,7 +116,7 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore { } return cubeAssignmentList; } catch (Exception e) { - logger.error("error when get assignments"); + logger.error("Error when get assignments", e); throw new StoreException(e); } } @@ -127,7 +127,7 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore { List<CubeAssignment> cubeAssignmentList = getAllCubeAssignments(); return AssignmentUtil.convertCubeAssign2ReplicaSetAssign(cubeAssignmentList); } catch (Exception e) { - logger.error("error when get assignments"); + logger.error("Error when get assignments", e); throw new StoreException(e); } } @@ -138,7 +138,7 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore { Map<Integer, Map<String, List<Partition>>> replicaSetAssignmentsMap = getAllReplicaSetAssignments(); return replicaSetAssignmentsMap.get(replicaSetID); } catch (Exception e) { - logger.error("error when get assignments"); + 
logger.error("Error when get assignment for replica set " + replicaSetID, e); throw new StoreException(e); } } @@ -154,7 +154,7 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore { CubeAssignment assignment = CubeAssignment.deserializeCubeAssignment(data); return assignment; } catch (Exception e) { - logger.error("error when get cube assignment"); + logger.error("Error when get cube assignment for " + cubeName, e); throw new StoreException(e); } } @@ -169,7 +169,7 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore { result.add(replicaSet); } } catch (Exception e) { - logger.error("error when get replica sets", e); + logger.error("Error when get replica sets", e); throw new StoreException(e); } return result; @@ -187,7 +187,7 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore { } }); } catch (Exception e) { - logger.error("error when get replica sets", e); + logger.error("Error when get replica sets", e); throw new StoreException(e); } } @@ -219,7 +219,7 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore { client.create().creatingParentsIfNeeded().forPath(replicaSetPath, serializeReplicaSet(rs)); return newReplicaSetID; } catch (Exception e) { - logger.error("error when create replicaSet:" + rs); + logger.error("Error when create replicaSet " + rs, e); throw new StoreException(e); } } @@ -231,7 +231,7 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore { client.setData().forPath(ZKPaths.makePath(replicaSetRoot, String.valueOf(rs.getReplicaSetID())), replicaSetData); } catch (Exception e) { - logger.error("error when update replicaSet:" + rs.getReplicaSetID()); + logger.error("error when update replicaSet " + rs, e); throw new StoreException(e); } } @@ -242,7 +242,7 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore { byte[] nodeData = client.getData().forPath(coordinatorRoot); return JsonUtil.readValue(nodeData, 
Node.class); } catch (Exception e) { - logger.error("error when get coordinator", e); + logger.error("Error when get coordinator leader", e); throw new StoreException(e); } } @@ -253,7 +253,7 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore { byte[] coordinatorBytes = JsonUtil.writeValueAsBytes(coordinator); client.setData().forPath(coordinatorRoot, coordinatorBytes); } catch (Exception e) { - logger.error("error when set coordinator", e); + logger.error("Error when set coordinator leader to " + coordinator, e); throw new StoreException(e); } } @@ -266,11 +266,11 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore { if (client.checkExists().forPath(path) == null) { client.create().creatingParentsIfNeeded().forPath(path); } else { - logger.warn("checkpoint path already existed under path {}", path); + logger.warn("Checkpoint path already existed under path {}, overwrite with new one.", path); } client.setData().forPath(path, Bytes.toBytes(sourceCheckpoint)); } catch (Exception e) { - logger.error("fail to add replicaSet Id to segment build state", e); + logger.error("Error when save remote checkpoint for " + cubeName + " " + segmentName , e); throw new StoreException(e); } } @@ -295,7 +295,7 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore { } return result; } catch (Exception e) { - logger.error("fail to add replicaSet Id to segment build state", e); + logger.error("Error to fetch remote checkpoint for " + cubeName + " " + segmentName, e); throw new StoreException(e); } } @@ -317,7 +317,7 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore { return result; } catch (Exception e) { - logger.error("error when get replica set:" + rsID); + logger.error("Error when get replica set " + rsID, e); throw new StoreException(e); } } @@ -327,7 +327,7 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore { try { 
client.delete().forPath(ZKPaths.makePath(replicaSetRoot, String.valueOf(rsID))); } catch (Exception e) { - logger.error("error when remove replica set:" + rsID); + logger.error("Error when remove replica set " + rsID, e); throw new StoreException(e); } } @@ -342,7 +342,7 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore { result.add(node); } } catch (Exception e) { - logger.error("error when fetch receivers", e); + logger.error("Error when fetch receivers", e); throw new StoreException(e); } return result; @@ -353,7 +353,7 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore { try { return client.getChildren().forPath(cubeRoot); } catch (Exception e) { - logger.error("error when fetch cubes", e); + logger.error("Error when fetch cubes", e); throw new StoreException(e); } } @@ -366,7 +366,7 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore { client.create().creatingParentsIfNeeded().forPath(path); } } catch (Exception e) { - logger.error("error when add cube", e); + logger.error("Error when add cube " + cube, e); throw new StoreException(e); } } @@ -379,7 +379,7 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore { client.delete().deletingChildrenIfNeeded().forPath(ZKPaths.makePath(cubeRoot, cube)); } } catch (Exception e) { - logger.error("error when remove cube", e); + logger.error("Error when remove cube " + cube, e); throw new StoreException(e); } } @@ -399,7 +399,7 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore { return StreamingCubeConsumeState.RUNNING; } } catch (Exception e) { - logger.error("error when get streaming cube consume state", e); + logger.error("Error when get streaming cube consume state " + cube, e); throw new StoreException(e); } } @@ -414,7 +414,7 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore { client.create().creatingParentsIfNeeded().forPath(path, JsonUtil.writeValueAsBytes(state)); } 
} catch (Exception e) { - logger.error("error when save streaming cube consume state", e); + logger.error("Error when save streaming cube consume state " + cube + " with " + state, e); throw new StoreException(e); } } @@ -427,7 +427,7 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore { client.create().creatingParentsIfNeeded().forPath(receiverPath); } } catch (Exception e) { - logger.error("error when add new receiver", e); + logger.error("Error when add new receiver " + receiver, e); throw new StoreException(e); } } @@ -440,14 +440,14 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore { client.delete().deletingChildrenIfNeeded().forPath(receiverPath); } } catch (Exception e) { - logger.error("error when remove receiver:" + receiver, e); + logger.error("Error when remove receiver " + receiver, e); throw new StoreException(e); } } @Override public void saveNewCubeAssignment(CubeAssignment newCubeAssignment) { - logger.info("try saving new cube assignment for:" + newCubeAssignment.getCubeName()); + logger.info("Try saving new cube assignment for: {}.", newCubeAssignment); try { String path = getCubeAssignmentPath(newCubeAssignment.getCubeName()); if (client.checkExists().forPath(path) == null) { @@ -457,7 +457,7 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore { client.setData().forPath(path, CubeAssignment.serializeCubeAssignment(newCubeAssignment)); } } catch (Exception e) { - logger.error("fail to save cube assignment", e); + logger.error("Fail to save cube assignment", e); throw new StoreException(e); } } @@ -466,7 +466,7 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore { try { client.close(); } catch (Exception e) { - logger.error("exception throws when close assignmentManager", e); + logger.error("Exception throws when close assignmentManager", e); } } @@ -481,7 +481,7 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore { 
logger.warn("ReplicaSet id {} existed under path {}", rsID, path); } } catch (Exception e) { - logger.error("fail to add replicaSet Id to segment build state", e); + logger.error("Fail to add replicaSet Id to segment build state for " + segmentName + " " + rsID, e); throw new StoreException(e); } } @@ -493,7 +493,7 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore { String path = ZKPaths.makePath(cubeRoot, cubeName, CUBE_BUILD_STATE, segmentName); client.setData().forPath(path, Bytes.toBytes(stateStr)); } catch (Exception e) { - logger.error("fail to update segment build state", e); + logger.error("Fail to update segment build state for " + segmentName + " to " + state, e); throw new StoreException(e); } } @@ -513,7 +513,7 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore { } return result; } catch (Exception e) { - logger.error("fail to get segment build states", e); + logger.error("Fail to get segment build states " + cubeName, e); throw new StoreException(e); } } @@ -524,7 +524,7 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore { String cubePath = getCubeBuildStatePath(cubeName); return doGetSegmentBuildState(cubePath, segmentName); } catch (Exception e) { - logger.error("fail to get cube segment remote store state", e); + logger.error("Fail to get segment build state for " + cubeName + " " +segmentName, e); throw new StoreException(e); } } @@ -555,11 +555,11 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore { client.delete().deletingChildrenIfNeeded().forPath(path); return true; } else { - logger.warn("cube segment deep store state does not exisit!, path {} ", path); + logger.warn("Cube segment deep store state does not exisit!, path {} ", path); return false; } } catch (Exception e) { - logger.error("fail to remove cube segment deep store state", e); + logger.error("Fail to remove cube segment deep store state " + cubeName + " " + segmentName, e); throw new 
StoreException(e); } } diff --git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/client/CoordinatorClient.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/client/CoordinatorClient.java index 14d814d..476abfb 100644 --- a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/client/CoordinatorClient.java +++ b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/client/CoordinatorClient.java @@ -28,10 +28,25 @@ import org.apache.kylin.stream.core.model.Node; import org.apache.kylin.stream.core.source.Partition; public interface CoordinatorClient { + + // ================================================================================ + // ============================= Receiver side ==================================== + + /** + * Notified by a receiver that a part of segment data has been uploaded to Deep Storage. + * Coordinator will try to find any segment which is ready to build into HBase and submit a building job. + */ void segmentRemoteStoreComplete(Node receiverNode, String cubeName, Pair<Long, Long> segmentRange); + /** + * Notified by replica set leader that a leader has been changed to it. No influence now. 
+ */ void replicaSetLeaderChange(int replicaSetId, Node newLeader); + + // ================================================================================ + // =========================== Coordinator side =================================== + Map<Integer, Map<String, List<Partition>>> reBalanceRecommend(); void reBalance(Map<Integer, Map<String, List<Partition>>> reBalancePlan); diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/client/ReceiverAdminClient.java b/stream-core/src/main/java/org/apache/kylin/stream/core/client/ReceiverAdminClient.java index e080af7..b040691 100644 --- a/stream-core/src/main/java/org/apache/kylin/stream/core/client/ReceiverAdminClient.java +++ b/stream-core/src/main/java/org/apache/kylin/stream/core/client/ReceiverAdminClient.java @@ -32,27 +32,103 @@ import org.apache.kylin.stream.core.model.UnAssignRequest; import org.apache.kylin.stream.core.model.stats.ReceiverCubeStats; import org.apache.kylin.stream.core.model.stats.ReceiverStats; +/** + * StreamingCoordinator sends admin requests to a specific receiver + * (received by org.apache.kylin.stream.server.rest.controller.AdminController). + * + */ public interface ReceiverAdminClient { + + /** + * Notify receiver that it has been assigned to a consumption task with AssignRequest#partitions of cube. + * If AssignRequest#startConsumers is set to true, receiver has to start consumption at once. + */ void assign(Node receiver, AssignRequest assignRequest) throws IOException; + /** + * <pre> + * Notify receiver that it has been unassigned from a specific cube. + * Receiver will stop consumption and delete all local segment cache. + * </pre> + */ void unAssign(Node receiver, UnAssignRequest unAssignRequest) throws IOException; + /** + * <pre> + * Ask receiver to start consumption (create IStreamingConnector). + * + * Start position is decided by StartConsumersRequest#startProtocol: + * 1. when StartConsumersRequest#startProtocol is null, consume from checkpoint: + * 1. 
if local checkpoint exists, consume from local checkpoint + 2. if local checkpoint does not exist, consume from remote checkpoint(see StreamMetadata#getSourceCheckpoint) + 3. if neither exists, start position decided by KylinConfig#isStreamingConsumeFromLatestOffsets + 2. when StartConsumersRequest#startProtocol is not null: + 1. when startProtocol.getStartPosition() is valid, consume from startProtocol.getStartPosition() + 2. when startProtocol.getStartPosition() is not valid, start position decided by StartProtocol#ConsumerStartMode + * + * See KafkaSource#createStreamingConnector + * </pre> + */ void startConsumers(Node receiver, StartConsumersRequest startRequest) throws IOException; + /** + * <pre> + * Ask receiver to stop consumption (destroy IStreamingConnector) and flush data into disk. + * If StopConsumersRequest#removeData is set to true, all segment data will be deleted. + * </pre> + */ ConsumerStatsResponse stopConsumers(Node receiver, StopConsumersRequest stopRequest) throws IOException; + /** + * Ask receiver to pause consumption (don't destroy IStreamingConnector). + */ ConsumerStatsResponse pauseConsumers(Node receiver, PauseConsumersRequest request) throws IOException; + /** + * <pre> + * 1. When ResumeConsumerRequest#resumeToPosition is null, just ask receiver to resume consumption. + * 2. When ResumeConsumerRequest#resumeToPosition is not null, ask receiver to resume to that position and stop consumption, + * so it is something like ReceiverAdminClient#stopConsumers. This case is used in reAssign action. + * Please check ReceiverClusterManager#syncAndStopConsumersInRs for detail. + * + * It is a synchronous method. + * </pre> + */ ConsumerStatsResponse resumeConsumers(Node receiver, ResumeConsumerRequest request) throws IOException; + /** + * Ask receiver to remove all data related to the specific cube on the receiver side. 
+ */ void removeCubeSegment(Node receiver, String cubeName, String segmentName) throws IOException; + /** + * Ask receiver to stop consumption and convert all segments to Immutable. + */ void makeCubeImmutable(Node receiver, String cubeName) throws IOException; + /** + * When a segment has been promoted to HBase Ready Segment(historical part), + * segment cache in receiver(realtime part) is useless and needs to be deleted. + */ void segmentBuildComplete(Node receiver, String cubeName, String segmentName) throws IOException; + /** + * <pre> + * Notify receiver that it has been added into a new replica set, receiver will do + * 1. add itself as the replica set's leader candidate + * 2. fetch assignment from Metadata and try to start consumption task + * </pre> + */ void addToReplicaSet(Node receiver, int replicaSetID) throws IOException; + /** + * <pre> + * Notify receiver that it has been removed from replica set, receiver will do + * 1. remove assignment and remove itself as the replica set's leader candidate + * 2. stop consumption + * 3. remove all local segment cache + * </pre> + */ void removeFromReplicaSet(Node receiver) throws IOException; ReceiverStats getReceiverStats(Node receiver) throws IOException; diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/consumer/StreamingConsumerChannel.java b/stream-core/src/main/java/org/apache/kylin/stream/core/consumer/StreamingConsumerChannel.java index 1052f5a..256b519 100644 --- a/stream-core/src/main/java/org/apache/kylin/stream/core/consumer/StreamingConsumerChannel.java +++ b/stream-core/src/main/java/org/apache/kylin/stream/core/consumer/StreamingConsumerChannel.java @@ -178,6 +178,9 @@ public class StreamingConsumerChannel implements Runnable { } } + /** + * Called by another thread. + */ public void stop(long timeoutInMs) { this.stopped = true; waitConsumerStop(timeoutInMs); @@ -211,6 +214,9 @@ } } + /** + * Called by another thread. 
+ */ public void pause(boolean wait) { this.paused = true; if (wait) { @@ -224,6 +230,9 @@ public class StreamingConsumerChannel implements Runnable { } } + /** + * Called by another thread. + */ public void resumeToStopCondition(IStopConsumptionCondition newStopCondition) { this.paused = false; if (newStopCondition != IStopConsumptionCondition.NEVER_STOP) { @@ -239,6 +248,9 @@ public class StreamingConsumerChannel implements Runnable { } } + /** + * Called by another thread. + */ public void resume() { this.paused = false; } diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/model/SegmentBuildState.java b/stream-core/src/main/java/org/apache/kylin/stream/core/model/SegmentBuildState.java index c217fa1..b28efd9 100644 --- a/stream-core/src/main/java/org/apache/kylin/stream/core/model/SegmentBuildState.java +++ b/stream-core/src/main/java/org/apache/kylin/stream/core/model/SegmentBuildState.java @@ -138,7 +138,7 @@ public class SegmentBuildState implements Comparable<SegmentBuildState> { } public enum State { - WAIT, BUILDING, COMPLETE + WAIT, BUILDING } } } diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/model/stats/ClusterState.java b/stream-core/src/main/java/org/apache/kylin/stream/core/model/stats/ClusterState.java index 64fe3ad..9fa277f 100644 --- a/stream-core/src/main/java/org/apache/kylin/stream/core/model/stats/ClusterState.java +++ b/stream-core/src/main/java/org/apache/kylin/stream/core/model/stats/ClusterState.java @@ -32,9 +32,13 @@ public class ClusterState { @JsonProperty("rs_states") private List<ReplicaSetState> replicaSetStates = Lists.newArrayList(); + /** + * Receiver which don't belong to any replica set. 
+ */ @JsonProperty("available_receivers") private List<ReceiverState> availableReceivers = Lists.newArrayList(); + @SuppressWarnings("unused") public long getLastUpdateTime() { return lastUpdateTime; } @@ -43,18 +47,22 @@ public class ClusterState { this.lastUpdateTime = lastUpdateTime; } + @SuppressWarnings("unused") public List<ReplicaSetState> getReplicaSetStates() { return replicaSetStates; } + @SuppressWarnings("unused") public void setReplicaSetStates(List<ReplicaSetState> replicaSetStates) { this.replicaSetStates = replicaSetStates; } + @SuppressWarnings("unused") public List<ReceiverState> getAvailableReceivers() { return availableReceivers; } + @SuppressWarnings("unused") public void setAvailableReceivers(List<ReceiverState> availableReceivers) { this.availableReceivers = availableReceivers; } diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/model/stats/ReplicaSetState.java b/stream-core/src/main/java/org/apache/kylin/stream/core/model/stats/ReplicaSetState.java index 0324416..933aeae 100644 --- a/stream-core/src/main/java/org/apache/kylin/stream/core/model/stats/ReplicaSetState.java +++ b/stream-core/src/main/java/org/apache/kylin/stream/core/model/stats/ReplicaSetState.java @@ -50,10 +50,12 @@ public class ReplicaSetState { this.rsID = rsID; } + @SuppressWarnings("unused") public List<ReceiverState> getReceiverStates() { return receiverStates; } + @SuppressWarnings("unused") public void setReceiverStates(List<ReceiverState> receiverStates) { this.receiverStates = receiverStates; } @@ -81,6 +83,7 @@ public class ReplicaSetState { receiverStates.add(receiverState); } + @SuppressWarnings("unused") public ReceiverState getReceiverState(Node receiver) { for (ReceiverState receiverState : receiverStates) { if (receiverState.getReceiver().equals(receiver)) { diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarSegmentStore.java 
b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarSegmentStore.java index 14a92f7..5982065 100644 --- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarSegmentStore.java +++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarSegmentStore.java @@ -66,7 +66,7 @@ public class ColumnarSegmentStore implements IStreamingSegmentStore { private static ExecutorService fragmentMergeExecutor; { fragmentMergeExecutor = new ThreadPoolExecutor(0, 10, 60L, TimeUnit.SECONDS, - new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("fragments-merge")); + new LinkedBlockingQueue<>(), new NamedThreadFactory("fragments-merge")); } private volatile SegmentMemoryStore activeMemoryStore; diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarStoreCache.java b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarStoreCache.java index 44f40ad..aa5f827 100644 --- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarStoreCache.java +++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarStoreCache.java @@ -37,6 +37,13 @@ import com.google.common.cache.RemovalListener; import com.google.common.cache.RemovalNotification; import com.google.common.collect.Maps; +/** + * On the streaming receiver side, data is divided into two parts: the memory store and fragment files. As the names imply, + * the memory store is located in the JVM heap (actually a SortedMap), and fragment files are usually located on disk. + * + * Since fragment files are often very large, reducing the number of IO operations improves performance remarkably. So we + * cache fragment files in off-heap memory as much as possible.
+ */ public class ColumnarStoreCache { private static Logger logger = LoggerFactory.getLogger(ColumnarStoreCache.class); private static ColumnarStoreCache instance = new ColumnarStoreCache(); @@ -50,7 +57,9 @@ public class ColumnarStoreCache { private ConcurrentMap<DataSegmentFragment, AtomicLong> refCounters = Maps.newConcurrentMap(); public LoadingCache<DataSegmentFragment, FragmentData> fragmentDataCache = CacheBuilder.newBuilder() - .initialCapacity(INIT_CACHE_SIZE).concurrencyLevel(8).maximumSize(CACHE_SIZE) + .initialCapacity(INIT_CACHE_SIZE) + .concurrencyLevel(8) + .maximumSize(CACHE_SIZE) .expireAfterAccess(6, TimeUnit.HOURS) .removalListener(new RemovalListener<DataSegmentFragment, FragmentData>() { @Override @@ -76,7 +85,8 @@ public class ColumnarStoreCache { logger.debug("no ref counter found for fragment: " + fragment); } } - }).build(new CacheLoader<DataSegmentFragment, FragmentData>() { + }) + .build(new CacheLoader<DataSegmentFragment, FragmentData>() { @Override public FragmentData load(DataSegmentFragment fragment) throws Exception { if (currentBufferedSize.get() >= MAX_BUFFERED_SIZE) { @@ -133,7 +143,7 @@ public class ColumnarStoreCache { if (refCounter != null) { refCounter.decrementAndGet(); } else { - logger.warn("ref counter not exist for fragment:" + fragment); + logger.warn("Ref counter not exist for fragment:{}", fragment); } } diff --git a/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java b/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java index 95630c6..7a16044 100644 --- a/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java +++ b/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java @@ -33,7 +33,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; - import 
com.google.common.annotations.VisibleForTesting; import org.apache.commons.io.FileUtils; import org.apache.curator.framework.CuratorFramework; @@ -238,7 +237,6 @@ public class StreamingServer implements ReplicaSetLeaderSelector.LeaderChangeLis StreamingCubeSegment.State.REMOTE_PERSISTED.name()); i++; } - } private void purgeSegments(String cubeName, Collection<StreamingCubeSegment> segments, diff --git a/stream-receiver/src/main/java/org/apache/kylin/stream/server/rest/controller/AdminController.java b/stream-receiver/src/main/java/org/apache/kylin/stream/server/rest/controller/AdminController.java index b781420..80b1170 100644 --- a/stream-receiver/src/main/java/org/apache/kylin/stream/server/rest/controller/AdminController.java +++ b/stream-receiver/src/main/java/org/apache/kylin/stream/server/rest/controller/AdminController.java @@ -38,6 +38,9 @@ import org.springframework.web.bind.annotation.ResponseBody; import com.google.common.collect.Lists; +/** + * @see org.apache.kylin.stream.core.client.ReceiverAdminClient + */ @Controller @RequestMapping(value = "/admin") public class AdminController extends BasicController { @@ -138,8 +141,11 @@ public class AdminController extends BasicController { } /** - * re submit segment to hadoop - * @param cubeName + * <pre> + * If some receiver failed to upload local segment cache to HDFS automatically for some reason, + * the coordinator cannot submit a building job because data is incomplete. + * In this case, the Kylin admin may use this API to re-upload local segment cache. + * </pre> */ @RequestMapping(value = "/data/{cubeName}/{segmentName}/reSubmit", method = RequestMethod.PUT, produces = { "application/json" }) @ResponseBody