KYLIN-1311 Stream cubing auto assignment and load balance
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/9274aa81
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/9274aa81
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/9274aa81

Branch: refs/heads/helix-rebase
Commit: 9274aa815252455bbec2ad77cf9456b2351c439c
Parents: 3323b11
Author: shaofengshi <shaofeng...@apache.org>
Authored: Wed Jan 13 12:00:48 2016 +0800
Committer: shaofengshi <shaofeng...@apache.org>
Committed: Wed Mar 2 17:24:11 2016 +0800

----------------------------------------------------------------------
 .../apache/kylin/rest/constant/Constant.java | 1 +
 .../kylin/rest/helix/HelixClusterAdmin.java | 22 +++++++--
 .../helix/LeaderStandbyStateModelFactory.java | 50 +++++++++++++++++++-
 3 files changed, 68 insertions(+), 5 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kylin/blob/9274aa81/server/src/main/java/org/apache/kylin/rest/constant/Constant.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/constant/Constant.java b/server/src/main/java/org/apache/kylin/rest/constant/Constant.java
index f068e5f..58b74f0 100644
--- a/server/src/main/java/org/apache/kylin/rest/constant/Constant.java
+++ b/server/src/main/java/org/apache/kylin/rest/constant/Constant.java
@@ -41,6 +41,7 @@ public class Constant {
 public final static String SERVER_MODE_QUERY = "query";
 public final static String SERVER_MODE_JOB = "job";
+ public final static String SERVER_MODE_STREAM = "stream";
 public final static String SERVER_MODE_ALL = "all";
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/9274aa81/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java
b/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java index 9983aae..6300383 100644 --- a/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java +++ b/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java @@ -45,10 +45,12 @@ import java.util.concurrent.ConcurrentMap; public class HelixClusterAdmin { public static final String RESOURCE_NAME_JOB_ENGINE = "Resource_JobEngine"; + public static final String RESOURCE_STREAME_CUBE_PREFIX = "Resource_Streame_"; public static final String MODEL_LEADER_STANDBY = "LeaderStandby"; public static final String MODEL_ONLINE_OFFLINE = "OnlineOffline"; public static final String TAG_JOB_ENGINE = "Tag_JobEngine"; + public static final String TAG_STREAM_BUILDER = "Tag_StreamBuilder"; private static ConcurrentMap<KylinConfig, HelixClusterAdmin> instanceMaps = Maps.newConcurrentMap(); private HelixManager participantManager; @@ -74,11 +76,15 @@ public class HelixClusterAdmin { // use the tag to mark node's role. 
final List<String> instanceTags = Lists.newArrayList(); - final boolean runJobEngine = Constant.SERVER_MODE_ALL.equalsIgnoreCase(kylinConfig.getServerMode()) || Constant.SERVER_MODE_JOB.equalsIgnoreCase(kylinConfig.getServerMode()); - if (runJobEngine) { + if (Constant.SERVER_MODE_ALL.equalsIgnoreCase(kylinConfig.getServerMode())) { instanceTags.add(HelixClusterAdmin.TAG_JOB_ENGINE); + instanceTags.add(HelixClusterAdmin.TAG_STREAM_BUILDER); + } else if (Constant.SERVER_MODE_JOB.equalsIgnoreCase(kylinConfig.getServerMode())) { + instanceTags.add(HelixClusterAdmin.TAG_JOB_ENGINE); + } else if (Constant.SERVER_MODE_STREAM.equalsIgnoreCase(kylinConfig.getServerMode())) { + instanceTags.add(HelixClusterAdmin.TAG_STREAM_BUILDER); } - + addInstance(instanceName, instanceTags); startInstance(instanceName); @@ -108,6 +114,16 @@ public class HelixClusterAdmin { } } + + public void addStreamCubeSlice(String cubeName, long start, long end) { + String resourceName = RESOURCE_STREAME_CUBE_PREFIX + cubeName + "_" + start + "_" + end; + if (!admin.getResourcesInCluster(clusterName).contains(resourceName)) { + admin.addResource(clusterName, resourceName, 1, MODEL_LEADER_STANDBY, IdealState.RebalanceMode.SEMI_AUTO.name()); + } + + admin.rebalance(clusterName, resourceName, 2, "", TAG_STREAM_BUILDER); + + } /** * Start the instance and register the state model factory http://git-wip-us.apache.org/repos/asf/kylin/blob/9274aa81/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java b/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java index 6694c81..c2a78e7 100644 --- a/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java +++ b/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java @@ -1,5 +1,6 @@ 
package org.apache.kylin.rest.helix; +import com.google.common.base.Preconditions; import org.apache.helix.NotificationContext; import org.apache.helix.api.StateTransitionHandlerFactory; import org.apache.helix.api.TransitionHandler; @@ -8,12 +9,16 @@ import org.apache.helix.api.id.ResourceId; import org.apache.helix.model.Message; import org.apache.helix.participant.statemachine.Transition; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.engine.streaming.OneOffStreamingBuilder; +import org.apache.kylin.engine.streaming.cli.StreamingCLI; import org.apache.kylin.job.engine.JobEngineConfig; import org.apache.kylin.job.impl.threadpool.DefaultScheduler; import org.apache.kylin.job.lock.MockJobLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.kylin.rest.helix.HelixClusterAdmin.RESOURCE_STREAME_CUBE_PREFIX; + /** */ public class LeaderStandbyStateModelFactory extends StateTransitionHandlerFactory<TransitionHandler> { @@ -22,13 +27,19 @@ public class LeaderStandbyStateModelFactory extends StateTransitionHandlerFactor @Override public TransitionHandler createStateTransitionHandler(PartitionId partitionId) { if (partitionId.getResourceId().equals(ResourceId.from(HelixClusterAdmin.RESOURCE_NAME_JOB_ENGINE))) { - return new JobEngineStateModel(); + return JobEngineStateModel.INSTANCE; } - + + if (partitionId.getResourceId().stringify().startsWith(RESOURCE_STREAME_CUBE_PREFIX)) { + return StreamCubeStateModel.INSTANCE; + } + return null; } public static class JobEngineStateModel extends TransitionHandler { + + public static JobEngineStateModel INSTANCE = new JobEngineStateModel(); @Transition(to = "LEADER", from = "STANDBY") public void onBecomeLeaderFromStandby(Message message, NotificationContext context) { @@ -67,4 +78,39 @@ public class LeaderStandbyStateModelFactory extends StateTransitionHandlerFactor } } + + public static class StreamCubeStateModel extends TransitionHandler { + + public static 
StreamCubeStateModel INSTANCE = new StreamCubeStateModel(); + + @Transition(to = "LEADER", from = "STANDBY") + public void onBecomeLeaderFromStandby(Message message, NotificationContext context) { + String resourceName = message.getResourceId().stringify(); + Preconditions.checkArgument(resourceName.startsWith(RESOURCE_STREAME_CUBE_PREFIX)); + long end = Long.parseLong(resourceName.substring(resourceName.lastIndexOf("_")) + 1); + String temp = resourceName.substring(RESOURCE_STREAME_CUBE_PREFIX.length(), resourceName.lastIndexOf("_")); + long start = Long.parseLong(temp.substring(temp.lastIndexOf("_")) + 1); + String cubeName = temp.substring(0, temp.lastIndexOf("_")); + + final Runnable runnable = new OneOffStreamingBuilder(cubeName, start, end).build(); + runnable.run(); + } + + @Transition(to = "STANDBY", from = "LEADER") + public void onBecomeStandbyFromLeader(Message message, NotificationContext context) { + + + } + + @Transition(to = "STANDBY", from = "OFFLINE") + public void onBecomeStandbyFromOffline(Message message, NotificationContext context) { + + } + + + @Transition(to = "OFFLINE", from = "STANDBY") + public void onBecomeOfflineFromStandby(Message message, NotificationContext context) { + + } + } }