This is an automated email from the ASF dual-hosted git repository. nic pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kylin.git
commit e8ad658fcd43c650a7545c0d6946fdcd114460dc Author: XiaoxiangYu <hit_la...@126.com> AuthorDate: Sat Sep 21 17:03:45 2019 +0800 KYLIN-4167 Refactor streaming coordinator Phase1 --- build/conf/kylin-server-log4j.properties | 10 + .../org/apache/kylin/common/KylinConfigBase.java | 4 + .../kylin/job/execution/AbstractExecutable.java | 2 +- .../optrule/visitor/TimezoneRewriteVisitor.java | 5 +- .../rest/service/StreamingCoordinatorService.java | 27 +- .../stream/rpc/HttpStreamDataSearchClient.java | 5 +- .../kylin/stream/coordinator/Coordinator.java | 3 +- .../client/CoordinatorClientFactory.java | 17 +- .../coordinator/coordinate/BuildJobSubmitter.java | 459 +++++++++++++++ .../coordinate/ReceiverClusterManager.java | 651 +++++++++++++++++++++ .../coordinate/SegmentJobBuildInfo.java | 73 +++ .../coordinate/StreamingCoordinator.java | 643 ++++++++++++++++++++ .../annotations/NonSideEffect.java} | 43 +- .../annotations/NotAtomicAndNotIdempotent.java | 45 ++ .../annotations/NotAtomicIdempotent.java | 42 ++ .../ClusterDoctor.java} | 27 +- .../ClusterStateChecker.java} | 32 +- .../exception/ClusterStateException.java | 6 +- .../coordinator/exception/CoordinateException.java | 3 + .../exception/NotLeadCoordinatorException.java | 2 + .../coordinator/exception/StoreException.java | 2 + .../src/main/resources/log4j.properties | 18 +- .../coordinate/BuildJobSubmitterTest.java | 194 ++++++ .../coordinator/coordinate/StreamingTestBase.java | 335 +++++++++++ .../kylin/stream/server/StreamingServer.java | 10 +- 25 files changed, 2567 insertions(+), 91 deletions(-) diff --git a/build/conf/kylin-server-log4j.properties b/build/conf/kylin-server-log4j.properties index aba7001..0b3cef9 100644 --- a/build/conf/kylin-server-log4j.properties +++ b/build/conf/kylin-server-log4j.properties @@ -30,3 +30,13 @@ log4j.rootLogger=INFO,file log4j.logger.org.apache.kylin=DEBUG log4j.logger.org.springframework=WARN log4j.logger.org.springframework.security=INFO + +# Realtime OLAP config 
+log4j.appender.realtime=org.apache.log4j.DailyRollingFileAppender +log4j.appender.realtime.layout=org.apache.log4j.PatternLayout +log4j.appender.realtime.File=${catalina.home}/../logs/kylin_coordinator.log +log4j.appender.realtime.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2}:%L : %m%n +log4j.appender.realtime.Append=true +log4j.appender.realtime.MaxFileSize=268435456 +log4j.appender.realtime.MaxBackupIndex=10 +log4j.logger.org.apache.kylin.stream.*=DEBUG, realtime \ No newline at end of file diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index a0a1d38..6f73024 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -2216,6 +2216,10 @@ public abstract class KylinConfigBase implements Serializable { return Boolean.parseBoolean(getOptional("kylin.stream.stand-alone.mode", "false")); } + public boolean isNewCoordinatorEnabled() { + return Boolean.parseBoolean(getOptional("kylin.stream.new.coordinator-enabled", "true")); + } + public String getLocalStorageImpl() { return getOptional("kylin.stream.settled.storage", null); } diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java index 51d9bf7..20d62f9 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java @@ -265,7 +265,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent { } @Override - public final String getId() { + public String getId() { return this.id; } diff --git a/query/src/main/java/org/apache/kylin/query/optrule/visitor/TimezoneRewriteVisitor.java 
b/query/src/main/java/org/apache/kylin/query/optrule/visitor/TimezoneRewriteVisitor.java index cb75b34..731ee00 100644 --- a/query/src/main/java/org/apache/kylin/query/optrule/visitor/TimezoneRewriteVisitor.java +++ b/query/src/main/java/org/apache/kylin/query/optrule/visitor/TimezoneRewriteVisitor.java @@ -41,6 +41,7 @@ import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.List; +import java.util.Locale; public class TimezoneRewriteVisitor extends RexVisitorImpl<RexNode> { public static final Logger logger = LoggerFactory.getLogger(TimezoneRewriteVisitor.class); @@ -69,8 +70,8 @@ public class TimezoneRewriteVisitor extends RexVisitorImpl<RexNode> { // this will affect code gen int minusHours = 0; String afetrModify = LocalDateTime - .parse(toBeModify, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")).minusHours(minusHours) - .format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); + .parse(toBeModify, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss", Locale.ROOT)).minusHours(minusHours) + .format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss", Locale.ROOT)); logger.info("{} -> {}", toBeModify, afetrModify); RexLiteral newliteral = RexLiteral.fromJdbcString(literal.getType(), literal.getTypeName(), afetrModify); diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/StreamingCoordinatorService.java b/server-base/src/main/java/org/apache/kylin/rest/service/StreamingCoordinatorService.java index b4f223f..7ac7610 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/StreamingCoordinatorService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/StreamingCoordinatorService.java @@ -22,10 +22,13 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.Pair; import org.apache.kylin.stream.coordinator.Coordinator; import 
org.apache.kylin.stream.coordinator.StreamMetadataStore; import org.apache.kylin.stream.coordinator.StreamMetadataStoreFactory; +import org.apache.kylin.stream.coordinator.client.CoordinatorClient; +import org.apache.kylin.stream.coordinator.coordinate.StreamingCoordinator; import org.apache.kylin.stream.core.model.CubeAssignment; import org.apache.kylin.stream.core.model.ReplicaSet; import org.apache.kylin.stream.core.model.Node; @@ -42,12 +45,17 @@ public class StreamingCoordinatorService extends BasicService { private StreamMetadataStore streamMetadataStore; - private Coordinator streamingCoordinator; + private CoordinatorClient streamingCoordinator; - public StreamingCoordinatorService(){ + public StreamingCoordinatorService() { streamMetadataStore = StreamMetadataStoreFactory.getStreamMetaDataStore(); - //TODO coordinator operation should go to the only one lead coordinator - streamingCoordinator = Coordinator.getInstance(); + if (KylinConfig.getInstanceFromEnv().isNewCoordinatorEnabled()) { + logger.info("Use new version coordinator."); + streamingCoordinator = StreamingCoordinator.getInstance(); + } else { + logger.info("Use old version coordinator."); + streamingCoordinator = Coordinator.getInstance(); + } } public synchronized Map<Integer, Map<String, List<Partition>>> reBalanceRecommend() { @@ -55,7 +63,7 @@ public class StreamingCoordinatorService extends BasicService { } public synchronized void reBalance(Map<Integer, Map<String, List<Partition>>> reBalancePlan) { - streamingCoordinator.reBalance(reBalancePlan); + streamingCoordinator.reBalance(reBalancePlan); } public void assignCube(String cubeName) { @@ -94,24 +102,23 @@ public class StreamingCoordinatorService extends BasicService { streamingCoordinator.replicaSetLeaderChange(replicaSetID, newLeader); } - public void createReplicaSet(ReplicaSet rs){ + public void createReplicaSet(ReplicaSet rs) { streamingCoordinator.createReplicaSet(rs); } - public void removeReplicaSet(int rsID){ + public void 
removeReplicaSet(int rsID) { streamingCoordinator.removeReplicaSet(rsID); } - public void addNodeToReplicaSet(Integer replicaSetID, String nodeID){ + public void addNodeToReplicaSet(Integer replicaSetID, String nodeID) { streamingCoordinator.addNodeToReplicaSet(replicaSetID, nodeID); } - public void removeNodeFromReplicaSet(Integer replicaSetID, String nodeID){ + public void removeNodeFromReplicaSet(Integer replicaSetID, String nodeID) { streamingCoordinator.removeNodeFromReplicaSet(replicaSetID, nodeID); } public void onSegmentRemoteStoreComplete(String cubeName, Pair<Long, Long> segment, Node receiver) { streamingCoordinator.segmentRemoteStoreComplete(receiver, cubeName, segment); } - } diff --git a/storage-stream/src/main/java/org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.java b/storage-stream/src/main/java/org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.java index a189845..bc075ce 100644 --- a/storage-stream/src/main/java/org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.java +++ b/storage-stream/src/main/java/org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.java @@ -128,8 +128,7 @@ public class HttpStreamDataSearchClient implements IStreamDataSearchClient { public Iterator<ITuple> search(DataRequest dataRequest, CubeInstance cube, StreamingTupleConverter tupleConverter, RecordsSerializer recordsSerializer, ReplicaSet rs, TupleInfo tupleInfo) throws Exception { List<Node> receivers = Lists.newArrayList(rs.getNodes()); - Node leader = rs.getLeader(); - Node queryReceiver = findBestReceiverServeQuery(receivers, leader, cube.getName()); + Node queryReceiver = findBestReceiverServeQuery(receivers, cube.getName()); IOException exception; try { return doSearch(dataRequest, cube, tupleConverter, recordsSerializer, queryReceiver, tupleInfo); @@ -156,7 +155,7 @@ public class HttpStreamDataSearchClient implements IStreamDataSearchClient { throw exception; } - private Node findBestReceiverServeQuery(List<Node> 
receivers, Node lead, String cubeName) { + private Node findBestReceiverServeQuery(List<Node> receivers, String cubeName) { // stick to one receiver according to cube name int receiversSize = receivers.size(); int receiverNo = Math.abs(cubeName.hashCode()) % receiversSize; diff --git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java index 7037d13..77f1268 100644 --- a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java +++ b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java @@ -121,6 +121,7 @@ import javax.annotation.Nullable; * request, others process will become standby/candidate, so single point of failure will be eliminated. * </pre> */ +@Deprecated public class Coordinator implements CoordinatorClient { private static final Logger logger = LoggerFactory.getLogger(Coordinator.class); private static final int DEFAULT_PORT = 7070; @@ -1109,7 +1110,7 @@ public class Coordinator implements CoordinatorClient { logger.info("submit streaming segment build, cube:{} segment:{}", cubeName, segmentName); CubeSegment newSeg = getCubeManager().appendSegment(cubeInstance, new TSRange(segmentRange.getFirst(), segmentRange.getSecond())); - DefaultChainedExecutable executable = new StreamingCubingEngine().createStreamingCubingBuilder(newSeg, + DefaultChainedExecutable executable = new StreamingCubingEngine().createStreamingCubingJob(newSeg, "SYSTEM"); getExecutableManager().addJob(executable); CubingJob cubingJob = (CubingJob) executable; diff --git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/client/CoordinatorClientFactory.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/client/CoordinatorClientFactory.java index ef6fdbe..6c8fa16 100644 --- 
a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/client/CoordinatorClientFactory.java +++ b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/client/CoordinatorClientFactory.java @@ -21,8 +21,10 @@ package org.apache.kylin.stream.coordinator.client; import java.net.InetAddress; import java.net.NetworkInterface; +import org.apache.kylin.common.KylinConfig; import org.apache.kylin.stream.coordinator.Coordinator; import org.apache.kylin.stream.coordinator.StreamMetadataStore; +import org.apache.kylin.stream.coordinator.coordinate.StreamingCoordinator; import org.apache.kylin.stream.core.model.Node; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,9 +32,18 @@ import org.slf4j.LoggerFactory; public class CoordinatorClientFactory { private static final Logger logger = LoggerFactory.getLogger(CoordinatorClientFactory.class); + private CoordinatorClientFactory() { + } + public static CoordinatorClient createCoordinatorClient(StreamMetadataStore streamMetadataStore) { if (isCoordinatorCoLocate(streamMetadataStore)) { - return Coordinator.getInstance(); + if (KylinConfig.getInstanceFromEnv().isNewCoordinatorEnabled()) { + logger.info("Use new version coordinator."); + return StreamingCoordinator.getInstance(); + } else { + logger.info("Use old version coordinator."); + return Coordinator.getInstance(); + } } else { return new HttpCoordinatorClient(streamMetadataStore); } @@ -43,12 +54,12 @@ public class CoordinatorClientFactory { Node coordinatorNode = streamMetadataStore.getCoordinatorNode(); if (coordinatorNode == null) { logger.warn("no coordinator node registered"); - return true; + return false; } InetAddress inetAddress = InetAddress.getByName(coordinatorNode.getHost()); return NetworkInterface.getByInetAddress(inetAddress) != null; } catch (Exception e) { - logger.error("error when "); + logger.error("Error when check network interface.", e); } return true; } diff --git 
a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/BuildJobSubmitter.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/BuildJobSubmitter.java new file mode 100644 index 0000000..cf3771b --- /dev/null +++ b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/BuildJobSubmitter.java @@ -0,0 +1,459 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kylin.stream.coordinator.coordinate; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import org.apache.kylin.common.util.Pair; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.engine.mr.CubingJob; +import org.apache.kylin.engine.mr.StreamingCubingEngine; +import org.apache.kylin.job.execution.DefaultChainedExecutable; +import org.apache.kylin.job.execution.ExecutableState; +import org.apache.kylin.metadata.model.SegmentRange; +import org.apache.kylin.metadata.model.SegmentStatusEnum; +import org.apache.kylin.stream.coordinator.StreamingCubeInfo; +import org.apache.kylin.stream.coordinator.coordinate.annotations.NonSideEffect; +import org.apache.kylin.stream.coordinator.coordinate.annotations.NotAtomicIdempotent; +import org.apache.kylin.stream.core.model.CubeAssignment; +import org.apache.kylin.stream.core.model.SegmentBuildState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentSkipListSet; + +/** + * <pre> + * The main responsibility of BuildJobSubmitter including: + * 1. Try to find candidate segment which ready to submit a build job + * 2. Trace the status of candidate segment's build job and promote segment if it is has met requirements + * + * The candidate segments are those segment what can be saw/perceived by coordinator, + * candidate segment could be divided into following state/queue: + * 1. segment which data are uploaded PARTLY + * 2. segment which data are uploaded completely and WAITING to build + * 3. segment which in BUILDING state, job's state should be one of (NEW/RUNNING/ERROR/DISCARD) + * 4. 
segment which built succeed and wait to deliver to historical part (and to be deleted in realtime part) + * 5. segment which in historical part(HBase Ready Segment) + * + * By design, segment should transfer to next queue in sequential way(shouldn't jump the queue), do not break this. + * </pre> + */ +public class BuildJobSubmitter implements Runnable { + private static final Logger logger = LoggerFactory.getLogger(BuildJobSubmitter.class); + + private ConcurrentMap<String, ConcurrentSkipListSet<SegmentJobBuildInfo>> segmentBuildJobCheckList = Maps + .newConcurrentMap(); + private Set<String> cubeCheckList = new ConcurrentSkipListSet<>(); + private StreamingCoordinator coordinator; + + private long checkTimes = 0; + + public BuildJobSubmitter(StreamingCoordinator coordinator) { + this.coordinator = coordinator; + } + + void restore() { + logger.info("Restore job submitter"); + List<String> cubes = coordinator.getStreamMetadataStore().getCubes(); + for (String cube : cubes) { + List<SegmentBuildState> segmentBuildStates = coordinator.getStreamMetadataStore() + .getSegmentBuildStates(cube); + Collections.sort(segmentBuildStates); + for (SegmentBuildState segmentBuildState : segmentBuildStates) { + if (segmentBuildState.isInBuilding()) { + SegmentJobBuildInfo jobBuildInfo = new SegmentJobBuildInfo(cube, segmentBuildState.getSegmentName(), + segmentBuildState.getState().getJobId()); + this.addToJobTrackList(jobBuildInfo); + } + } + } + } + + void addToJobTrackList(SegmentJobBuildInfo segmentBuildJob) { + ConcurrentSkipListSet<SegmentJobBuildInfo> buildInfos = segmentBuildJobCheckList.get(segmentBuildJob.cubeName); + if (buildInfos == null) { + buildInfos = new ConcurrentSkipListSet<>(); + ConcurrentSkipListSet<SegmentJobBuildInfo> previousValue = segmentBuildJobCheckList + .putIfAbsent(segmentBuildJob.cubeName, buildInfos); + if (previousValue != null) { + buildInfos = previousValue; + } + } + buildInfos.add(segmentBuildJob); + } + + void addToCheckList(String 
cubeName) { + cubeCheckList.add(cubeName); + } + + void clearCheckList(String cubeName) { + cubeCheckList.remove(cubeName); + segmentBuildJobCheckList.remove(cubeName); + } + + @Override + public void run() { + try { + if (coordinator.isLead()) { + doRun(); + } + } catch (Exception e) { + logger.error("Unexpected error", e); + } + } + + void doRun() { + checkTimes++; + logger.info("\n----------------------------------------------------------- {}", checkTimes); + List<SegmentJobBuildInfo> successJobs = traceEarliestSegmentBuildJob(); + + for (SegmentJobBuildInfo successJob : successJobs) { + ConcurrentSkipListSet<SegmentJobBuildInfo> submittedBuildJobs = segmentBuildJobCheckList + .get(successJob.cubeName); + submittedBuildJobs.remove(successJob); + } + + findSegmentReadyToBuild(); + + if (checkTimes % 100 == 1) { + logger.info("Force traverse all cubes periodically."); + for (StreamingCubeInfo cubeInfo : coordinator.getEnableStreamingCubes()) { + List<String> segmentList = checkSegmentBuidJobFromMetadata(cubeInfo.getCubeName()); + for (String segmentName : segmentList) { + submitSegmentBuildJob(cubeInfo.getCubeName(), segmentName); + } + } + } + } + + /** + * <pre> + * Trace the state of build job for the earliest(NOT ALL) segment for each streaming cube, and + * 1. try to promote into Ready HBase Segment if job's state is succeed + * 2. 
try to resume the build job if job's state is error + * </pre> + * + * @return all succeed building job + */ + @NonSideEffect + List<SegmentJobBuildInfo> traceEarliestSegmentBuildJob() { + List<SegmentJobBuildInfo> successJobs = Lists.newArrayList(); + for (ConcurrentSkipListSet<SegmentJobBuildInfo> buildInfos : segmentBuildJobCheckList.values()) { + if (buildInfos.isEmpty()) { + continue; + } + + // find the earliest segment build job and try to promote + SegmentJobBuildInfo segmentBuildJob = buildInfos.first(); + logger.debug("Check the cube:{} segment:{} build status.", segmentBuildJob.cubeName, + segmentBuildJob.segmentName); + try { + CubingJob cubingJob = (CubingJob) coordinator.getExecutableManager().getJob(segmentBuildJob.jobID); + if (cubingJob == null) { + logger.error("Cannot find metadata of current job."); + continue; + } + ExecutableState jobState = cubingJob.getStatus(); + if (ExecutableState.SUCCEED.equals(jobState)) { + CubeManager cubeManager = coordinator.getCubeManager(); + CubeInstance cubeInstance = cubeManager.getCube(segmentBuildJob.cubeName).latestCopyForWrite(); + CubeSegment cubeSegment = cubeInstance.getSegment(segmentBuildJob.segmentName, null); + logger.info("The cube:{} segment:{} is ready to be promoted.", segmentBuildJob.cubeName, + segmentBuildJob.segmentName); + coordinator.getClusterManager().segmentBuildComplete(cubingJob, cubeInstance, cubeSegment, + segmentBuildJob); + addToCheckList(cubeInstance.getName()); + successJobs.add(segmentBuildJob); + } else if (ExecutableState.ERROR.equals(jobState)) { + if (segmentBuildJob.retryCnt < 5) { + logger.info("Job:{} is error, resume the job.", segmentBuildJob); + coordinator.getExecutableManager().resumeJob(segmentBuildJob.jobID); + segmentBuildJob.retryCnt++; + } else { + logger.warn("Job:{} is error, exceed max retry. 
Kylin admin could resume it or discard it" + + "(to let new building job be sumbitted) .", segmentBuildJob); + } + } else { + logger.debug("Current job state {}", jobState); + } + } catch (Exception e) { + logger.error("Error when check streaming segment job build state:" + segmentBuildJob, e); + } + } + return successJobs; + } + + @NonSideEffect + void findSegmentReadyToBuild() { + Iterator<String> iterator = cubeCheckList.iterator(); + while (iterator.hasNext()) { + String cubeName = iterator.next(); + List<String> segmentList = checkSegmentBuidJobFromMetadata(cubeName); + boolean allSubmited = true; + for (String segmentName : segmentList) { + boolean ok = submitSegmentBuildJob(cubeName, segmentName); + allSubmited = allSubmited && ok; + if (!ok) { + logger.debug("Failed to submit building job."); + } + } + if (allSubmited) { + iterator.remove(); + logger.debug("Removed {} from check list.", cubeName); + } + } + } + + // ========================================================================================== + // ========================================================================================== + + /** + * @return list of segment which could be submitted a segment build job + */ + @NonSideEffect + List<String> checkSegmentBuidJobFromMetadata(String cubeName) { + List<String> result = Lists.newArrayList(); + CubeInstance cubeInstance = coordinator.getCubeManager().getCube(cubeName); + CubeSegment latestHistoryReadySegment = cubeInstance.getLatestReadySegment(); + + long minSegmentStart = -1; + if (latestHistoryReadySegment != null) { + minSegmentStart = latestHistoryReadySegment.getTSRange().end.v; + } + int allowMaxBuildingSegments = cubeInstance.getConfig().getMaxBuildingSegments(); + + CubeAssignment assignments = coordinator.getStreamMetadataStore().getAssignmentsByCube(cubeName); + Set<Integer> cubeAssignedReplicaSets = assignments.getReplicaSetIDs(); + + List<SegmentBuildState> segmentStates = 
coordinator.getStreamMetadataStore().getSegmentBuildStates(cubeName); + int inBuildingSegments = cubeInstance.getBuildingSegments().size(); + int leftQuota = allowMaxBuildingSegments - inBuildingSegments; + + // Sort it so we can iterate segments from eariler one to newer one + Collections.sort(segmentStates); + + for (int i = 0; i < segmentStates.size(); i++) { + + if (leftQuota <= 0) { + logger.info("No left quota to build segments for cube:{}", cubeName); + break; + } + + SegmentBuildState segmentState = segmentStates.get(i); + Pair<Long, Long> segmentRange = CubeSegment.parseSegmentName(segmentState.getSegmentName()); + + // If we have a exist historcial segment, we should not let new realtime segment overwrite it, it is so dangrous, + // we just delete the entry to ignore the segment which should not exist + if (segmentRange.getFirst() < minSegmentStart) { + logger.warn( + "The cube segment state is not correct because it belongs to historcial part, cube:{} segment:{}, clear it.", + cubeName, segmentState.getSegmentName()); + coordinator.getStreamMetadataStore().removeSegmentBuildState(cubeName, segmentState.getSegmentName()); + continue; + } + + // We already have a building job for current segment + if (segmentState.isInBuilding()) { + boolean needRebuild = checkSegmentBuildingJob(segmentState, cubeName, cubeInstance); + if (!needRebuild) + continue; + } else if (segmentState.isInWaiting()) { + // The data has not been uploaded to remote completely, or job is discard + // These two case should be submit a building job, just let go through it + } + + boolean readyToBuild = checkSegmentIsReadyToBuild(segmentStates, i, cubeAssignedReplicaSets); + if (!readyToBuild) { + logger.debug("Segment {} {} is not ready to submit a building job.", cubeName, segmentState); + // use break instead continue here, because we should transfer to next queue in sequential way (no jump the queue) + break; + } else { + result.add(segmentState.getSegmentName()); + leftQuota--; + } + 
} + if (logger.isDebugEnabled() && result.isEmpty()) { + logger.debug("Candidate {} : {}.", cubeName, String.join(", ", result)); + } + return result; + } + + /** + * Submit a build job for streaming segment + * + * @return true if submit succeed ; else false + */ + @NotAtomicIdempotent + boolean submitSegmentBuildJob(String cubeName, String segmentName) { + logger.info("Try submit streaming segment build job, cube:{} segment:{}", cubeName, segmentName); + CubeInstance cubeInstance = coordinator.getCubeManager().getCube(cubeName); + try { + // Step 1. create a new segment if not exists + CubeSegment newSeg = null; + Pair<Long, Long> segmentRange = CubeSegment.parseSegmentName(segmentName); + boolean segmentExists = false; + for (CubeSegment segment : cubeInstance.getSegments()) { + SegmentRange.TSRange tsRange = segment.getTSRange(); + if (tsRange.start.v.equals(segmentRange.getFirst()) && segmentRange.getSecond().equals(tsRange.end.v)) { + segmentExists = true; + newSeg = segment; + } + } + + if (!segmentExists) { + logger.debug("Create segment for {} {} .", cubeName, segmentName); + newSeg = coordinator.getCubeManager().appendSegment(cubeInstance, + new SegmentRange.TSRange(segmentRange.getFirst(), segmentRange.getSecond())); + } else { + logger.info("Segment {} exists.", segmentName); + } + + // Step 2. create and submit new build job + DefaultChainedExecutable executable = getStreamingCubingJob(newSeg); + coordinator.getExecutableManager().addJob(executable); + String jobId = executable.getId(); + newSeg.setLastBuildJobID(jobId); + + // Step 3. add it to job trigger list + SegmentJobBuildInfo segmentJobBuildInfo = new SegmentJobBuildInfo(cubeName, segmentName, jobId); + addToJobTrackList(segmentJobBuildInfo); + + // Step 4. 
add job to stream metadata in case of current node dead + SegmentBuildState.BuildState state = new SegmentBuildState.BuildState(); + state.setBuildStartTime(System.currentTimeMillis()); + state.setState(SegmentBuildState.BuildState.State.BUILDING); + state.setJobId(jobId); + logger.debug("Commit building job {} for {} {} .", jobId, cubeName, segmentName); + coordinator.getStreamMetadataStore().updateSegmentBuildState(cubeName, segmentName, state); + return true; + } catch (Exception e) { + logger.error("Streaming job submit fail, cubeName:" + cubeName + " segment:" + segmentName, e); + return false; + } + } + + /** + * Check segment which in building state + * + * @return true if we need to resubmit a new build job, else false + */ + boolean checkSegmentBuildingJob(SegmentBuildState segmentState, String cubeName, CubeInstance cubeInstance) { + String jobId = segmentState.getState().getJobId(); + logger.debug("There is segment in building, cube:{} segment:{} jobId:{}", cubeName, + segmentState.getSegmentName(), jobId); + long buildStartTime = segmentState.getState().getBuildStartTime(); + if (buildStartTime != 0 && jobId != null) { + long buildDuration = System.currentTimeMillis() - buildStartTime; + + // Check build state after 15 minutes + if (buildDuration < 15 * 60 * 1000) { + return false; + } + CubingJob cubingJob = (CubingJob) coordinator.getExecutableManager().getJob(jobId); + Preconditions.checkNotNull(cubingJob, "CubingJob should not be null."); + ExecutableState jobState = cubingJob.getStatus(); + + // If job is already succeed and HBase segment in ready state, remove the build state + if (ExecutableState.SUCCEED.equals(jobState)) { + CubeSegment cubeSegment = cubeInstance.getSegment(segmentState.getSegmentName(), null); + if (cubeSegment != null && SegmentStatusEnum.READY == cubeSegment.getStatus()) { + logger.info("Job:{} is already succeed, and segment:{} is ready, remove segment build state", jobId, + segmentState.getSegmentName()); + 
coordinator.getStreamMetadataStore().removeSegmentBuildState(cubeName, + segmentState.getSegmentName()); + } + return false; + } + + // If a job is in error state, just retry it + if (ExecutableState.ERROR.equals(jobState)) { + logger.info("Job:{} is error, resume the job.", jobId); + coordinator.getExecutableManager().resumeJob(jobId); + return false; + } + + // If a job is discard, we will try to resumbit it later. + if (ExecutableState.DISCARDED.equals(jobState)) { + logger.info("Job:{} is discard, resubmit it later.", jobId); + return true; + } else { + logger.info("Job:{} is in running, job state: {}.", jobId, jobState); + } + } else { + logger.info("Unknown state {}", segmentState); + } + return false; + } + + /** + * <pre> + * When all replica sets have uploaded their local segment cache to deep storage, (that is to say all required data is uploaded), we can mark + * this segment as ready to submit a MapReduce job to build into HBase. + * + * Note the special situation, when some replica set didn't upload any data in some segment duration for lack + * of entered kafka event, we still try to check the newer segment duration, if found some newer segment have data + * uploaded for current miss replica set, we marked local segment cache has been uploaded for that replica for current segment. + * This workround will prevent job-submit queue from blocking by no data for some topic partition. 
+ * </pre> + * + * @return true if current segment is ready to submit a build job, else false + */ + boolean checkSegmentIsReadyToBuild(List<SegmentBuildState> allSegmentStates, int checkedSegmentIdx, + Set<Integer> cubeAssignedReplicaSets) { + SegmentBuildState checkedSegmentState = allSegmentStates.get(checkedSegmentIdx); + Set<Integer> notCompleteReplicaSets = Sets + .newHashSet(Sets.difference(cubeAssignedReplicaSets, checkedSegmentState.getCompleteReplicaSets())); + if (notCompleteReplicaSets.isEmpty()) { + return true; + } else { + for (int i = checkedSegmentIdx + 1; i < allSegmentStates.size(); i++) { + SegmentBuildState segmentBuildState = allSegmentStates.get(i); + Set<Integer> completeReplicaSetsForNext = segmentBuildState.getCompleteReplicaSets(); + Iterator<Integer> notCompleteRSItr = notCompleteReplicaSets.iterator(); + while (notCompleteRSItr.hasNext()) { + Integer rsID = notCompleteRSItr.next(); + if (completeReplicaSetsForNext.contains(rsID)) { + logger.info( + "the replica set:{} doesn't have data for segment:{}, but have data for later segment:{}", + rsID, checkedSegmentState.getSegmentName(), segmentBuildState.getSegmentName()); + notCompleteRSItr.remove(); + } + } + } + return notCompleteReplicaSets.isEmpty(); + } + } + + public Set<String> getCubeCheckList() { + return cubeCheckList; + } + + public DefaultChainedExecutable getStreamingCubingJob(CubeSegment segment){ + return new StreamingCubingEngine().createStreamingCubingJob(segment, "SYSTEM"); + } +} diff --git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/ReceiverClusterManager.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/ReceiverClusterManager.java new file mode 100644 index 0000000..81b2e8c --- /dev/null +++ b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/ReceiverClusterManager.java @@ -0,0 +1,651 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kylin.stream.coordinator.coordinate; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.common.base.Function; +import com.google.common.collect.Collections2; +import com.google.common.collect.Lists; +import com.google.common.collect.MapDifference; +import com.google.common.collect.Maps; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.kylin.common.util.HadoopUtil; +import org.apache.kylin.common.util.JsonUtil; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.engine.mr.CubingJob; +import org.apache.kylin.metadata.model.SegmentStatusEnum; +import org.apache.kylin.metadata.model.Segments; +import org.apache.kylin.stream.coordinator.assign.AssignmentsCache; +import org.apache.kylin.stream.coordinator.coordinate.annotations.NotAtomicIdempotent; +import org.apache.kylin.stream.coordinator.coordinate.annotations.NotAtomicAndNotIdempotent; +import org.apache.kylin.stream.coordinator.exception.ClusterStateException; +import org.apache.kylin.stream.core.consumer.ConsumerStartProtocol; +import org.apache.kylin.stream.core.model.AssignRequest; +import
org.apache.kylin.stream.core.model.ConsumerStatsResponse; +import org.apache.kylin.stream.core.model.CubeAssignment; +import org.apache.kylin.stream.core.model.Node; +import org.apache.kylin.stream.core.model.PauseConsumersRequest; +import org.apache.kylin.stream.core.model.ReplicaSet; +import org.apache.kylin.stream.core.model.ResumeConsumerRequest; +import org.apache.kylin.stream.core.model.StartConsumersRequest; +import org.apache.kylin.stream.core.model.StopConsumersRequest; +import org.apache.kylin.stream.core.model.stats.ReceiverCubeStats; +import org.apache.kylin.stream.core.source.ISourcePosition; +import org.apache.kylin.stream.core.source.ISourcePositionHandler; +import org.apache.kylin.stream.core.source.IStreamingSource; +import org.apache.kylin.stream.core.source.Partition; +import org.apache.kylin.stream.core.source.StreamingSourceFactory; +import org.apache.kylin.stream.core.util.HDFSUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * <pre> + * This class manages operations related to multiple streaming receivers. They are often not atomic and may be idempotent. + * + * In a multi-step transaction, the following points should be considered carefully: + * 1. whether to fail fast or continue when an exception is thrown. + * 2. whether the API (remote call) should be synchronous or asynchronous. + * 3. when a transaction fails, whether the rollback will always succeed. + * 4.
transaction should be idempotent so that, when it fails, it can be fixed by retrying. + * </pre> + */ +public class ReceiverClusterManager { + private static final Logger logger = LoggerFactory.getLogger(ReceiverClusterManager.class); + + ReceiverClusterManager(StreamingCoordinator coordinator) { + this.coordinator = coordinator; + } + + public StreamingCoordinator getCoordinator() { + return coordinator; + } + + StreamingCoordinator coordinator; + + // ================================================================================ + // ======================= Rebalance-related operations =========================== + + /** + * Reassign every cube whose assignment differs between the two lists, one cube at a time. + * Both lists must cover exactly the same set of cubes, otherwise an IllegalStateException is raised. + * Every exception raised here is wrapped in a RuntimeException. + */ + @NotAtomicAndNotIdempotent + void doReBalance(List<CubeAssignment> previousAssignments, List<CubeAssignment> newAssignments) { + Map<String, CubeAssignment> previousCubeAssignMap = Maps.newHashMap(); + Map<String, CubeAssignment> newCubeAssignMap = Maps.newHashMap(); + for (CubeAssignment cubeAssignment : previousAssignments) { + previousCubeAssignMap.put(cubeAssignment.getCubeName(), cubeAssignment); + } + for (CubeAssignment cubeAssignment : newAssignments) { + newCubeAssignMap.put(cubeAssignment.getCubeName(), cubeAssignment); + } + try { + Set<String> preCubes = previousCubeAssignMap.keySet(); + Set<String> newCubes = newCubeAssignMap.keySet(); + if (!preCubes.equals(newCubes)) { + logger.error("previous assignment cubes:{}, new assignment cubes:{}", preCubes, newCubes); + // NOTE(review): the exception message looks truncated; the cube sets only appear in the log line above + throw new IllegalStateException("previous cube assignments"); + } + + MapDifference<String, CubeAssignment> diff = Maps.difference(previousCubeAssignMap, newCubeAssignMap); + Map<String, MapDifference.ValueDifference<CubeAssignment>> 
changedAssignments = diff.entriesDiffering(); + + for (Map.Entry<String, MapDifference.ValueDifference<CubeAssignment>> changedAssignmentEntry : changedAssignments + .entrySet()) { + String cubeName = changedAssignmentEntry.getKey(); + MapDifference.ValueDifference<CubeAssignment> cubeAssignDiff = changedAssignmentEntry.getValue(); +
reassignCubeImpl(cubeName, cubeAssignDiff.leftValue(), cubeAssignDiff.rightValue()); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * Reassign a single cube from preAssignments to newAssignments, then commit newAssignments to the + * stream metadata store and clear the cube's entry in the assignment cache. Does nothing when both + * assignments are equal. Note that newAssignments is mutated: replica sets removed by the reassignment + * are added back with an empty partition list before the commit. + */ + @NotAtomicAndNotIdempotent + void reassignCubeImpl(String cubeName, CubeAssignment preAssignments, CubeAssignment newAssignments) { + logger.info("start cube reBalance, cube:{}, previous assignments:{}, new assignments:{}", cubeName, + preAssignments, newAssignments); + if (newAssignments.equals(preAssignments)) { + logger.info("the new assignment is the same as the previous assignment, do nothing for this reassignment"); + return; + } + CubeInstance cubeInstance = getCoordinator().getCubeManager().getCube(cubeName); + doReassignWithoutCommit(cubeInstance, preAssignments, newAssignments); + + // add empty partitions to the removed replica sets, which means that there's still data in the replica set, but no new data will be consumed. + MapDifference<Integer, List<Partition>> assignDiff = Maps.difference(preAssignments.getAssignments(), + newAssignments.getAssignments()); + Map<Integer, List<Partition>> removedAssign = assignDiff.entriesOnlyOnLeft(); + for (Integer removedReplicaSet : removedAssign.keySet()) { + newAssignments.addAssignment(removedReplicaSet, Lists.<Partition> newArrayList()); + } + + logger.info("Commit reassign {} transaction.", cubeName); + getCoordinator().getStreamMetadataStore().saveNewCubeAssignment(newAssignments); + AssignmentsCache.getInstance().clearCubeCache(cubeName); + } + + /** + * <pre> + * Reassign action is a process which moves some consumption tasks from some replica sets to new replica sets.
+ * + * It is necessary in cases such as: + * - a new topic partition was added + * - the workload is not balanced between different replica sets + * - some nodes have to be taken offline, so their consumption tasks have to be transferred + * + * This method does not persist newAssignments; reassignCubeImpl commits it afterwards. + * </pre> + * + * @param cubeInstance the cube being reassigned; used to look up its streaming source + * @param preAssignments current assignment + * @param newAssignments the assignment we want to assign + * @throws ClusterStateException if stopping/syncing fails (step 1), or if an IOException occurs while assigning, + * starting or making segments immutable (steps 2-4); it records the failed step and whether the rollback succeeded + */ + @NotAtomicAndNotIdempotent + void doReassignWithoutCommit(CubeInstance cubeInstance, CubeAssignment preAssignments, + CubeAssignment newAssignments) { + String cubeName = preAssignments.getCubeName(); + + // Step 0. Prepare and check + IStreamingSource streamingSource = StreamingSourceFactory.getStreamingSource(cubeInstance); + MapDifference<Integer, List<Partition>> assignDiff = Maps.difference(preAssignments.getAssignments(), + newAssignments.getAssignments()); + Map<Integer, List<Partition>> sameAssign = assignDiff.entriesInCommon(); + Map<Integer, List<Partition>> newAssign = assignDiff.entriesOnlyOnRight(); + Map<Integer, List<Partition>> removedAssign = assignDiff.entriesOnlyOnLeft(); + + List<ISourcePosition> allPositions = Lists.newArrayList(); + List<ReplicaSet> successSyncReplicaSet = Lists.newArrayList(); + ISourcePosition consumePosition; + + // TODO check the health state of related receivers before the real action; this will reduce the chance of facing an inconsistent state + + // Step 1.
Stop and sync all replica sets in preAssignment (replica sets whose assignment is unchanged are skipped) + try { + for (Map.Entry<Integer, List<Partition>> assignmentEntry : preAssignments.getAssignments().entrySet()) { + Integer replicaSetID = assignmentEntry.getKey(); + if (sameAssign.containsKey(replicaSetID)) { + logger.info("the assignment is not changed for cube:{}, replicaSet:{}", cubeName, replicaSetID); + continue; + } + ReplicaSet rs = getCoordinator().getStreamMetadataStore().getReplicaSet(replicaSetID); + ISourcePosition position = syncAndStopConsumersInRs(streamingSource, cubeName, rs); + // NOTE(review): syncAndStopConsumersInRs returns null for a replica set without nodes, so allPositions may contain null — confirm mergePositions tolerates it + allPositions.add(position); + successSyncReplicaSet.add(rs); + } + consumePosition = streamingSource.getSourcePositionHandler().mergePositions(allPositions, + ISourcePositionHandler.MergeStrategy.KEEP_LARGE); + logger.info("the consumer position for cube:{} is:{}", cubeName, consumePosition); + } catch (Exception e) { + logger.error("fail to sync assign replicaSet for cube:" + cubeName, e); + // roll back: restart consumers in the replica sets that were already synced/stopped + Set<Integer> needRollback = successSyncReplicaSet.stream().map(ReplicaSet::getReplicaSetID) + .collect(Collectors.toSet()); + for (ReplicaSet rs : successSyncReplicaSet) { + StartConsumersRequest request = new StartConsumersRequest(); + request.setCube(cubeName); + try { + startConsumersInReplicaSet(rs, request); + needRollback.remove(rs.getReplicaSetID()); + } catch (IOException e1) { + logger.error("fail to start consumers for cube:" + cubeName + " replicaSet:" + rs.getReplicaSetID(), + e1); + } + } + if (needRollback.isEmpty()) { + throw new ClusterStateException(cubeName, ClusterStateException.ClusterState.ROLLBACK_SUCCESS, + ClusterStateException.TransactionStep.STOP_AND_SNYC, "", e); + } else { + StringBuilder str = new StringBuilder(); + try { + 
str.append("Fail restart:").append(JsonUtil.writeValueAsString(needRollback)); + } catch (JsonProcessingException jpe) { + logger.error("", jpe); + } + throw new ClusterStateException(cubeName, ClusterStateException.ClusterState.ROLLBACK_FAILED, +
ClusterStateException.TransactionStep.STOP_AND_SNYC, str.toString(), e); + } + } + + List<ReplicaSet> successAssigned = Lists.newArrayList(); + List<ReplicaSet> successStarted = Lists.newArrayList(); + Set<Node> failedConvertToImmutableNodes = new HashSet<>(); + + try { + // Step 2. Assign consumption tasks to the new assignment without starting consumption + for (Map.Entry<Integer, List<Partition>> cubeAssignmentEntry : newAssignments.getAssignments().entrySet()) { + Integer replicaSetID = cubeAssignmentEntry.getKey(); + if (sameAssign.containsKey(replicaSetID)) { + continue; + } + + ReplicaSet rs = getCoordinator().getStreamMetadataStore().getReplicaSet(replicaSetID); + logger.info("assign cube:{} to replicaSet:{}", cubeName, replicaSetID); + assignCubeToReplicaSet(rs, cubeName, cubeAssignmentEntry.getValue(), false, true); + successAssigned.add(rs); + } + + // Step 3. Start consumption tasks in the new assignment, from the position after the merged (largest) position of step 1 + for (Map.Entry<Integer, List<Partition>> cubeAssignmentEntry : newAssignments.getAssignments().entrySet()) { + Integer replicaSetID = cubeAssignmentEntry.getKey(); + if (sameAssign.containsKey(replicaSetID)) { + continue; + } + + ConsumerStartProtocol consumerStartProtocol = new ConsumerStartProtocol( + streamingSource.getSourcePositionHandler().serializePosition(consumePosition.advance())); + + ReplicaSet rs = getCoordinator().getStreamMetadataStore().getReplicaSet(replicaSetID); + StartConsumersRequest startRequest = new StartConsumersRequest(); + startRequest.setCube(cubeName); + startRequest.setStartProtocol(consumerStartProtocol); + logger.info("start consumers for cube:{}, replicaSet:{}, startRequest:{}", cubeName, replicaSetID, + startRequest); + startConsumersInReplicaSet(rs, startRequest); + successStarted.add(rs); + } + + // Step 4.
Ask removed replica sets to force-convert their local segments into immutable ones + for (Map.Entry<Integer, List<Partition>> removeAssignmentEntry : removedAssign.entrySet()) { + Integer replicaSetID = removeAssignmentEntry.getKey(); + logger.info("make cube immutable for cube:{}, replicaSet{}", cubeName, replicaSetID); + ReplicaSet rs = getCoordinator().getStreamMetadataStore().getReplicaSet(replicaSetID); + List<Node> failedNodes = makeCubeImmutableInReplicaSet(rs, cubeName); + failedConvertToImmutableNodes.addAll(failedNodes); + } + if (!failedConvertToImmutableNodes.isEmpty()) { + throw new IOException("Failed to convert to immutable state. "); + } + + logger.info("Finish cube reassign for cube:{} .", cubeName); + } catch (IOException e) { + logger.error("Fail to start consumers for cube:" + cubeName, e); + // roll back the replica sets that were started successfully + Set<Integer> rollbackStarted = successStarted.stream().map(ReplicaSet::getReplicaSetID) + .collect(Collectors.toSet()); + for (ReplicaSet rs : successStarted) { + try { + StopConsumersRequest stopRequest = new StopConsumersRequest(); + stopRequest.setCube(cubeName); + // for new group assignment, need to stop the consumers and remove the cube data + if (newAssign.containsKey(rs.getReplicaSetID())) { + stopRequest.setRemoveData(true); + } + stopConsumersInReplicaSet(rs, stopRequest); + rollbackStarted.remove(rs.getReplicaSetID()); + } catch (IOException e1) { + logger.error("fail to stop consumers for cube:" + cubeName + " replicaSet:" + rs.getReplicaSetID(), + e1); + } + } + + // roll back the successful assignments: re-assign the previous partitions and start consumers + Set<Integer> rollbackAssigned = successAssigned.stream().map(ReplicaSet::getReplicaSetID) + .collect(Collectors.toSet()); + for (ReplicaSet rs : successAssigned) { + try { + List<Partition> partitions = preAssignments.getPartitionsByReplicaSetID(rs.getReplicaSetID()); + assignCubeToReplicaSet(rs, cubeName, partitions, true, true); + 
rollbackAssigned.remove(rs.getReplicaSetID()); + } catch (IOException e1) { + // NOTE(review): the message below says "start consumers", but the failed call restores the previous assignment +
logger.error("fail to start consumers for cube:" + cubeName + " replicaSet:" + rs.getReplicaSetID(), + e1); + } + } + + // retry making the cube immutable on the nodes that failed in step 4 + Set<Node> failedReceiver = new HashSet<>(failedConvertToImmutableNodes); + for (Node node : failedConvertToImmutableNodes) { + try { + getCoordinator().makeCubeImmutableForReceiver(node, cubeName); + failedReceiver.remove(node); + } catch (IOException ioe) { + logger.error("fail to make cube immutable for cube:" + cubeName + " to " + node, ioe); + } + } + + // summary after reassign action + StringBuilder str = new StringBuilder(); + try { + str.append("FailStarted:").append(JsonUtil.writeValueAsString(rollbackStarted)).append(";"); + str.append("FailAssigned:").append(JsonUtil.writeValueAsString(rollbackAssigned)).append(";"); + str.append("FailRemotedPresisted:").append(JsonUtil.writeValueAsString(failedReceiver)); + } catch (JsonProcessingException jpe) { + logger.error("", jpe); + } + + String failedInfo = str.toString(); + + if (!rollbackStarted.isEmpty()) { + throw new ClusterStateException(cubeName, ClusterStateException.ClusterState.ROLLBACK_FAILED, + ClusterStateException.TransactionStep.START_NEW, failedInfo, e); + } else if (!rollbackAssigned.isEmpty()) { + throw new ClusterStateException(cubeName, ClusterStateException.ClusterState.ROLLBACK_FAILED, + ClusterStateException.TransactionStep.ASSIGN_NEW, failedInfo, e); + } else if (!failedReceiver.isEmpty()) { + throw new ClusterStateException(cubeName, ClusterStateException.ClusterState.ROLLBACK_FAILED, + ClusterStateException.TransactionStep.MAKE_IMMUTABLE, failedInfo, e); + } else { + // every rollback succeeded; the step is always reported as ASSIGN_NEW here + throw new ClusterStateException(cubeName, ClusterStateException.ClusterState.ROLLBACK_SUCCESS, + ClusterStateException.TransactionStep.ASSIGN_NEW, failedInfo, e); + } + } + + } + + // ========================================================================================= + // 
===================== Operations related to a single replica set ===================== + + /** + * Sync/align consumers in the
replica set, ensuring that all consumers in the group consume to the same position. + * With more than one node, the consumers are paused and then resumed with the largest paused position as the + * resume-to position; with exactly one node, its consumers are stopped. + * + * @return the consume position to which all receivers have aligned, or null if the replica set has no nodes. + */ + @NotAtomicAndNotIdempotent + ISourcePosition syncAndStopConsumersInRs(IStreamingSource streamingSource, String cubeName, ReplicaSet replicaSet) + throws IOException { + if (replicaSet.getNodes().size() > 1) { // when the replica set has more than one node, force the group to sync + logger.info("sync consume for cube:{}, replicaSet:{}", cubeName, replicaSet.getReplicaSetID()); + + PauseConsumersRequest suspendRequest = new PauseConsumersRequest(); + suspendRequest.setCube(cubeName); + List<ConsumerStatsResponse> allReceiverConsumeState = pauseConsumersInReplicaSet(replicaSet, + suspendRequest); + + List<ISourcePosition> consumePositionList = Lists.transform(allReceiverConsumeState, + new Function<ConsumerStatsResponse, ISourcePosition>() { + @Nullable + @Override + public ISourcePosition apply(@Nullable ConsumerStatsResponse input) { + return streamingSource.getSourcePositionHandler().parsePosition(input.getConsumePosition()); + } + }); + ISourcePosition consumePosition = streamingSource.getSourcePositionHandler() + .mergePositions(consumePositionList, ISourcePositionHandler.MergeStrategy.KEEP_LARGE); + ResumeConsumerRequest resumeRequest = new ResumeConsumerRequest(); + resumeRequest.setCube(cubeName); + resumeRequest + .setResumeToPosition(streamingSource.getSourcePositionHandler().serializePosition(consumePosition)); + + // assume that the resume will always succeed when the replica set can be paused successfully + resumeConsumersInReplicaSet(replicaSet, resumeRequest); + return consumePosition; + } else if (replicaSet.getNodes().size() == 1) { + Node receiver = replicaSet.getNodes().iterator().next(); + StopConsumersRequest request = 
new StopConsumersRequest(); + request.setCube(cubeName); + logger.info("stop consumers for cube:{}, receiver:{}", cubeName, receiver); + List<ConsumerStatsResponse> stopResponse = +
stopConsumersInReplicaSet(replicaSet, request); + return streamingSource.getSourcePositionHandler().parsePosition(stopResponse.get(0).getConsumePosition()); + } else { + return null; + } + } + + /** Start consumers on every node of the replica set; stops at the first node that fails. */ + @NotAtomicAndNotIdempotent + void startConsumersInReplicaSet(ReplicaSet rs, final StartConsumersRequest request) throws IOException { + for (final Node node : rs.getNodes()) { + getCoordinator().startConsumersForReceiver(node, request); + } + } + + /** + * Ask every node of the replica set to make the cube immutable. Failures are logged and do not stop the loop. + * + * @return the nodes that failed + */ + @NotAtomicAndNotIdempotent + List<Node> makeCubeImmutableInReplicaSet(ReplicaSet rs, String cubeName) { + List<Node> failedNodes = new ArrayList<>(); + for (final Node node : rs.getNodes()) { + try { + getCoordinator().makeCubeImmutableForReceiver(node, cubeName); + } catch (IOException ioe) { + logger.error(String.format(Locale.ROOT, "Convert %s to immutable for node %s failed.", cubeName, + node.toNormalizeString()), ioe); + failedNodes.add(node); + } + } + return failedNodes; + } + + /** Resume consumers on every node of the replica set; stops at the first node that fails. */ + @NotAtomicAndNotIdempotent + List<ConsumerStatsResponse> resumeConsumersInReplicaSet(ReplicaSet rs, final ResumeConsumerRequest request) + throws IOException { + List<ConsumerStatsResponse> consumerStats = Lists.newArrayList(); + for (final Node node : rs.getNodes()) { + consumerStats.add(getCoordinator().resumeConsumersForReceiver(node, request)); + } + return consumerStats; + } + + /** Stop consumers on every node of the replica set; stops at the first node that fails. 
*/ + @NotAtomicIdempotent + List<ConsumerStatsResponse> stopConsumersInReplicaSet(ReplicaSet rs, final StopConsumersRequest request) + throws IOException { + List<ConsumerStatsResponse> consumerStats = Lists.newArrayList(); + for (final Node node : rs.getNodes()) { + consumerStats.add(getCoordinator().stopConsumersForReceiver(node, request)); + } + return consumerStats; + } + + /** + * Pause consumers on every node of the replica set. If one node fails, the receivers already paused are + * resumed and the original IOException is rethrown. + */ + @NotAtomicIdempotent + List<ConsumerStatsResponse> pauseConsumersInReplicaSet(ReplicaSet rs, final PauseConsumersRequest request) + throws IOException { + List<ConsumerStatsResponse> consumerStats = Lists.newArrayList(); + List<Node> successReceivers = Lists.newArrayList(); + try { + for (final Node
node : rs.getNodes()) { + consumerStats.add(getCoordinator().pauseConsumersForReceiver(node, request)); + successReceivers.add(node); + } + } catch (IOException ioe) { + logger.info("Roll back pause consumers for receivers: {}", successReceivers); + ResumeConsumerRequest resumeRequest = new ResumeConsumerRequest(); + resumeRequest.setCube(request.getCube()); + for (Node receiver : successReceivers) { + getCoordinator().resumeConsumersForReceiver(receiver, resumeRequest); + } + throw ioe; + } + return consumerStats; + } + + /** + * Assign the consumption task of specific topic partitions to the given replica set. + * + * @param partitions specific topic partitions which the replica set should consume + * @param startConsumer whether receivers should start consumption at once + * @param mustAllSucceed if true, the first receiver that cannot be notified causes an IOException; if false, + * the call succeeds as long as at least one receiver has been notified + * + * @throws IOException thrown when the assignment fails + */ + @NotAtomicIdempotent + void assignCubeToReplicaSet(ReplicaSet rs, String cubeName, List<Partition> partitions, boolean startConsumer, + boolean mustAllSucceed) throws IOException { + boolean hasNodeAssigned = false; + IOException exception = null; + AssignRequest assignRequest = new AssignRequest(); + assignRequest.setCubeName(cubeName); + assignRequest.setPartitions(partitions); + assignRequest.setStartConsumers(startConsumer); + for (final Node node : rs.getNodes()) { + try { + getCoordinator().assignToReceiver(node, assignRequest); + hasNodeAssigned = true; + } catch (IOException e) { + if (mustAllSucceed) { + throw e; + } + exception = e; + logger.error("cube:" + cubeName + " consumers start fail for node:" + node.toString(), e); + } + } + if (!hasNodeAssigned) { + if (exception != null) { + throw exception; + } + } + } + + /** + * When a segment build job succeeds, do the follow-up work needed to deliver the segment to the historical part.
+ * + * @return true if the segment was promoted (or was already READY) and the cleanup ran; false if the previous + * segment is not ready yet + */ + @NotAtomicIdempotent + protected boolean segmentBuildComplete(CubingJob cubingJob, CubeInstance cubeInstance, CubeSegment cubeSegment, + SegmentJobBuildInfo segmentBuildInfo) throws IOException { + String cubeName = segmentBuildInfo.cubeName; + String segmentName = segmentBuildInfo.segmentName; + + // Step 1. check whether the segment is ready to be promoted to an HBase Ready Segment, and promote it + if (!checkPreviousSegmentReady(cubeSegment)) { + logger.warn("Segment:{}'s previous segment is not ready, will not set the segment to ready.", cubeSegment); + return false; + } + + if (!SegmentStatusEnum.READY.equals(cubeSegment.getStatus())) { + promoteNewSegment(cubingJob, cubeInstance, cubeSegment); + logger.debug("Promote {} succeed.", segmentName); + } + + // Step 2. delete local segment files on the receiver side because they are no longer needed + CubeAssignment assignments = getCoordinator().getStreamMetadataStore().getAssignmentsByCube(cubeName); + for (int replicaSetID : assignments.getReplicaSetIDs()) { + + // Step 2.1 normal case + ReplicaSet rs = getCoordinator().getStreamMetadataStore().getReplicaSet(replicaSetID); + for (Node node : rs.getNodes()) { + try { + getCoordinator().notifyReceiverBuildSuccess(node, cubeName, segmentName); + } catch (IOException e) { + logger.error("error when remove cube segment for receiver:" + node, e); + } + } + + // Step 2.2 special case + // A replica set without partitions should be a "Removed Rs" left by the latest reassign action. + // We check whether any local segment of the current cube still exists; if nothing is left, the "Removed Rs" is removed from the assignment in StreamMetadata. 
+ // NOTE(review): removeAssignment below runs while iterating assignments.getReplicaSetIDs(); if that is a live view of the underlying map this may throw ConcurrentModificationException — verify CubeAssignment
+ if (assignments.getPartitionsByReplicaSetID(replicaSetID).isEmpty()) { + logger.info( + "No partition is assign to the replicaSet:{}, check whether there are local segments on the rs.", + replicaSetID); + Node leader = rs.getLeader(); + try { + ReceiverCubeStats receiverCubeStats = getCoordinator().getReceiverAdminClient() + .getReceiverCubeStats(leader, cubeName); + Set<String> segments = receiverCubeStats.getSegmentStatsMap().keySet(); + if (segments.isEmpty()) { + logger.info("no local segments exist for replicaSet:{}, cube:{}, update assignments.", + replicaSetID, cubeName); + assignments.removeAssignment(replicaSetID); + getCoordinator().getStreamMetadataStore().saveNewCubeAssignment(assignments); + } + } catch (IOException e) { + logger.error("error when get receiver cube stats from:" + leader, e); + } + } + } + + // Step 3. remove the segment build state entry in StreamMetadata + getCoordinator().getStreamMetadataStore().removeSegmentBuildState(cubeName, segmentName); + + // Step 4. delete the columnar segment cache in HDFS because it is no longer needed + logger.info("Try to remove the hdfs files for cube:{} segment:{}", cubeName, segmentName); + removeHDFSFiles(cubeName, segmentName); + return true; + } + + /** + * Promote a segment from the realtime part into the historical part.
+ * Copies the build job's statistics (source record count/size, cube size) and the smallest of the stored + * source checkpoints onto the segment, then promotes it through the CubeManager. + */ + void promoteNewSegment(CubingJob cubingJob, CubeInstance cubeInstance, CubeSegment cubeSegment) throws IOException { + logger.debug("Try transfer segment's {} state to ready.", cubeSegment.getName()); + long sourceCount = cubingJob.findSourceRecordCount(); + long sourceSizeBytes = cubingJob.findSourceSizeBytes(); + long cubeSizeBytes = cubingJob.findCubeSizeBytes(); + Map<Integer, String> sourceCheckpoint = getCoordinator().getStreamMetadataStore() + .getSourceCheckpoint(cubeInstance.getName(), cubeSegment.getName()); + + // NOTE(review): sourceCheckpoint is dereferenced without a null check — confirm the store never returns null here + ISourcePositionHandler positionOperator = StreamingSourceFactory.getStreamingSource(cubeInstance) + .getSourcePositionHandler(); + Collection<ISourcePosition> sourcePositions = Collections2.transform(sourceCheckpoint.values(), + new Function<String, ISourcePosition>() { + @Nullable + @Override + public ISourcePosition apply(@Nullable String input) { + return positionOperator.parsePosition(input); + } + }); + ISourcePosition sourcePosition = positionOperator.mergePositions(sourcePositions, + ISourcePositionHandler.MergeStrategy.KEEP_SMALL); + cubeSegment.setLastBuildJobID(cubingJob.getId()); + cubeSegment.setLastBuildTime(System.currentTimeMillis()); + cubeSegment.setSizeKB(cubeSizeBytes / 1024); + cubeSegment.setInputRecords(sourceCount); + cubeSegment.setInputRecordsSize(sourceSizeBytes); + cubeSegment.setStreamSourceCheckpoint(positionOperator.serializePosition(sourcePosition)); + getCoordinator().getCubeManager().promoteNewlyBuiltSegments(cubeInstance, cubeSegment); + } + + /** + * <pre> + * Segments are promoted to HBase Ready Segments (the historical part) sequentially, so here we check + * whether the latest HBase segment before the current one meets two 
requirements: + * - it is in READY state + * - it connects to the current segment exactly (no gap or overlap) + * If both requirements are met, the current segment can be promoted to an HBase Ready Segment. + * If no earlier segment exists, true is returned. + * NOTE(review): the code only requires the previous segment's end to be <= the current segment's start, + * so a gap is not actually rejected — confirm this is intended.
+ * </pre> + */ + boolean checkPreviousSegmentReady(CubeSegment currSegment) { + long currSegmentStart = currSegment.getTSRange().start.v; + CubeInstance cubeInstance = currSegment.getCubeInstance(); + Segments<CubeSegment> segments = cubeInstance.getSegments(); + long previousSegmentEnd = -1; + for (CubeSegment segment : segments) { + long segmentEnd = segment.getTSRange().end.v; + if (segmentEnd <= currSegmentStart && segmentEnd > previousSegmentEnd) { + previousSegmentEnd = segmentEnd; + } + } + + if (previousSegmentEnd == -1) { + return true; + } + + for (CubeSegment segment : segments) { + long segmentEnd = segment.getTSRange().end.v; + if (segmentEnd == previousSegmentEnd && SegmentStatusEnum.READY.equals(segment.getStatus())) { + return true; + } + } + return false; + } + + /** Best-effort recursive delete of the segment's HDFS path; failures are only logged. */ + private void removeHDFSFiles(String cubeName, String segmentName) { + String segmentHDFSPath = HDFSUtil.getStreamingSegmentFilePath(cubeName, segmentName); + try { + FileSystem fs = HadoopUtil.getFileSystem(segmentHDFSPath); + logger.info("Deleting segment data in HDFS {}", segmentHDFSPath); + fs.delete(new Path(segmentHDFSPath), true); + } catch (Exception e) { + // NOTE(review): the caught exception is not passed to the logger, so its stack trace is lost + logger.error("error when remove hdfs file, hdfs path:{}", segmentHDFSPath); + } + } +} diff --git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/SegmentJobBuildInfo.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/SegmentJobBuildInfo.java new file mode 100644 index 0000000..13f72e6 --- /dev/null +++ b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/SegmentJobBuildInfo.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kylin.stream.coordinator.coordinate; + +/** + * Information recorded for a streaming segment build job: the cube, the segment, the build job ID + * and a mutable retry counter. + */ +public class SegmentJobBuildInfo implements Comparable<SegmentJobBuildInfo>{ + public final String cubeName; + public final String segmentName; + public final String jobID; + // mutable counter; not part of equals/hashCode/compareTo + public int retryCnt = 0; + + public SegmentJobBuildInfo(String cubeName, String segmentName, String jobID) { + this.cubeName = cubeName; + this.segmentName = segmentName; + this.jobID = jobID; + } + + @Override + public String toString() { + return "SegmentJobBuildInfo{" + "cubeName='" + cubeName + '\'' + ", segmentName='" + segmentName + '\'' + + ", jobID='" + jobID + '\'' + ", retryCnt=" + retryCnt + '}'; + } + + /** Equal when cubeName, segmentName and jobID are all equal (null-safe); retryCnt is ignored. */ + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + + SegmentJobBuildInfo that = (SegmentJobBuildInfo) o; + + if (cubeName != null ? !cubeName.equals(that.cubeName) : that.cubeName != null) + return false; + if (segmentName != null ? !segmentName.equals(that.segmentName) : that.segmentName != null) + return false; + return jobID != null ? 
jobID.equals(that.jobID) : that.jobID == null; + + } + + @Override + public int hashCode() { + int result = cubeName != null ? cubeName.hashCode() : 0; + result = 31 * result + (segmentName != null ?
segmentName.hashCode() : 0); + result = 31 * result + (jobID != null ? jobID.hashCode() : 0); + return result; + } + + /** + * Orders by cube name, then by segment name. + * NOTE(review): jobID is ignored here, so this ordering is inconsistent with equals (infos that differ only + * in jobID compare as 0); unlike equals, it also throws NullPointerException when a name is null. + */ + @Override + public int compareTo(SegmentJobBuildInfo o) { + if (!cubeName.equals(o.cubeName)) { + return cubeName.compareTo(o.cubeName); + } + return segmentName.compareTo(o.segmentName); + } +} diff --git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/StreamingCoordinator.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/StreamingCoordinator.java new file mode 100644 index 0000000..875a4d7 --- /dev/null +++ b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/StreamingCoordinator.java @@ -0,0 +1,643 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+*/ + +package org.apache.kylin.stream.coordinator.coordinate; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.leader.LeaderSelector; +import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.HadoopUtil; +import org.apache.kylin.common.util.Pair; +import org.apache.kylin.common.util.ServerMode; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.job.execution.ExecutableManager; +import org.apache.kylin.metadata.realization.RealizationStatusEnum; +import org.apache.kylin.stream.coordinator.StreamMetadataStore; +import org.apache.kylin.stream.coordinator.StreamMetadataStoreFactory; +import org.apache.kylin.stream.coordinator.StreamingCubeInfo; +import org.apache.kylin.stream.coordinator.StreamingUtils; +import org.apache.kylin.stream.coordinator.assign.Assigner; +import org.apache.kylin.stream.coordinator.assign.AssignmentUtil; +import org.apache.kylin.stream.coordinator.assign.AssignmentsCache; +import org.apache.kylin.stream.coordinator.assign.CubePartitionRoundRobinAssigner; +import org.apache.kylin.stream.coordinator.assign.DefaultAssigner; +import org.apache.kylin.stream.coordinator.client.CoordinatorClient; +import org.apache.kylin.stream.coordinator.coordinate.annotations.NotAtomicAndNotIdempotent; +import org.apache.kylin.stream.coordinator.coordinate.annotations.NotAtomicIdempotent; +import org.apache.kylin.stream.coordinator.doctor.ClusterDoctor; +import 
org.apache.kylin.stream.coordinator.exception.CoordinateException; +import org.apache.kylin.stream.coordinator.exception.NotLeadCoordinatorException; +import org.apache.kylin.stream.coordinator.exception.StoreException; +import org.apache.kylin.stream.core.client.HttpReceiverAdminClient; +import org.apache.kylin.stream.core.client.ReceiverAdminClient; +import org.apache.kylin.stream.core.model.AssignRequest; +import org.apache.kylin.stream.core.model.ConsumerStatsResponse; +import org.apache.kylin.stream.core.model.CubeAssignment; +import org.apache.kylin.stream.core.model.Node; +import org.apache.kylin.stream.core.model.PauseConsumersRequest; +import org.apache.kylin.stream.core.model.ReplicaSet; +import org.apache.kylin.stream.core.model.ResumeConsumerRequest; +import org.apache.kylin.stream.core.model.StartConsumersRequest; +import org.apache.kylin.stream.core.model.StopConsumersRequest; +import org.apache.kylin.stream.core.model.StreamingCubeConsumeState; +import org.apache.kylin.stream.core.model.UnAssignRequest; +import org.apache.kylin.stream.core.source.IStreamingSource; +import org.apache.kylin.stream.core.source.Partition; +import org.apache.kylin.stream.core.source.StreamingSourceFactory; +import org.apache.kylin.stream.core.source.StreamingTableSourceInfo; +import org.apache.kylin.stream.core.util.HDFSUtil; +import org.apache.kylin.stream.core.util.NamedThreadFactory; +import org.apache.kylin.stream.core.util.NodeUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +/** + * <pre> + * Each Kylin streaming cluster has at least one coordinator processes, coordinator + * server works as the master node of streaming cluster and handle consume task assignment, + * membership and streaming cube state management. 
+ * + * When cluster have several coordinator processes, only the leader try to answer coordinator client's + * request, other follower will become standby, so single point of failure will be eliminated. + * </pre> + */ +public class StreamingCoordinator implements CoordinatorClient { + private static final Logger logger = LoggerFactory.getLogger(StreamingCoordinator.class); + private static final int DEFAULT_PORT = 7070; + private static StreamingCoordinator instance = null; + + private StreamMetadataStore streamMetadataStore; + private Assigner assigner; + private ReceiverAdminClient receiverAdminClient; + private CuratorFramework zkClient; + private CoordinatorLeaderSelector selector; + private ReceiverClusterManager clusterManager; + private volatile boolean isLead = false; + + private ScheduledExecutorService streamingJobSubmitExecutor; + private ScheduledExecutorService clusterStateCheckExecutor; + private ClusterDoctor clusterDoctor; + private BuildJobSubmitter buildJobSubmitter; + + private StreamingCoordinator() { + this.streamMetadataStore = StreamMetadataStoreFactory.getStreamMetaDataStore(); + clusterManager = new ReceiverClusterManager(this); + this.receiverAdminClient = new HttpReceiverAdminClient(); + this.assigner = getAssigner(); + this.zkClient = StreamingUtils.getZookeeperClient(); + this.selector = new CoordinatorLeaderSelector(); + this.buildJobSubmitter = new BuildJobSubmitter(this); + this.clusterDoctor = new ClusterDoctor(); + if (ServerMode.SERVER_MODE.canServeStreamingCoordinator()) { + this.streamingJobSubmitExecutor = Executors.newScheduledThreadPool(1, + new NamedThreadFactory("streaming_job_submitter")); + this.clusterStateCheckExecutor = Executors.newScheduledThreadPool(1, + new NamedThreadFactory("cluster_state_checker")); + start(); + } + } + + public static StreamingCoordinator getInstance() { + if (instance == null) { + synchronized (StreamingCoordinator.class) { + if (instance == null) { + instance = new StreamingCoordinator(); + 
} + } + } + return instance; + } + + private void start() { + selector.start(); + streamingJobSubmitExecutor.scheduleAtFixedRate(buildJobSubmitter, 0, 2, TimeUnit.MINUTES); + clusterStateCheckExecutor.scheduleAtFixedRate(clusterDoctor, 5, 10, TimeUnit.MINUTES); + } + + + /** + * Assign the streaming cube to replica sets. Replica sets is calculated by Assigner. + * + * @throws CoordinateException when assign action failed + */ + @Override + @NotAtomicIdempotent + public synchronized void assignCube(String cubeName) { + checkLead(); + streamMetadataStore.addStreamingCube(cubeName); + StreamingCubeInfo cube = getStreamCubeInfo(cubeName); + CubeAssignment existAssignment = streamMetadataStore.getAssignmentsByCube(cube.getCubeName()); + if (existAssignment != null) { + logger.warn("Cube {} is already assigned.", cube.getCubeName()); + return; + } + List<ReplicaSet> replicaSets = streamMetadataStore.getReplicaSets(); + if (replicaSets == null || replicaSets.isEmpty()) { + throw new IllegalStateException("No replicaSet is configured in system"); + } + CubeAssignment assignment = assigner.assign(cube, replicaSets, streamMetadataStore.getAllCubeAssignments()); + doAssignCube(cubeName, assignment); + } + + /** + * Unassign action will remove data which not belong to historical part + * and stop consumption for all assigned receivers. 
+ * + * @throws CoordinateException when unAssign action failed + */ + @Override + @NotAtomicIdempotent + public void unAssignCube(String cubeName) { + checkLead(); + CubeAssignment assignment = streamMetadataStore.getAssignmentsByCube(cubeName); + if (assignment == null) { + return; + } + List<Node> unAssignedFailReceivers = Lists.newArrayList(); + try { + logger.info("Send unAssign cube:{} request to receivers", cubeName); + for (Integer replicaSetID : assignment.getReplicaSetIDs()) { + ReplicaSet rs = streamMetadataStore.getReplicaSet(replicaSetID); + UnAssignRequest request = new UnAssignRequest(); + request.setCube(cubeName); + for (Node receiver : rs.getNodes()) { + try { + unAssignToReceiver(receiver, request); + } catch (Exception e) { + logger.error("Exception throws when unAssign receiver", e); + unAssignedFailReceivers.add(receiver); + } + } + } + logger.debug("Remove temp hdfs files for {}", cubeName); + removeCubeHDFSFiles(cubeName); + logger.debug("Clear cube info from job check list"); + buildJobSubmitter.clearCheckList(cubeName); + logger.debug("Commit unassign {} transaction.", cubeName); + streamMetadataStore.removeStreamingCube(cubeName); + AssignmentsCache.getInstance().clearCubeCache(cubeName); + } catch (Exception e) { + throw new CoordinateException(e); + } + if (!unAssignedFailReceivers.isEmpty()) { + String msg = "unAssign fail for receivers:" + String.join(",", + unAssignedFailReceivers.stream().map(Node::toString).collect(Collectors.toList())); + throw new CoordinateException(msg); + } + } + + /** + * change assignment of cubeName to assignments + */ + @Override + @NotAtomicAndNotIdempotent + public synchronized void reAssignCube(String cubeName, CubeAssignment assignments) { + checkLead(); + CubeAssignment preAssignments = streamMetadataStore.getAssignmentsByCube(cubeName); + if (preAssignments == null) { + logger.info("no previous cube assign exists, use the new assignment:{}", assignments); + doAssignCube(cubeName, assignments); + } 
else { + clusterManager.reassignCubeImpl(cubeName, preAssignments, assignments); + } + } + + @Override + public void segmentRemoteStoreComplete(Node receiver, String cubeName, Pair<Long, Long> segment) { + checkLead(); + logger.info("Segment remote store complete signal received for cube:{}, segment:{}", cubeName, segment); + buildJobSubmitter.addToCheckList(cubeName); + } + + @Override + @NotAtomicIdempotent + public void pauseConsumers(String cubeName) { + checkLead(); + CubeAssignment assignment = streamMetadataStore.getAssignmentsByCube(cubeName); + PauseConsumersRequest request = new PauseConsumersRequest(); + request.setCube(cubeName); + try { + for (Integer rsID : assignment.getReplicaSetIDs()) { + ReplicaSet rs = streamMetadataStore.getReplicaSet(rsID); + clusterManager.pauseConsumersInReplicaSet(rs, request); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + + logger.debug("Committing pauseConsumers {} transaction.", cubeName); + streamMetadataStore.saveStreamingCubeConsumeState(cubeName, StreamingCubeConsumeState.PAUSED); + logger.debug("Committed pauseConsumers {} transaction.", cubeName); + + } + + @Override + @NotAtomicIdempotent + public void resumeConsumers(String cubeName) { + checkLead(); + CubeAssignment assignment = streamMetadataStore.getAssignmentsByCube(cubeName); + ResumeConsumerRequest request = new ResumeConsumerRequest(); + request.setCube(cubeName); + try { + for (Integer rsID : assignment.getReplicaSetIDs()) { + ReplicaSet rs = streamMetadataStore.getReplicaSet(rsID); + clusterManager.resumeConsumersInReplicaSet(rs, request); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + logger.debug("Committing resumeConsumers {} transaction.", cubeName); + streamMetadataStore.saveStreamingCubeConsumeState(cubeName, StreamingCubeConsumeState.RUNNING); + logger.debug("Committed resumeConsumers {} transaction.", cubeName); + } + + @Override + public void replicaSetLeaderChange(int replicaSetID, Node newLeader) 
{ + checkLead(); + Map<String, List<Partition>> assignment = streamMetadataStore.getAssignmentsByReplicaSet(replicaSetID); + if (assignment == null || assignment.isEmpty()) { + return; + } + // clear assign cache for this group + for (String cubeName : assignment.keySet()) { + AssignmentsCache.getInstance().clearCubeCache(cubeName); + } + } + + private void doAssignCube(String cubeName, CubeAssignment assignment) { + Set<ReplicaSet> successRS = Sets.newHashSet(); + try { + for (Integer rsID : assignment.getReplicaSetIDs()) { + ReplicaSet rs = streamMetadataStore.getReplicaSet(rsID); + clusterManager.assignCubeToReplicaSet(rs, cubeName, assignment.getPartitionsByReplicaSetID(rsID), true, + false); + successRS.add(rs); + } + logger.debug("Committing assign {} transaction.", cubeName); + streamMetadataStore.saveNewCubeAssignment(assignment); + logger.debug("Committed assign {} transaction.", cubeName); + } catch (Exception e) { + // roll back the success group assignment + for (ReplicaSet rs : successRS) { + + UnAssignRequest request = new UnAssignRequest(); + request.setCube(cubeName); + unAssignFromReplicaSet(rs, request); + } + throw new CoordinateException(e); + } + } + + @Override + public Map<Integer, Map<String, List<Partition>>> reBalanceRecommend() { + checkLead(); + return reBalancePlan(getEnableStreamingCubes(), streamMetadataStore.getReplicaSets()); + } + + /** + * reBalance the cube and partitions + * @param newAssignmentsPlan Map<ReplicaSetID, Map<CubeName, List<Partition>> + */ + @Override + @NotAtomicAndNotIdempotent + public synchronized void reBalance(Map<Integer, Map<String, List<Partition>>> newAssignmentsPlan) { + checkLead(); + List<CubeAssignment> currCubeAssignments = streamMetadataStore.getAllCubeAssignments(); + List<CubeAssignment> newCubeAssignments = AssignmentUtil.convertReplicaSetAssign2CubeAssign(newAssignmentsPlan); + clusterManager.doReBalance(currCubeAssignments, newCubeAssignments); + } + + Map<Integer, Map<String, List<Partition>>> 
reBalancePlan(List<StreamingCubeInfo> allCubes, + List<ReplicaSet> allReplicaSets) { + List<CubeAssignment> currCubeAssignments = streamMetadataStore.getAllCubeAssignments(); + return assigner.reBalancePlan(allReplicaSets, allCubes, currCubeAssignments); + } + + public synchronized void createReplicaSet(ReplicaSet rs) { + int replicaSetID = streamMetadataStore.createReplicaSet(rs); + try { + for (Node receiver : rs.getNodes()) { + addReceiverToReplicaSet(receiver, replicaSetID); + } + } catch (IOException e) { + logger.warn("Create replica set failed.", e); + } + } + + public synchronized void removeReplicaSet(int rsID) { + ReplicaSet rs = streamMetadataStore.getReplicaSet(rsID); + if (rs == null) { + return; + } + if (rs.getNodes() != null && !rs.getNodes().isEmpty()) { + throw new CoordinateException("Cannot remove rs, because there are nodes in it."); + } + Map<String, List<Partition>> assignment = streamMetadataStore.getAssignmentsByReplicaSet(rsID); + if (assignment != null && !assignment.isEmpty()) { + throw new CoordinateException("Cannot remove rs, because there are assignments."); + } + streamMetadataStore.removeReplicaSet(rsID); + } + + public synchronized void addNodeToReplicaSet(Integer replicaSetID, String nodeID) { + ReplicaSet rs = streamMetadataStore.getReplicaSet(replicaSetID); + Node receiver = Node.fromNormalizeString(nodeID); + List<ReplicaSet> allReplicaSet = streamMetadataStore.getReplicaSets(); + for (ReplicaSet other : allReplicaSet) { + if (other.getReplicaSetID() != replicaSetID) { + if (other.getNodes().contains(receiver)) { + logger.error("error add Node {} to replicaSet {}, already exist in replicaSet {} ", nodeID, + replicaSetID, other.getReplicaSetID()); + throw new IllegalStateException("Node exists in ReplicaSet!"); + } + } + } + rs.addNode(receiver); + streamMetadataStore.updateReplicaSet(rs); + try { + Map<String, List<Partition>> assignment = streamMetadataStore.getAssignmentsByReplicaSet(replicaSetID); + if (assignment == null 
|| assignment.isEmpty()) { + return; + } + addReceiverToReplicaSet(receiver, replicaSetID); + // clear assign cache for this group + for (String cubeName : assignment.keySet()) { + AssignmentsCache.getInstance().clearCubeCache(cubeName); + } + } catch (IOException e) { + logger.warn("fail to add receiver to replicaSet ", e); + } + } + + public synchronized void removeNodeFromReplicaSet(Integer replicaSetID, String nodeID) { + ReplicaSet rs = streamMetadataStore.getReplicaSet(replicaSetID); + Node receiver = Node.fromNormalizeString(nodeID); + rs.removeNode(receiver); + streamMetadataStore.updateReplicaSet(rs); + try { + Map<String, List<Partition>> assignment = streamMetadataStore.getAssignmentsByReplicaSet(replicaSetID); + removeReceiverFromReplicaSet(receiver); + // clear assign cache for this group + if (assignment != null) { + for (String cubeName : assignment.keySet()) { + AssignmentsCache.getInstance().clearCubeCache(cubeName); + } + } + } catch (IOException e) { + logger.warn("Remove node from replicaSet failed.", e); + } + } + + @SuppressWarnings("unused") + public void stopConsumers(String cubeName) { + CubeAssignment assignment = streamMetadataStore.getAssignmentsByCube(cubeName); + StopConsumersRequest request = new StopConsumersRequest(); + request.setCube(cubeName); + try { + for (Integer rsID : assignment.getReplicaSetIDs()) { + ReplicaSet rs = streamMetadataStore.getReplicaSet(rsID); + clusterManager.stopConsumersInReplicaSet(rs, request); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private void unAssignFromReplicaSet(final ReplicaSet rs, final UnAssignRequest unAssignRequest) { + for (Node receiver : rs.getNodes()) { + try { + unAssignToReceiver(receiver, unAssignRequest); + } catch (IOException e) { + logger.error("Error when roll back assignment", e); + } + } + } + + // ================================================================================ + // ========================== Receiver related operation 
========================== + + protected void assignToReceiver(final Node receiver, final AssignRequest request) throws IOException { + receiverAdminClient.assign(receiver, request); + } + + private void unAssignToReceiver(final Node receiver, final UnAssignRequest request) throws IOException { + receiverAdminClient.unAssign(receiver, request); + } + + private void addReceiverToReplicaSet(final Node receiver, final int replicaSetID) throws IOException { + receiverAdminClient.addToReplicaSet(receiver, replicaSetID); + } + + private void removeReceiverFromReplicaSet(final Node receiver) throws IOException { + receiverAdminClient.removeFromReplicaSet(receiver); + } + + protected void startConsumersForReceiver(final Node receiver, final StartConsumersRequest request) + throws IOException { + receiverAdminClient.startConsumers(receiver, request); + } + + protected ConsumerStatsResponse stopConsumersForReceiver(final Node receiver, final StopConsumersRequest request) + throws IOException { + return receiverAdminClient.stopConsumers(receiver, request); + } + + protected ConsumerStatsResponse pauseConsumersForReceiver(final Node receiver, final PauseConsumersRequest request) + throws IOException { + return receiverAdminClient.pauseConsumers(receiver, request); + } + + protected ConsumerStatsResponse resumeConsumersForReceiver(final Node receiver, final ResumeConsumerRequest request) + throws IOException { + return receiverAdminClient.resumeConsumers(receiver, request); + } + + protected void makeCubeImmutableForReceiver(final Node receiver, final String cubeName) throws IOException { + receiverAdminClient.makeCubeImmutable(receiver, cubeName); + } + + void notifyReceiverBuildSuccess(final Node receiver, final String cubeName, final String segmentName) + throws IOException { + receiverAdminClient.segmentBuildComplete(receiver, cubeName, segmentName); + } + + // ================================================================================ + // ============================ 
Utility method ==================================== + + public ExecutableManager getExecutableManager() { + return ExecutableManager.getInstance(getConfig()); + } + + public CubeManager getCubeManager() { + return CubeManager.getInstance(getConfig()); + } + + public KylinConfig getConfig() { + return KylinConfig.getInstanceFromEnv(); + } + + List<StreamingCubeInfo> getEnableStreamingCubes() { + List<StreamingCubeInfo> allCubes = getStreamingCubes(); + List<StreamingCubeInfo> result = Lists.newArrayList(); + for (StreamingCubeInfo cube : allCubes) { + CubeInstance cubeInstance = getCubeManager().getCube(cube.getCubeName()); + if (cubeInstance.getStatus() == RealizationStatusEnum.READY) { + result.add(cube); + } + } + return result; + } + + List<StreamingCubeInfo> getStreamingCubes() { + List<String> cubes = streamMetadataStore.getCubes(); + List<StreamingCubeInfo> result = Lists.newArrayList(); + for (String cubeName : cubes) { + StreamingCubeInfo cubeInfo = getStreamCubeInfo(cubeName); + if (cubeInfo != null) { + result.add(cubeInfo); + } + } + return result; + } + + public StreamingCubeInfo getStreamCubeInfo(String cubeName) { + CubeInstance cube = getCubeManager().getCube(cubeName); + if (cube == null) { + return null; + } + // count of consumers should be estimated by kylin admin and set in cube level + int numOfConsumerTasks = cube.getConfig().getStreamingCubeConsumerTasksNum(); + IStreamingSource streamingSource = StreamingSourceFactory.getStreamingSource(cube); + StreamingTableSourceInfo tableSourceInfo = streamingSource.load(cubeName); + return new StreamingCubeInfo(cubeName, tableSourceInfo, numOfConsumerTasks); + } + + public void removeCubeHDFSFiles(String cubeName) { + String segmentHDFSPath = HDFSUtil.getStreamingCubeFilePath(cubeName); + try { + FileSystem fs = HadoopUtil.getFileSystem(segmentHDFSPath); + fs.delete(new Path(segmentHDFSPath), true); + } catch (Exception e) { + logger.error("Error when remove hdfs file, hdfs path:{}", segmentHDFSPath); + 
} + } + + private void checkLead() { + if (!isLead) { + Node coordinatorLeader; + try { + coordinatorLeader = streamMetadataStore.getCoordinatorNode(); + } catch (StoreException store) { + throw new NotLeadCoordinatorException("Lead coordinator can not found.", store); + } + throw new NotLeadCoordinatorException( + "Current coordinator is not lead, please check host " + coordinatorLeader); + } + } + + private Assigner getAssigner() { + String assignerName = getConfig().getStreamingAssigner(); + Assigner oneAssigner; + logger.debug("Using assigner {}", assignerName); + switch (assignerName) { + case "DefaultAssigner": + oneAssigner = new DefaultAssigner(); + break; + case "CubePartitionRoundRobinAssigner": + oneAssigner = new CubePartitionRoundRobinAssigner(); + break; + default: + oneAssigner = new DefaultAssigner(); + } + return oneAssigner; + } + + public ReceiverClusterManager getClusterManager() { + return clusterManager; + } + + public boolean isLead() { + return isLead; + } + + public StreamMetadataStore getStreamMetadataStore() { + return streamMetadataStore; + } + + public ReceiverAdminClient getReceiverAdminClient() { + return receiverAdminClient; + } + + private class CoordinatorLeaderSelector extends LeaderSelectorListenerAdapter implements Closeable { + private LeaderSelector leaderSelector; + + public CoordinatorLeaderSelector() { + String path = StreamingUtils.COORDINATOR_LEAD; + leaderSelector = new LeaderSelector(zkClient, path, this); + leaderSelector.autoRequeue(); + } + + @Override + public void close() { + leaderSelector.close(); + } + + public void start() { + leaderSelector.start(); + } + + @Override + public void takeLeadership(CuratorFramework client) throws Exception { + logger.info("Current node become the lead coordinator."); + streamMetadataStore.setCoordinatorNode(NodeUtil.getCurrentNode(DEFAULT_PORT)); + isLead = true; + // check job status every minute + buildJobSubmitter.restore(); + while (true) { + try { + Thread.sleep(5 * 60 * 
1000); + } catch (InterruptedException exception) { + Thread.interrupted(); + break; + } + if (!leaderSelector.hasLeadership()) { + break; + } + } + logger.info("Become the follower coordinator."); + isLead = false; + } + } +} diff --git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/exception/NotLeadCoordinatorException.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/annotations/NonSideEffect.java similarity index 53% copy from stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/exception/NotLeadCoordinatorException.java copy to stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/annotations/NonSideEffect.java index dfb42e8..0d63ac7 100644 --- a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/exception/NotLeadCoordinatorException.java +++ b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/annotations/NonSideEffect.java @@ -14,27 +14,28 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. 
-*/
+ */
+package org.apache.kylin.stream.coordinator.coordinate.annotations;
-package org.apache.kylin.stream.coordinator.exception;
-
-import org.apache.kylin.stream.core.exception.StreamingException;
-
-public class NotLeadCoordinatorException extends StreamingException {
-    public NotLeadCoordinatorException() {
-        super();
-    }
-
-    public NotLeadCoordinatorException(String s) {
-        super(s);
-    }
-
-    public NotLeadCoordinatorException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    public NotLeadCoordinatorException(Throwable cause) {
-        super(cause);
-    }
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Inherited;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+/**
+ * <pre>
+ * Marker annotation for developers.
+ * Indicates that the annotated method may fail partway through some of its steps,
+ * but there is no need to retry it, because such a failure has no bad influence on other parts.
+ * </pre>
+ *
+ * <p>Retention is SOURCE: the compiler discards it, so it documents intent for readers only.
+ *
+ * @see NotAtomicAndNotIdempotent
+ */
+@Documented
+@Retention(RetentionPolicy.SOURCE)
+@Target(ElementType.METHOD)
+// NOTE(review): @Inherited only affects annotations placed on classes, so it has no effect on this
+// METHOD-targeted, SOURCE-retained annotation.
+@Inherited
+public @interface NonSideEffect {
 }
diff --git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/annotations/NotAtomicAndNotIdempotent.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/annotations/NotAtomicAndNotIdempotent.java
new file mode 100644
index 0000000..e14d7d2
--- /dev/null
+++ b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/annotations/NotAtomicAndNotIdempotent.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.stream.coordinator.coordinate.annotations;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Inherited;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * <pre>
+ * Marker annotation for developers.
+ * Indicates that the annotated method is a transaction that may break in an intermediate state,
+ * with no guarantee that rolling back will succeed. In other words, in some situations it can
+ * leave the cluster inconsistent and require a manual repair.
+ * Treat it as a warning: such operations are not user friendly.
+ *
+ * In the worst case, ClusterDoctor is meant to fix the resulting state as a manual repair.
+ * </pre>
+ *
+ * @see NotAtomicIdempotent
+ * @see org.apache.kylin.stream.coordinator.doctor.ClusterDoctor
+ */
+@Documented
+@Retention(RetentionPolicy.SOURCE)
+@Target(ElementType.METHOD)
+// NOTE(review): @Inherited only affects annotations placed on classes, so it has no effect on this
+// METHOD-targeted, SOURCE-retained annotation.
+@Inherited
+public @interface NotAtomicAndNotIdempotent {
+}
diff --git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/annotations/NotAtomicIdempotent.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/annotations/NotAtomicIdempotent.java
new file mode 100644
index 0000000..224ef70
--- /dev/null
+++ b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/annotations/NotAtomicIdempotent.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.stream.coordinator.coordinate.annotations;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Inherited;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * <pre>
+ * Marker annotation for developers.
+ * Indicates that the annotated method is not atomic and may break at an intermediate step:
+ * an exception may be thrown while reading or writing remote resources, leaving an inconsistent state.
+ * However, the caller can safely retry it to reach eventual consistency.
+ * </pre>
+ *
+ * @see NotAtomicAndNotIdempotent
+ */
+@Documented
+@Retention(RetentionPolicy.SOURCE)
+@Target(ElementType.METHOD)
+// NOTE(review): @Inherited only affects annotations placed on classes, so it has no effect on this
+// METHOD-targeted, SOURCE-retained annotation.
+@Inherited
+public @interface NotAtomicIdempotent {
+}
diff --git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/exception/StoreException.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/doctor/ClusterDoctor.java
similarity index 67%
copy from stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/exception/StoreException.java
copy to stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/doctor/ClusterDoctor.java
index a439511..f06e7ef 100644
--- a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/exception/StoreException.java
+++ b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/doctor/ClusterDoctor.java
@@ -14,23 +14,20 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
-package org.apache.kylin.stream.coordinator.exception;
+package org.apache.kylin.stream.coordinator.doctor;
-public class StoreException extends RuntimeException {
-
-    private static final long serialVersionUID = -9149609663117728575L;
-
-    public StoreException() {
-        super();
-    }
-
-    public StoreException(Throwable t) {
-        super(t);
-    }
+/**
+ * Repairs inconsistent cluster state based on the results of ClusterStateChecker.
+ *
+ * <p>NOTE: run() is currently an empty placeholder, so scheduling this task has no effect yet.
+ *
+ * @see org.apache.kylin.stream.coordinator.coordinate.annotations.NotAtomicAndNotIdempotent
+ * @see ClusterStateChecker
+ */
+public class ClusterDoctor implements Runnable {
-    public StoreException(String message, Throwable t) {
-        super(message, t);
+    @Override
+    public void run() {
+        // TO BE IMPLEMENTED
     }
 }
diff --git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/exception/StoreException.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/doctor/ClusterStateChecker.java
similarity index 61%
copy from stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/exception/StoreException.java
copy to stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/doctor/ClusterStateChecker.java
index a439511..2c4fce2 100644
--- a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/exception/StoreException.java
+++ b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/doctor/ClusterStateChecker.java
@@ -14,23 +14,21 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/ + */ -package org.apache.kylin.stream.coordinator.exception; +package org.apache.kylin.stream.coordinator.doctor; -public class StoreException extends RuntimeException { - - private static final long serialVersionUID = -9149609663117728575L; - - public StoreException() { - super(); - } - - public StoreException(Throwable t) { - super(t); - } - - public StoreException(String message, Throwable t) { - super(message, t); - } +/** + * <pre> + * Basic step of this class: + * 1. stop coordinator to avoid underlying concurrency issue + * 2. check inconsistent state of all receiver cluster + * 3. send summary via mail to kylin admin + * 4. if need, call ClusterDoctor to repair inconsistent issue + * </pre> + * @see org.apache.kylin.stream.coordinator.coordinate.annotations.NotAtomicAndNotIdempotent + * @see ClusterDoctor + */ +public class ClusterStateChecker { + // TO BE IMPLEMENTED } diff --git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/exception/ClusterStateException.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/exception/ClusterStateException.java index 40231e9..105344a 100644 --- a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/exception/ClusterStateException.java +++ b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/exception/ClusterStateException.java @@ -18,10 +18,13 @@ package org.apache.kylin.stream.coordinator.exception; +import org.apache.kylin.stream.core.exception.StreamingException; + import java.util.Locale; -public class ClusterStateException extends CoordinateException { +public class ClusterStateException extends StreamingException { + @SuppressWarnings("unused") private final String cubeName; private final ClusterState clusterState; private final TransactionStep transactionStep; @@ -35,6 +38,7 @@ public class ClusterStateException extends CoordinateException { return transactionStep; } + @SuppressWarnings("unused") public String getInconsistentPart() 
{ return inconsistentPart; } diff --git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/exception/CoordinateException.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/exception/CoordinateException.java index 415933e..9b28355 100644 --- a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/exception/CoordinateException.java +++ b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/exception/CoordinateException.java @@ -21,6 +21,8 @@ package org.apache.kylin.stream.coordinator.exception; import org.apache.kylin.stream.core.exception.StreamingException; public class CoordinateException extends StreamingException { + + @SuppressWarnings("unused") public CoordinateException() { super(); } @@ -29,6 +31,7 @@ public class CoordinateException extends StreamingException { super(s); } + @SuppressWarnings("unused") public CoordinateException(String message, Throwable cause) { super(message, cause); } diff --git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/exception/NotLeadCoordinatorException.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/exception/NotLeadCoordinatorException.java index dfb42e8..8ba032a 100644 --- a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/exception/NotLeadCoordinatorException.java +++ b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/exception/NotLeadCoordinatorException.java @@ -21,6 +21,7 @@ package org.apache.kylin.stream.coordinator.exception; import org.apache.kylin.stream.core.exception.StreamingException; public class NotLeadCoordinatorException extends StreamingException { + @SuppressWarnings("unused") public NotLeadCoordinatorException() { super(); } @@ -33,6 +34,7 @@ public class NotLeadCoordinatorException extends StreamingException { super(message, cause); } + @SuppressWarnings("unused") public NotLeadCoordinatorException(Throwable cause) { super(cause); } diff 
--git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/exception/StoreException.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/exception/StoreException.java index a439511..a38aa91 100644 --- a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/exception/StoreException.java +++ b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/exception/StoreException.java @@ -22,6 +22,7 @@ public class StoreException extends RuntimeException { private static final long serialVersionUID = -9149609663117728575L; + @SuppressWarnings("unused") public StoreException() { super(); } @@ -30,6 +31,7 @@ public class StoreException extends RuntimeException { super(t); } + @SuppressWarnings("unused") public StoreException(String message, Throwable t) { super(message, t); } diff --git a/build/conf/kylin-server-log4j.properties b/stream-coordinator/src/main/resources/log4j.properties similarity index 67% copy from build/conf/kylin-server-log4j.properties copy to stream-coordinator/src/main/resources/log4j.properties index aba7001..abea109 100644 --- a/build/conf/kylin-server-log4j.properties +++ b/stream-coordinator/src/main/resources/log4j.properties @@ -15,18 +15,10 @@ # limitations under the License. 
# - -#define appenders -log4j.appender.file=org.apache.log4j.RollingFileAppender -log4j.appender.file.layout=org.apache.log4j.PatternLayout -log4j.appender.file.File=${catalina.home}/../logs/kylin.log +log4j.rootLogger=WARN,stdout +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.file.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2}:%L : %m%n -log4j.appender.file.Append=true -log4j.appender.file.MaxFileSize=268435456 -log4j.appender.file.MaxBackupIndex=10 - -#overall config -log4j.rootLogger=INFO,file +log4j.logger.org.apache.hadoop=ERROR log4j.logger.org.apache.kylin=DEBUG -log4j.logger.org.springframework=WARN -log4j.logger.org.springframework.security=INFO +log4j.logger.org.springframework=WARN \ No newline at end of file diff --git a/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/coordinate/BuildJobSubmitterTest.java b/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/coordinate/BuildJobSubmitterTest.java new file mode 100644 index 0000000..95a0f05 --- /dev/null +++ b/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/coordinate/BuildJobSubmitterTest.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kylin.stream.coordinator.coordinate; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.engine.mr.CubingJob; +import org.apache.kylin.job.execution.DefaultChainedExecutable; +import org.apache.kylin.job.execution.ExecutableManager; +import org.apache.kylin.job.execution.ExecutableState; +import org.apache.kylin.metadata.model.SegmentRange; +import org.apache.kylin.metadata.model.SegmentStatusEnum; +import org.apache.kylin.metadata.model.Segments; +import org.junit.Test; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static junit.framework.TestCase.assertTrue; +import static org.hamcrest.core.IsCollectionContaining.hasItem; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.isA; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class BuildJobSubmitterTest extends StreamingTestBase { + + // shared stub object + CubeManager cubeManager; + StreamingCoordinator streamingCoordinator; + ReceiverClusterManager clusterManager; + ExecutableManager executableManager; + KylinConfig config = stubKylinConfig(); + + void beforeTestTraceEarliestSegmentBuildJob() { + // prepare dependency + CubeSegment cubeSegment = stubCubSegment(SegmentStatusEnum.NEW, 100L, 
200L); + CubeInstance cubeInstance = stubCubeInstance(cubeSegment); + + cubeManager = stubCubeManager(cubeInstance, false); + config = stubKylinConfig(); + + Map<String, CubingJob> cubingJobMap = new HashMap<>(); + cubingJobMap.put(mockBuildJob1, stubCubingJob(ExecutableState.SUCCEED)); + cubingJobMap.put(mockBuildJob2, stubCubingJob(ExecutableState.DISCARDED)); + cubingJobMap.put(mockBuildJob3, stubCubingJob(ExecutableState.ERROR)); + executableManager = stubExecutableManager(cubingJobMap); + streamingCoordinator = stubStreamingCoordinator(config, cubeManager, executableManager); + clusterManager = stubReceiverClusterManager(streamingCoordinator); + when(streamingCoordinator.getClusterManager()).thenReturn(clusterManager); + } + + @Test + public void testTraceEarliestSegmentBuildJob() { + beforeTestTraceEarliestSegmentBuildJob(); + BuildJobSubmitter buildJobSubmitter = new BuildJobSubmitter(streamingCoordinator); + buildJobSubmitter.restore(); + List<SegmentJobBuildInfo> jobList = buildJobSubmitter.traceEarliestSegmentBuildJob(); + assertEquals(1, jobList.size()); + assertThat(jobList.stream().map(x -> x.jobID).collect(Collectors.toSet()), hasItem(mockBuildJob1)); + assertEquals(1, buildJobSubmitter.getCubeCheckList().size()); + } + + @Test + @SuppressWarnings("unchecked") + public void testTraceEarliestSegmentBuildJob2() throws IOException { + beforeTestTraceEarliestSegmentBuildJob(); + when(clusterManager.segmentBuildComplete(isA(CubingJob.class), isA(CubeInstance.class), isA(CubeSegment.class), + isA(SegmentJobBuildInfo.class))).thenThrow(IOException.class); + BuildJobSubmitter buildJobSubmitter = new BuildJobSubmitter(streamingCoordinator); + buildJobSubmitter.restore(); + List<SegmentJobBuildInfo> jobList = buildJobSubmitter.traceEarliestSegmentBuildJob(); + assertEquals(0, jobList.size()); + assertEquals(0, buildJobSubmitter.getCubeCheckList().size()); + } + + void prepareTestCheckSegmentBuidJobFromMetadata() { + CubeSegment cubeSegment = 
stubCubSegment(SegmentStatusEnum.NEW, 100L, 200L); + CubeInstance cubeInstance = stubCubeInstance(cubeSegment); + config = stubKylinConfig(); + when(cubeInstance.getConfig()).thenReturn(config); + + cubeManager = stubCubeManager(cubeInstance, false); + + Map<String, CubingJob> cubingJobMap = new HashMap<>(); + cubingJobMap.put(mockBuildJob1, stubCubingJob(ExecutableState.SUCCEED)); + cubingJobMap.put(mockBuildJob2, stubCubingJob(ExecutableState.DISCARDED)); + cubingJobMap.put(mockBuildJob3, stubCubingJob(ExecutableState.DISCARDED)); + cubingJobMap.put(mockBuildJob4, stubCubingJob(ExecutableState.ERROR)); + + executableManager = stubExecutableManager(cubingJobMap); + streamingCoordinator = stubStreamingCoordinator(config, cubeManager, executableManager); + clusterManager = stubReceiverClusterManager(streamingCoordinator); + when(streamingCoordinator.getClusterManager()).thenReturn(clusterManager); + } + + @Test + public void testCheckSegmentBuidJobFromMetadata() { + prepareTestCheckSegmentBuidJobFromMetadata(); + BuildJobSubmitter buildJobSubmitter = new BuildJobSubmitter(streamingCoordinator); + buildJobSubmitter.restore(); + List<String> segmentReadyList = buildJobSubmitter.checkSegmentBuidJobFromMetadata(cubeName2); + assertEquals(1, segmentReadyList.size()); + + segmentReadyList = buildJobSubmitter.checkSegmentBuidJobFromMetadata(cubeName3); + assertEquals(1, segmentReadyList.size()); + } + + @Test + public void testCheckSegmentBuidJobFromMetadata1() { + prepareTestCheckSegmentBuidJobFromMetadata(); + BuildJobSubmitter buildJobSubmitter = new BuildJobSubmitter(streamingCoordinator); + buildJobSubmitter.restore(); + + List<String> segmentReadyList = buildJobSubmitter.checkSegmentBuidJobFromMetadata(cubeName4); + verify(executableManager, times(1)).resumeJob(eq(mockBuildJob4)); + assertEquals(0, segmentReadyList.size()); + } + + @Test + public void testSubmitSegmentBuildJob() throws IOException { + CubeSegment cubeSegment1 = stubCubSegment(SegmentStatusEnum.NEW, 
100L, 200L); + CubeSegment cubeSegment2 = stubCubSegment(SegmentStatusEnum.NEW, 1559390400000L, 1559394000000L); + + CubeInstance cubeInstance = stubCubeInstance(cubeSegment1); + @SuppressWarnings("unchecked") + Iterator<CubeSegment> cubeSegmentIterable = mock(Iterator.class); + when(cubeSegmentIterable.hasNext()).thenReturn(true, false); + @SuppressWarnings("unchecked") + Segments<CubeSegment> segmentSegments = mock(Segments.class, RETURNS_DEEP_STUBS); + when(cubeSegmentIterable.next()).thenReturn(cubeSegment1, cubeSegment2); + when(segmentSegments.iterator()).thenReturn(cubeSegmentIterable); + when(cubeInstance.getSegments()).thenReturn(segmentSegments); + + config = stubKylinConfig(); + when(cubeInstance.getConfig()).thenReturn(config); + + cubeManager = stubCubeManager(cubeInstance, false); + + Map<String, CubingJob> cubingJobMap = new HashMap<>(); + cubingJobMap.put(mockBuildJob1, stubCubingJob(ExecutableState.SUCCEED)); + cubingJobMap.put(mockBuildJob2, stubCubingJob(ExecutableState.DISCARDED)); + cubingJobMap.put(mockBuildJob3, stubCubingJob(ExecutableState.DISCARDED)); + cubingJobMap.put(mockBuildJob4, stubCubingJob(ExecutableState.ERROR)); + + executableManager = stubExecutableManager(cubingJobMap); + streamingCoordinator = stubStreamingCoordinator(config, cubeManager, executableManager); + clusterManager = stubReceiverClusterManager(streamingCoordinator); + when(streamingCoordinator.getClusterManager()).thenReturn(clusterManager); + + when(cubeManager.appendSegment(any(CubeInstance.class), any(SegmentRange.TSRange.class))) + .thenReturn(cubeSegment1, cubeSegment2); + + BuildJobSubmitter buildJobSubmitter = new BuildJobSubmitter(streamingCoordinator); + buildJobSubmitter = spy(buildJobSubmitter); + + DefaultChainedExecutable cubingJob = mock(DefaultChainedExecutable.class); + when(cubingJob.getId()).thenReturn(mockBuildJob4); + doReturn(cubingJob).when(buildJobSubmitter).getStreamingCubingJob(any(CubeSegment.class)); + + buildJobSubmitter.restore(); + 
boolean submitSuccess = buildJobSubmitter.submitSegmentBuildJob(cubeName1, segment1); + assertTrue(submitSuccess); + } +} \ No newline at end of file diff --git a/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/coordinate/StreamingTestBase.java b/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/coordinate/StreamingTestBase.java new file mode 100644 index 0000000..5308d79 --- /dev/null +++ b/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/coordinate/StreamingTestBase.java @@ -0,0 +1,335 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kylin.stream.coordinator.coordinate; + +import com.google.common.collect.Lists; +import org.apache.curator.test.TestingServer; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.LocalFileMetadataTestCase; +import org.apache.kylin.common.util.ZKUtil; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.engine.mr.CubingJob; +import org.apache.kylin.job.execution.ExecutableManager; +import org.apache.kylin.job.execution.ExecutableState; +import org.apache.kylin.metadata.model.SegmentRange; +import org.apache.kylin.metadata.model.SegmentStatusEnum; +import org.apache.kylin.metadata.model.Segments; +import org.apache.kylin.stream.coordinator.StreamMetadataStore; +import org.apache.kylin.stream.coordinator.StreamMetadataStoreFactory; +import org.apache.kylin.stream.coordinator.StreamingUtils; +import org.apache.kylin.stream.coordinator.ZookeeperStreamMetadataStore; +import org.apache.kylin.stream.core.model.CubeAssignment; +import org.apache.kylin.stream.core.model.Node; +import org.apache.kylin.stream.core.model.ReplicaSet; +import org.apache.kylin.stream.core.model.SegmentBuildState; +import org.apache.kylin.stream.core.source.Partition; +import org.apache.kylin.stream.source.kafka.KafkaPosition; +import org.apache.kylin.stream.source.kafka.KafkaPositionHandler; +import org.junit.After; +import org.junit.Assume; +import org.junit.Before; +import org.mockito.Matchers; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import 
java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Make create stub easier. + */ +public class StreamingTestBase extends LocalFileMetadataTestCase { + private static final Logger logger = LoggerFactory.getLogger(StreamingTestBase.class); + private static final int port = 12181; + private static final int retryTimes = 10; + private String connectStr; + + /** + * mock zookeeper server for streaming metadata + */ + TestingServer testingServer; + + /** + * Real StreamMetadataStore based on fake zookeeper server + */ + StreamMetadataStore metadataStore; + + @Before + public void init() { + logger.debug("Start zk and prepare meatdata."); + staticCreateTestMetadata(); + int realPort = port; + for (int i = 0; i <= retryTimes; i++) { + try { + testingServer = new TestingServer(realPort, false); + testingServer.start(); + } catch (Exception e) { // maybe caused by port occupy + logger.error("Failed start zookeeper server at " + realPort, e); + realPort++; + continue; + } + break; + } + Assume.assumeTrue(realPort - port < retryTimes); + connectStr = "localhost:" + realPort; + System.setProperty("kylin.env.zookeeper-connect-string", connectStr); + metadataStore = StreamMetadataStoreFactory.getZKStreamMetaDataStore(); + initZookeeperMetadataStore(); + } + + @After + public void tearDown() throws Exception { + logger.debug("Tear down server."); + ZKUtil.cleanZkPath(StreamingUtils.STREAM_ZK_ROOT); + ((ZookeeperStreamMetadataStore) metadataStore).close(); + testingServer.stop(); + System.clearProperty("kylin.env.zookeeper-connect-string"); + } + + // ================================================================================ + // ================================ Init Metadata ================================= + + // replica set info + ReplicaSet rs1 = new ReplicaSet(); + ReplicaSet rs2 = new ReplicaSet(); + ReplicaSet rs3 = new ReplicaSet(); + ReplicaSet rs4 = new ReplicaSet(); + + // receiver info + Node n1 = new Node("Node-1", 1000); + Node n2 = 
new Node("Node-2", 1000); + Node n3 = new Node("Node-3", 1000); + Node n4 = new Node("Node-4", 1000); + Node n5 = new Node("Node-5", 1000); + Node n6 = new Node("Node-6", 1000); + Node n7 = new Node("Node-7", 1000); + Node n8 = new Node("Node-8", 1000); + + // cube info + String cubeName1 = "MockRealtimeCube_1"; + String cubeName2 = "MockRealtimeCube_2"; + String cubeName3 = "MockRealtimeCube_3"; + String cubeName4 = "MockRealtimeCube_4"; + + // cube segment info + String segment1 = "20190601120000_20190601130000"; + String segment2 = "20190601130000_20190601140000"; + + // topic partition info + Partition p1 = new Partition(1); + Partition p2 = new Partition(2); + Partition p3 = new Partition(3); + Partition p4 = new Partition(4); + Partition p5 = new Partition(5); + Partition p6 = new Partition(6); + + String mockBuildJob1 = "mock_job_00001"; + String mockBuildJob2 = "mock_job_00002"; + String mockBuildJob3 = "mock_job_00003"; + String mockBuildJob4 = "mock_job_00004"; + + private void initZookeeperMetadataStore() { + + // add receiver + metadataStore.addReceiver(n1); + metadataStore.addReceiver(n2); + metadataStore.addReceiver(n3); + metadataStore.addReceiver(n4); + metadataStore.addReceiver(n5); + metadataStore.addReceiver(n6); + metadataStore.addReceiver(n7); + metadataStore.addReceiver(n8); + + // add replica set and allocate receivers to replica set + rs1.addNode(n1); + rs1.addNode(n2); + rs2.addNode(n3); + rs2.addNode(n4); + rs3.addNode(n5); + rs3.addNode(n6); + rs4.addNode(n7); + rs4.addNode(n8); + + int rsId; + rsId = metadataStore.createReplicaSet(rs1); + rs1.setReplicaSetID(rsId); + + rsId = metadataStore.createReplicaSet(rs2); + rs2.setReplicaSetID(rsId); + + rsId = metadataStore.createReplicaSet(rs3); + rs3.setReplicaSetID(rsId); + + rsId = metadataStore.createReplicaSet(rs4); + rs4.setReplicaSetID(rsId); + + createCubeMetadata(cubeName1, mockBuildJob1, true); + createCubeMetadata(cubeName2, mockBuildJob2, false); + createCubeMetadata(cubeName3, 
mockBuildJob3, true); + createCubeMetadata(cubeName4, mockBuildJob4, true); + } + + public void createCubeMetadata(String cubeName, String jobID, boolean addSegmentBuildState) { + + // add assignment for cube + Map<Integer, List<Partition>> preAssignMap = new HashMap<>(); + preAssignMap.put(rs1.getReplicaSetID(), Lists.newArrayList(p1, p2)); + preAssignMap.put(rs2.getReplicaSetID(), Lists.newArrayList(p3, p4)); + preAssignMap.put(rs3.getReplicaSetID(), Lists.newArrayList(p5, p6)); + CubeAssignment originalAssignment = new CubeAssignment(cubeName, preAssignMap); + metadataStore.saveNewCubeAssignment(originalAssignment); + + // add remote checkpoint for segment1 + // check StreamingServer#sendSegmentsToFullBuild + KafkaPositionHandler kafkaPositionHandler = new KafkaPositionHandler(); + Map<Integer, Long> positionMap = new HashMap<>(); + positionMap.put(p1.getPartitionId(), 10001L); + positionMap.put(p2.getPartitionId(), 10002L); + + String positionStr = kafkaPositionHandler.serializePosition(new KafkaPosition(positionMap)); + metadataStore.saveSourceCheckpoint(cubeName, segment1, rs1.getReplicaSetID(), positionStr); + + positionMap.clear(); + positionMap.put(p3.getPartitionId(), 10003L); + positionMap.put(p4.getPartitionId(), 10004L); + positionStr = kafkaPositionHandler.serializePosition(new KafkaPosition(positionMap)); + metadataStore.saveSourceCheckpoint(cubeName, segment1, rs2.getReplicaSetID(), positionStr); + + positionMap.clear(); + positionMap.put(p5.getPartitionId(), 10005L); + positionMap.put(p6.getPartitionId(), 10006L); + positionStr = kafkaPositionHandler.serializePosition(new KafkaPosition(positionMap)); + metadataStore.saveSourceCheckpoint(cubeName, segment1, rs3.getReplicaSetID(), positionStr); + + positionMap.clear(); + positionMap.put(p1.getPartitionId(), 20001L); + positionMap.put(p2.getPartitionId(), 20002L); + positionStr = kafkaPositionHandler.serializePosition(new KafkaPosition(positionMap)); + metadataStore.saveSourceCheckpoint(cubeName, 
segment2, rs1.getReplicaSetID(), positionStr); + + // notify by replica set data has been uploaded + metadataStore.addCompleteReplicaSetForSegmentBuild(cubeName, segment1, rs1.getReplicaSetID()); + metadataStore.addCompleteReplicaSetForSegmentBuild(cubeName, segment1, rs2.getReplicaSetID()); + metadataStore.addCompleteReplicaSetForSegmentBuild(cubeName, segment1, rs3.getReplicaSetID()); + metadataStore.addCompleteReplicaSetForSegmentBuild(cubeName, segment2, rs1.getReplicaSetID()); + + if (addSegmentBuildState) { + // update segment build job info + SegmentBuildState.BuildState buildState = new SegmentBuildState.BuildState(); + buildState.setBuildStartTime(System.currentTimeMillis() - 30 * 60 * 1000);// submit 30 minutes ago + buildState.setJobId(jobID); + buildState.setState(SegmentBuildState.BuildState.State.BUILDING); + metadataStore.updateSegmentBuildState(cubeName, segment1, buildState); + } + } + // ================================================================================ + // ============================= Prepare stub object ============================== + + CubingJob stubCubingJob(ExecutableState state) { + CubingJob job = mock(CubingJob.class); + when(job.getStatus()).thenReturn(state); + return job; + } + + ReceiverClusterManager stubReceiverClusterManager(StreamingCoordinator coordinator) { + ReceiverClusterManager clusterManager = mock(ReceiverClusterManager.class); + when(clusterManager.getCoordinator()).thenReturn(coordinator); + return clusterManager; + // return new ReceiverClusterManager(coordinator); + } + + KylinConfig stubKylinConfig() { + KylinConfig kylinConfig = mock(KylinConfig.class); + when(kylinConfig.getMaxBuildingSegments()).thenReturn(10); + when(kylinConfig.getStreamingAssigner()).thenReturn("DefaultAssigner"); + + // ZK part + when(kylinConfig.getZKBaseSleepTimeMs()).thenReturn(5000); + when(kylinConfig.getZKMaxRetries()).thenReturn(3); + when(kylinConfig.getZookeeperConnectString()).thenReturn(connectStr); + return 
kylinConfig; + } + + StreamingCoordinator stubStreamingCoordinator(KylinConfig config, CubeManager cubeManager, + ExecutableManager executableManager) { + StreamingCoordinator coordinator = mock(StreamingCoordinator.class); + when(coordinator.getConfig()).thenReturn(config); + when(coordinator.getCubeManager()).thenReturn(cubeManager); + when(coordinator.getExecutableManager()).thenReturn(executableManager); + when(coordinator.getStreamMetadataStore()).thenReturn(metadataStore); + return coordinator; + } + + ExecutableManager stubExecutableManager(Map<String, CubingJob> cubingJobMap) { + ExecutableManager executableManager = mock(ExecutableManager.class); + for (Map.Entry<String, CubingJob> entry : cubingJobMap.entrySet()) { + when(executableManager.getJob(eq(entry.getKey()))).thenReturn(entry.getValue()); + } + return executableManager; + } + + CubeManager stubCubeManager(CubeInstance cubeInstance, boolean promotedNewSegmentFailed) { + CubeManager cubeManager = mock(CubeManager.class); + try { + when(cubeManager.getCube(anyString())).thenReturn(cubeInstance); + if (promotedNewSegmentFailed) { + doThrow(RuntimeException.class).when(cubeManager).promoteNewlyBuiltSegments(any(CubeInstance.class), + any(CubeSegment.class)); + } else { + doNothing().when(cubeManager).promoteNewlyBuiltSegments(any(CubeInstance.class), + any(CubeSegment.class)); + } + doNothing().when(cubeManager).promoteNewlyBuiltSegments(any(CubeInstance.class), any(CubeSegment.class)); + } catch (IOException ioe) { + // a ugly workaroud for mock method with declaration of throwing checked exception + } + return cubeManager; + } + + CubeInstance stubCubeInstance(CubeSegment cubSegment) { + CubeInstance cubeInstance = mock(CubeInstance.class); + when(cubeInstance.latestCopyForWrite()).thenReturn(cubeInstance); + @SuppressWarnings("unchecked") + Segments<CubeSegment> segmentSegments = mock(Segments.class, RETURNS_DEEP_STUBS); + + when(segmentSegments.size()).thenReturn(1); + 
when(cubeInstance.getBuildingSegments()).thenReturn(segmentSegments); + when(cubeInstance.getName()).thenReturn(cubeName1); + when(cubeInstance.getSegment(anyString(), Matchers.any())).thenReturn(cubSegment); + return cubeInstance; + } + + CubeSegment stubCubSegment(SegmentStatusEnum statusEnum, long leftRange, long rightRange) { + CubeSegment cubeSegment = mock(CubeSegment.class); + when(cubeSegment.getTSRange()).thenReturn(new SegmentRange.TSRange(leftRange, rightRange)); + when(cubeSegment.getStatus()).thenReturn(statusEnum); + return cubeSegment; + } +} diff --git a/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java b/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java index 7a16044..b4b4123 100644 --- a/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java +++ b/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java @@ -59,6 +59,7 @@ import org.apache.kylin.stream.coordinator.StreamMetadataStoreFactory; import org.apache.kylin.stream.coordinator.StreamingUtils; import org.apache.kylin.stream.coordinator.client.CoordinatorClient; import org.apache.kylin.stream.coordinator.client.HttpCoordinatorClient; +import org.apache.kylin.stream.coordinator.coordinate.annotations.NotAtomicIdempotent; import org.apache.kylin.stream.core.consumer.ConsumerStartProtocol; import org.apache.kylin.stream.core.consumer.EndPositionStopCondition; import org.apache.kylin.stream.core.consumer.IConsumerProvider; @@ -202,6 +203,7 @@ public class StreamingServer implements ReplicaSetLeaderSelector.LeaderChangeLis } } + @NotAtomicIdempotent private void sendSegmentsToFullBuild(String cubeName, StreamingSegmentManager segmentManager, Collection<StreamingCubeSegment> segments) throws Exception { List<Future<?>> futureList = Lists.newArrayList(); @@ -218,7 +220,7 @@ public class StreamingServer implements ReplicaSetLeaderSelector.LeaderChangeLis int i = 0; for 
(StreamingCubeSegment segment : segments) { futureList.get(i).get(); - logger.info("save remote store state to metadata store."); + logger.info("Save remote store state to metadata store."); streamMetadataStore.addCompleteReplicaSetForSegmentBuild(segment.getCubeName(), segment.getSegmentName(), replicaSetID); @@ -228,12 +230,12 @@ public class StreamingServer implements ReplicaSetLeaderSelector.LeaderChangeLis streamMetadataStore.saveSourceCheckpoint(segment.getCubeName(), segment.getSegmentName(), replicaSetID, smallestSourcePosStr); - logger.info("send notification to coordinator for cube {} segment {}.", cubeName, segment.getSegmentName()); + logger.info("Send notification to coordinator for cube {} segment {}.", cubeName, segment.getSegmentName()); coordinatorClient.segmentRemoteStoreComplete(currentNode, segment.getCubeName(), new Pair<>(segment.getDateRangeStart(), segment.getDateRangeEnd())); - logger.info("send notification success."); + logger.info("Send notification success."); segment.saveState(StreamingCubeSegment.State.REMOTE_PERSISTED); - logger.info("cube {} segment {} status converted to {}", segment.getCubeName(), segment.getSegmentName(), + logger.info("Commit cube {} segment {} status converted to {}.", segment.getCubeName(), segment.getSegmentName(), StreamingCubeSegment.State.REMOTE_PERSISTED.name()); i++; }