KYLIN-1188 use helix 0.7.1 to manage the job engine assignment
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/ceec8980 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/ceec8980 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/ceec8980 Branch: refs/heads/helix-201602 Commit: ceec8980b5a02a5f2e2f5f8f8f34ded4b708a638 Parents: b26d957 Author: shaofengshi <[email protected]> Authored: Tue Jan 12 15:07:25 2016 +0800 Committer: shaofengshi <[email protected]> Committed: Sat Feb 6 13:31:49 2016 +0800 ---------------------------------------------------------------------- build/conf/kylin.properties | 16 +- .../apache/kylin/common/KylinConfigBase.java | 28 +++ .../test_case_data/sandbox/kylin.properties | 2 + pom.xml | 1 + .../kylin/rest/controller/JobController.java | 33 +-- .../kylin/rest/helix/HelixClusterAdmin.java | 245 +++++++++++++++++++ .../helix/LeaderStandbyStateModelFactory.java | 70 ++++++ .../apache/kylin/rest/service/CubeService.java | 6 +- .../kylin/rest/helix/HelixClusterAdminTest.java | 140 +++++++++++ 9 files changed, 516 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/ceec8980/build/conf/kylin.properties ---------------------------------------------------------------------- diff --git a/build/conf/kylin.properties b/build/conf/kylin.properties index 44a282e..8456ecb 100644 --- a/build/conf/kylin.properties +++ b/build/conf/kylin.properties @@ -1,12 +1,24 @@ -## Config for Kylin Engine ## +## Cluster related properties ## +# Required, comma separated list of zk servers; +kylin.zookeeper.address= +# rest address of this instance, ; +# optional, default be <hostname>:7070 +kylin.rest.address= + +# whether run a cluster controller in this node +kylin.cluster.controller=true # optional information for the owner of kylin platform, it can be your team's email # currently it will be attached to each kylin's htable attribute
kylin.owner=whoami@kylin.apache.org # List of web servers in use, this enables one web server instance to sync up with other servers. -kylin.rest.servers=localhost:7070 +# Deprecated, cluster will self-discover and update this. +# kylin.rest.servers=localhost:7070 + +# Server mode: all, job, query +kylin.server.mode=all # The metadata store in hbase kylin.metadata.url=kylin_metadata@hbase http://git-wip-us.apache.org/repos/asf/kylin/blob/ceec8980/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 5ce4ddc..a36b977 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -530,6 +530,34 @@ public class KylinConfigBase implements Serializable { return getOptional("mail.sender", ""); } + public String getZookeeperAddress() { + return this.getOptional("kylin.zookeeper.address"); + } + + public void setZookeeperAddress(String zkAddress) { + setProperty("kylin.zookeeper.address", zkAddress); + } + + public String getClusterName() { + return this.getOptional("kylin.cluster.name", getMetadataUrlPrefix()); + } + + public void setClusterName(String clusterName) { + setProperty("kylin.cluster.name", clusterName); + } + + public boolean isClusterController() { + return Boolean.parseBoolean(getOptional("kylin.cluster.controller", "true")); + } + + public String getRestAddress() { + return this.getOptional("kylin.rest.address"); + } + + public void setRestAddress(String restAddress) { + setProperty("kylin.rest.address", restAddress); + } + public String toString() { return getMetadataUrl(); } http://git-wip-us.apache.org/repos/asf/kylin/blob/ceec8980/examples/test_case_data/sandbox/kylin.properties 
---------------------------------------------------------------------- diff --git a/examples/test_case_data/sandbox/kylin.properties b/examples/test_case_data/sandbox/kylin.properties index 18ff1cc..5ce636b 100644 --- a/examples/test_case_data/sandbox/kylin.properties +++ b/examples/test_case_data/sandbox/kylin.properties @@ -11,6 +11,8 @@ kylin.rest.servers=localhost:7070 #set display timezone on UI,format like[GMT+N or GMT-N] kylin.rest.timezone=GMT-8 +kylin.server.mode=all +>>>>>>> KYLIN-1188 use helix 0.7.1 to manage the job engine assignment # The metadata store in hbase kylin.metadata.url=kylin_metadata@hbase http://git-wip-us.apache.org/repos/asf/kylin/blob/ceec8980/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 21fd8aa..8f04dcd 100644 --- a/pom.xml +++ b/pom.xml @@ -113,6 +113,7 @@ org/apache/kylin/**/tools/**:**/*CLI.java </sonar.jacoco.excludes> + <helix.version>0.7.1</helix.version> </properties> <licenses> http://git-wip-us.apache.org/repos/asf/kylin/blob/ceec8980/server/src/main/java/org/apache/kylin/rest/controller/JobController.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/kylin/rest/controller/JobController.java b/server/src/main/java/org/apache/kylin/rest/controller/JobController.java index 9dfb594..741b5ee 100644 --- a/server/src/main/java/org/apache/kylin/rest/controller/JobController.java +++ b/server/src/main/java/org/apache/kylin/rest/controller/JobController.java @@ -26,15 +26,13 @@ import java.util.List; import java.util.Map; import java.util.TimeZone; -import com.google.common.collect.Lists; -import joptsimple.internal.Strings; +import com.google.common.base.Preconditions; import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.restclient.Broadcaster; import org.apache.kylin.job.JobInstance; import org.apache.kylin.job.constant.JobStatusEnum; import 
org.apache.kylin.job.constant.JobTimeFilterEnum; import org.apache.kylin.rest.exception.InternalErrorException; -import org.apache.kylin.rest.helix.HelixJobEngineAdmin; +import org.apache.kylin.rest.helix.HelixClusterAdmin; import org.apache.kylin.rest.request.JobListRequest; import org.apache.kylin.rest.service.JobService; import org.slf4j.Logger; @@ -51,8 +49,6 @@ import java.io.IOException; import java.util.*; /** - * @author ysong1 - * @author Jack * */ @Controller @@ -76,9 +72,19 @@ public class JobController extends BasicController implements InitializingBean { TimeZone tzone = TimeZone.getTimeZone(timeZone); TimeZone.setDefault(tzone); - final String instanceName = HelixJobEngineAdmin.getCurrentInstanceName(); final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); + Preconditions.checkNotNull(kylinConfig.getZookeeperAddress(), "'kylin.zookeeper.address' couldn't be null, set it in kylin.properties."); + final HelixClusterAdmin clusterAdmin = HelixClusterAdmin.getInstance(kylinConfig); + clusterAdmin.start(); + + Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { + @Override + public void run() { + clusterAdmin.stop(); + } + })); + } /** @@ -190,17 +196,4 @@ public class JobController extends BasicController implements InitializingBean { this.jobService = jobService; } - private void updateKylinCluster(List<String> instances) { - List<String> instanceRestAddresses = Lists.newArrayList(); - for (String instanceName : instances) { - int indexOfUnderscore = instanceName.lastIndexOf("_"); - instanceRestAddresses.add(instanceName.substring(0, indexOfUnderscore) + ":" + instanceName.substring(indexOfUnderscore + 1)); - } - String restServersInCluster = Strings.join(instanceRestAddresses, ","); - KylinConfig.getInstanceFromEnv().setProperty("kylin.rest.servers", restServersInCluster); - System.setProperty("kylin.rest.servers", restServersInCluster); - Broadcaster.clearCache(); - - } - } 
http://git-wip-us.apache.org/repos/asf/kylin/blob/ceec8980/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java b/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java new file mode 100644 index 0000000..9983aae --- /dev/null +++ b/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ +package org.apache.kylin.rest.helix; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import joptsimple.internal.Strings; +import org.apache.commons.lang3.StringUtils; +import org.apache.helix.*; +import org.apache.helix.api.id.StateModelDefId; +import org.apache.helix.controller.HelixControllerMain; +import org.apache.helix.manager.zk.ZKHelixAdmin; +import org.apache.helix.model.*; +import org.apache.helix.tools.StateModelConfigGenerator; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.restclient.Broadcaster; +import org.apache.kylin.rest.constant.Constant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; + +/** + * Administrator of Kylin cluster + */ +public class HelixClusterAdmin { + + public static final String RESOURCE_NAME_JOB_ENGINE = "Resource_JobEngine"; + + public static final String MODEL_LEADER_STANDBY = "LeaderStandby"; + public static final String MODEL_ONLINE_OFFLINE = "OnlineOffline"; + public static final String TAG_JOB_ENGINE = "Tag_JobEngine"; + + private static ConcurrentMap<KylinConfig, HelixClusterAdmin> instanceMaps = Maps.newConcurrentMap(); + private HelixManager participantManager; + private HelixManager controllerManager; + + private final KylinConfig kylinConfig; + + private static final Logger logger = LoggerFactory.getLogger(HelixClusterAdmin.class); + private final String zkAddress; + private final ZKHelixAdmin admin; + private final String clusterName; + + private HelixClusterAdmin(KylinConfig kylinConfig) { + this.kylinConfig = kylinConfig; + this.zkAddress = kylinConfig.getZookeeperAddress(); + this.clusterName = kylinConfig.getClusterName(); + this.admin = new ZKHelixAdmin(zkAddress); + } + + public void start() throws Exception { + initCluster(); + final String instanceName = 
getCurrentInstanceName(); + + // use the tag to mark node's role. + final List<String> instanceTags = Lists.newArrayList(); + final boolean runJobEngine = Constant.SERVER_MODE_ALL.equalsIgnoreCase(kylinConfig.getServerMode()) || Constant.SERVER_MODE_JOB.equalsIgnoreCase(kylinConfig.getServerMode()); + if (runJobEngine) { + instanceTags.add(HelixClusterAdmin.TAG_JOB_ENGINE); + } + + addInstance(instanceName, instanceTags); + startInstance(instanceName); + + rebalanceWithTag(instanceTags); + + boolean startController = kylinConfig.isClusterController(); + if (startController) { + startController(); + } + } + + /** + * Initiate the cluster, adding state model definitions and resource definitions + */ + protected void initCluster() { + admin.addCluster(clusterName, false); + if (admin.getStateModelDef(clusterName, MODEL_ONLINE_OFFLINE) == null) { + admin.addStateModelDef(clusterName, MODEL_ONLINE_OFFLINE, new StateModelDefinition(StateModelConfigGenerator.generateConfigForOnlineOffline())); + } + if (admin.getStateModelDef(clusterName, MODEL_LEADER_STANDBY) == null) { + admin.addStateModelDef(clusterName, MODEL_LEADER_STANDBY, new StateModelDefinition(StateModelConfigGenerator.generateConfigForLeaderStandby())); + } + + // add job engine as a resource, 1 partition + if (!admin.getResourcesInCluster(clusterName).contains(HelixClusterAdmin.RESOURCE_NAME_JOB_ENGINE)) { + admin.addResource(clusterName, HelixClusterAdmin.RESOURCE_NAME_JOB_ENGINE, 1, MODEL_LEADER_STANDBY, IdealState.RebalanceMode.SEMI_AUTO.name()); + } + + } + + /** + * Start the instance and register the state model factory + * @param instanceName + * @throws Exception + */ + protected void startInstance(String instanceName) throws Exception { + participantManager = HelixManagerFactory.getZKHelixManager(clusterName, instanceName, InstanceType.PARTICIPANT, zkAddress); + participantManager.getStateMachineEngine().registerStateModelFactory(StateModelDefId.from(MODEL_LEADER_STANDBY), new 
LeaderStandbyStateModelFactory()); + participantManager.connect(); + participantManager.addLiveInstanceChangeListener(new KylinClusterLiveInstanceChangeListener()); + + } + + /** + * Rebalance the resource with the tags + * @param tags + */ + protected void rebalanceWithTag(List<String> tags) { + for (String tag : tags) { + if (tag.equals(TAG_JOB_ENGINE)) { + List<String> instances = admin.getInstancesInClusterWithTag(clusterName, TAG_JOB_ENGINE); + admin.rebalance(clusterName, RESOURCE_NAME_JOB_ENGINE, instances.size(), "", tag); + } + } + } + + /** + * Start an embedded helix controller + */ + protected void startController() { + controllerManager = HelixControllerMain.startHelixController(zkAddress, clusterName, "controller", HelixControllerMain.STANDALONE); + } + + public void stop() { + if (participantManager != null) { + participantManager.disconnect(); + } + + if (controllerManager != null) { + controllerManager.disconnect(); + } + } + + public String getInstanceState(String resourceName) { + String instanceName = this.getCurrentInstanceName(); + final ExternalView resourceExternalView = admin.getResourceExternalView(clusterName, resourceName); + if (resourceExternalView == null) { + logger.warn("fail to get ExternalView, clusterName:" + clusterName + " resourceName:" + resourceName); + return "ERROR"; + } + final Set<String> partitionSet = resourceExternalView.getPartitionSet(); + final Map<String, String> stateMap = resourceExternalView.getStateMap(partitionSet.iterator().next()); + if (stateMap.containsKey(instanceName)) { + return stateMap.get(instanceName); + } else { + logger.warn("fail to get state, clusterName:" + clusterName + " resourceName:" + resourceName + " instance:" + instanceName); + return "ERROR"; + } + } + + /** + * Check whether current kylin instance is in the leader role + * @return + */ + public boolean isLeaderRole(String resourceName) { + final String instanceState = getInstanceState(resourceName); + logger.debug("instance state: " 
+ instanceState); + if ("LEADER".equalsIgnoreCase(instanceState)) { + return true; + } + + return false; + } + + /** + * Add instance to cluster, with a tag list + * @param instanceName should be unique in format: hostName_port + * @param tags + */ + public void addInstance(String instanceName, List<String> tags) { + final String hostname = instanceName.substring(0, instanceName.lastIndexOf("_")); + final String port = instanceName.substring(instanceName.lastIndexOf("_") + 1); + InstanceConfig instanceConfig = new InstanceConfig(instanceName); + instanceConfig.setHostName(hostname); + instanceConfig.setPort(port); + if (tags != null) { + for (String tag : tags) { + instanceConfig.addTag(tag); + } + } + + if (admin.getInstancesInCluster(clusterName).contains(instanceName)) { + admin.dropInstance(clusterName, instanceConfig); + } + admin.addInstance(clusterName, instanceConfig); + } + + public static HelixClusterAdmin getInstance(KylinConfig kylinConfig) { + Preconditions.checkNotNull(kylinConfig); + instanceMaps.putIfAbsent(kylinConfig, new HelixClusterAdmin(kylinConfig)); + return instanceMaps.get(kylinConfig); + } + + public String getCurrentInstanceName() { + final String restAddress = kylinConfig.getRestAddress(); + if (StringUtils.isEmpty(restAddress)) { + throw new RuntimeException("There is no kylin.rest.address set in System property and kylin.properties;"); + } + + final String hostname = Preconditions.checkNotNull(restAddress.substring(0, restAddress.lastIndexOf(":")), "failed to get HostName of this server"); + final String port = Preconditions.checkNotNull(restAddress.substring(restAddress.lastIndexOf(":") + 1), "failed to get port of this server"); + return hostname + "_" + port; + } + + /** + * Listen to the cluster's event, update "kylin.rest.servers" to the live instances. 
+ */ + class KylinClusterLiveInstanceChangeListener implements LiveInstanceChangeListener { + @Override + public void onLiveInstanceChange(List<LiveInstance> liveInstances, NotificationContext changeContext) { + List<String> instanceRestAddresses = Lists.newArrayList(); + for (LiveInstance liveInstance : liveInstances) { + String instanceName = liveInstance.getInstanceName(); + int indexOfUnderscore = instanceName.lastIndexOf("_"); + instanceRestAddresses.add(instanceName.substring(0, indexOfUnderscore) + ":" + instanceName.substring(indexOfUnderscore + 1)); + } + String restServersInCluster = Strings.join(instanceRestAddresses, ","); + kylinConfig.setProperty("kylin.rest.servers", restServersInCluster); + System.setProperty("kylin.rest.servers", restServersInCluster); + logger.info("kylin.rest.servers update to " + restServersInCluster); + Broadcaster.clearCache(); + } + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/ceec8980/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java b/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java new file mode 100644 index 0000000..6694c81 --- /dev/null +++ b/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java @@ -0,0 +1,70 @@ +package org.apache.kylin.rest.helix; + +import org.apache.helix.NotificationContext; +import org.apache.helix.api.StateTransitionHandlerFactory; +import org.apache.helix.api.TransitionHandler; +import org.apache.helix.api.id.PartitionId; +import org.apache.helix.api.id.ResourceId; +import org.apache.helix.model.Message; +import org.apache.helix.participant.statemachine.Transition; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.job.engine.JobEngineConfig; +import 
org.apache.kylin.job.impl.threadpool.DefaultScheduler; +import org.apache.kylin.job.lock.MockJobLock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + */ +public class LeaderStandbyStateModelFactory extends StateTransitionHandlerFactory<TransitionHandler> { + private static final Logger logger = LoggerFactory.getLogger(LeaderStandbyStateModelFactory.class); + + @Override + public TransitionHandler createStateTransitionHandler(PartitionId partitionId) { + if (partitionId.getResourceId().equals(ResourceId.from(HelixClusterAdmin.RESOURCE_NAME_JOB_ENGINE))) { + return new JobEngineStateModel(); + } + + return null; + } + + public static class JobEngineStateModel extends TransitionHandler { + + @Transition(to = "LEADER", from = "STANDBY") + public void onBecomeLeaderFromStandby(Message message, NotificationContext context) { + logger.info("JobEngineStateModel.onBecomeLeaderFromStandby()"); + try { + KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); + DefaultScheduler scheduler = DefaultScheduler.createInstance(); + scheduler.init(new JobEngineConfig(kylinConfig), new MockJobLock()); + while (!scheduler.hasStarted()) { + logger.error("scheduler has not been started"); + Thread.sleep(1000); + } + } catch (Exception e) { + logger.error("error start DefaultScheduler", e); + throw new RuntimeException(e); + } + } + + @Transition(to = "STANDBY", from = "LEADER") + public void onBecomeStandbyFromLeader(Message message, NotificationContext context) { + logger.info("JobEngineStateModel.onBecomeStandbyFromLeader()"); + DefaultScheduler.destroyInstance(); + + } + + @Transition(to = "STANDBY", from = "OFFLINE") + public void onBecomeStandbyFromOffline(Message message, NotificationContext context) { + logger.info("JobEngineStateModel.onBecomeStandbyFromOffline()"); + + } + + + @Transition(to = "OFFLINE", from = "STANDBY") + public void onBecomeOfflineFromStandby(Message message, NotificationContext context) { + 
logger.info("JobEngineStateModel.onBecomeOfflineFromStandby()"); + + } + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/ceec8980/server/src/main/java/org/apache/kylin/rest/service/CubeService.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server/src/main/java/org/apache/kylin/rest/service/CubeService.java index 1feb66f..51f241c 100644 --- a/server/src/main/java/org/apache/kylin/rest/service/CubeService.java +++ b/server/src/main/java/org/apache/kylin/rest/service/CubeService.java @@ -55,7 +55,7 @@ import org.apache.kylin.metadata.realization.RealizationStatusEnum; import org.apache.kylin.metadata.realization.RealizationType; import org.apache.kylin.rest.constant.Constant; import org.apache.kylin.rest.exception.InternalErrorException; -import org.apache.kylin.rest.helix.HelixJobEngineAdmin; +import org.apache.kylin.rest.helix.HelixClusterAdmin; import org.apache.kylin.rest.request.MetricsRequest; import org.apache.kylin.rest.response.HBaseResponse; import org.apache.kylin.rest.response.MetricsResponse; @@ -570,8 +570,8 @@ public class CubeService extends BasicService { public void updateOnNewSegmentReady(String cubeName) { logger.debug("on updateOnNewSegmentReady: " + cubeName); final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); - HelixJobEngineAdmin jobEngineAdmin = HelixJobEngineAdmin.getInstance(kylinConfig.getZookeeperAddress()); - boolean isLeaderRole = jobEngineAdmin.isLeaderRole(kylinConfig.getClusterName(), HelixJobEngineAdmin.getCurrentInstanceName()); + HelixClusterAdmin jobEngineAdmin = HelixClusterAdmin.getInstance(kylinConfig); + boolean isLeaderRole = jobEngineAdmin.isLeaderRole(HelixClusterAdmin.RESOURCE_NAME_JOB_ENGINE); logger.debug("server is leader role ? 
" + isLeaderRole); if (isLeaderRole == true) { keepCubeRetention(cubeName); http://git-wip-us.apache.org/repos/asf/kylin/blob/ceec8980/server/src/test/java/org/apache/kylin/rest/helix/HelixClusterAdminTest.java ---------------------------------------------------------------------- diff --git a/server/src/test/java/org/apache/kylin/rest/helix/HelixClusterAdminTest.java b/server/src/test/java/org/apache/kylin/rest/helix/HelixClusterAdminTest.java new file mode 100644 index 0000000..70525b3 --- /dev/null +++ b/server/src/test/java/org/apache/kylin/rest/helix/HelixClusterAdminTest.java @@ -0,0 +1,140 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ +package org.apache.kylin.rest.helix; + +import org.I0Itec.zkclient.IDefaultNameSpace; +import org.I0Itec.zkclient.ZkClient; +import org.I0Itec.zkclient.ZkServer; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.fs.FileUtil; +import org.apache.helix.manager.zk.ZKHelixAdmin; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.LocalFileMetadataTestCase; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.InputStream; + +import static org.apache.kylin.rest.helix.HelixClusterAdmin.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** +*/ +public class HelixClusterAdminTest extends LocalFileMetadataTestCase { + + String zkAddress = "localhost:2199"; + ZkServer server; + + HelixClusterAdmin clusterAdmin1; + HelixClusterAdmin clusterAdmin2; + KylinConfig kylinConfig; + + private static final String CLUSTER_NAME = "test_cluster"; + + @Before + public void setup() throws Exception { + createTestMetadata(); + // start zookeeper on localhost + final File tmpDir = new File("/tmp/helix-quickstart"); + FileUtil.fullyDelete(tmpDir); + tmpDir.mkdirs(); + server = new ZkServer("/tmp/helix-quickstart/dataDir", "/tmp/helix-quickstart/logDir", new IDefaultNameSpace() { + @Override + public void createDefaultNameSpace(ZkClient zkClient) { + } + }, 2199); + server.start(); + + kylinConfig = this.getTestConfig(); + kylinConfig.setRestAddress("localhost:7070"); + kylinConfig.setZookeeperAddress(zkAddress); + kylinConfig.setClusterName(CLUSTER_NAME); + + final ZKHelixAdmin zkHelixAdmin = new ZKHelixAdmin(zkAddress); + zkHelixAdmin.dropCluster(kylinConfig.getClusterName()); + + } + + @Test + public void test() throws Exception { + + // 1. 
start one instance + clusterAdmin1 = getInstance(kylinConfig); + clusterAdmin1.start(); + + Thread.sleep(1000); + assertTrue(clusterAdmin1.isLeaderRole(RESOURCE_NAME_JOB_ENGINE)); + assertEquals(1, kylinConfig.getRestServers().length); + assertEquals("localhost:7070", kylinConfig.getRestServers()[0]); + + // 2. start second instance + InputStream is = IOUtils.toInputStream(kylinConfig.getConfigAsString()); + KylinConfig kylinConfig2 = KylinConfig.getKylinConfigFromInputStream(is); + kylinConfig2.setRestAddress("localhost:7072"); + is.close(); + + + clusterAdmin2 = getInstance(kylinConfig2); + clusterAdmin2.start(); + + Thread.sleep(1000); + assertTrue(clusterAdmin1.isLeaderRole(RESOURCE_NAME_JOB_ENGINE)); + assertFalse(clusterAdmin2.isLeaderRole(RESOURCE_NAME_JOB_ENGINE)); + assertEquals(2, kylinConfig.getRestServers().length); + assertEquals("localhost:7070", kylinConfig.getRestServers()[0]); + assertEquals("localhost:7072", kylinConfig.getRestServers()[1]); + + // 3. shutdown the first instance + clusterAdmin1.stop(); + clusterAdmin1 = null; + Thread.sleep(1000); + assertTrue(clusterAdmin2.isLeaderRole(RESOURCE_NAME_JOB_ENGINE)); + assertEquals(1, kylinConfig.getRestServers().length); + assertEquals("localhost:7072", kylinConfig.getRestServers()[0]); + + // 4. recover first instance + clusterAdmin1 = getInstance(kylinConfig); + clusterAdmin1.start(); + + Thread.sleep(1000); + assertTrue(clusterAdmin1.isLeaderRole(RESOURCE_NAME_JOB_ENGINE)); + assertFalse(clusterAdmin2.isLeaderRole(RESOURCE_NAME_JOB_ENGINE)); + assertEquals(2, kylinConfig.getRestServers().length); + assertEquals("localhost:7070", kylinConfig.getRestServers()[0]); + assertEquals("localhost:7072", kylinConfig.getRestServers()[1]); + } + + @After + public void tearDown() { + if (clusterAdmin1 != null) { + clusterAdmin1.stop(); + } + + if (clusterAdmin2 != null) { + clusterAdmin2.stop(); + } + + server.shutdown(); + cleanupTestMetadata(); + } + +}
