This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kylin.git
commit c5d1a398e355631f3135d2a1a47661dfeb13144f Author: Zhong, Yanghong <nju_y...@apache.org> AuthorDate: Mon May 18 09:48:49 2020 +0800 KYLIN-4499 Extract kylin server self discovery service from CuratorScheduler --- core-common/pom.xml | 11 ++ .../java/org/apache/kylin/common/KConstants.java | 23 +++ .../java/org/apache/kylin/common/KylinConfig.java | 14 +- .../org/apache/kylin/common/KylinConfigBase.java | 4 + .../common/zookeeper/KylinServerDiscovery.java | 164 +++++++++++++++++++++ .../kylin/common/zookeeper}/ExampleServer.java | 26 +--- .../common/zookeeper/KylinServerDiscoveryTest.java | 46 +++--- .../kylin/job/impl/curator/CuratorScheduler.java | 100 +------------ .../org/apache/kylin/rest/service/JobService.java | 6 + 9 files changed, 247 insertions(+), 147 deletions(-) diff --git a/core-common/pom.xml b/core-common/pom.xml index fe3af13..407ab9c 100644 --- a/core-common/pom.xml +++ b/core-common/pom.xml @@ -50,6 +50,12 @@ <artifactId>jackson-databind</artifactId> </dependency> <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-x-discovery</artifactId> + <!-- this jar was absent from hbase lib, so compile it --> + <scope>compile</scope> + </dependency> + <dependency> <groupId>org.freemarker</groupId> <artifactId>freemarker</artifactId> </dependency> @@ -104,6 +110,11 @@ <scope>test</scope> </dependency> <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-test</artifactId> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.apache.kylin</groupId> <artifactId>kylin-shaded-guava</artifactId> </dependency> diff --git a/core-common/src/main/java/org/apache/kylin/common/KConstants.java b/core-common/src/main/java/org/apache/kylin/common/KConstants.java new file mode 100644 index 0000000..5e1723c --- /dev/null +++ b/core-common/src/main/java/org/apache/kylin/common/KConstants.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.common; + +public class KConstants { + public static final int DEFAULT_SERVICE_PORT = 7070; +} diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java index 6297bd1..7b0888b 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java @@ -18,7 +18,6 @@ package org.apache.kylin.common; -import org.apache.kylin.shaded.com.google.common.base.Preconditions; import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; import org.apache.kylin.common.restclient.RestClient; @@ -48,6 +47,9 @@ import java.util.Map; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; +import org.apache.kylin.shaded.com.google.common.base.Strings; +import org.apache.kylin.shaded.com.google.common.base.Preconditions; + /** */ public class KylinConfig extends KylinConfigBase { @@ -525,7 +527,15 @@ public class KylinConfig extends KylinConfigBase { String value = entry.getValue().toString(); orderedProperties.setProperty(key, value); } - + // Reset some properties which might be overriden by system properties + String[] systemProps = { "kylin.server.cluster-servers", "kylin.server.cluster-servers-with-mode" }; + for (String sysProp : systemProps) { + String sysPropValue = System.getProperty(sysProp); + if (!Strings.isNullOrEmpty(sysPropValue)) { + orderedProperties.setProperty(sysProp, sysPropValue); + } + } + final StringBuilder sb = new StringBuilder(); for (Map.Entry<String, String> entry : orderedProperties.entrySet()) { sb.append(entry.getKey() + "=" + entry.getValue()).append('\n'); diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index ff6138a..62021a9 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -2140,6 +2140,10 @@ public abstract class KylinConfigBase implements Serializable { return getOptional("kylin.server.host-address", "localhost:7070"); } + public boolean getServerSelfDiscoveryEnabled() { + return Boolean.parseBoolean(getOptional("kylin.server.self-discovery-enabled", FALSE)); + } + public String getClusterName() { String key = "kylin.server.cluster-name"; String clusterName = this.getOptional(key, getMetadataUrlPrefix()); diff --git a/core-common/src/main/java/org/apache/kylin/common/zookeeper/KylinServerDiscovery.java b/core-common/src/main/java/org/apache/kylin/common/zookeeper/KylinServerDiscovery.java new file mode 100644 index 0000000..5b67d3e --- /dev/null +++ b/core-common/src/main/java/org/apache/kylin/common/zookeeper/KylinServerDiscovery.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.common.zookeeper; + +import java.io.Closeable; +import java.io.IOException; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; + +import org.apache.commons.io.IOUtils; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.x.discovery.ServiceCache; +import org.apache.curator.x.discovery.ServiceDiscovery; +import org.apache.curator.x.discovery.ServiceDiscoveryBuilder; +import org.apache.curator.x.discovery.ServiceInstance; +import org.apache.curator.x.discovery.details.JsonInstanceSerializer; +import org.apache.curator.x.discovery.details.ServiceCacheListener; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.StringUtil; +import org.apache.kylin.common.util.ZKUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Maps; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +public class KylinServerDiscovery implements Closeable { + + private static final Logger logger = LoggerFactory.getLogger(KylinServerDiscovery.class); + + public static final String SERVICE_PATH = "/service"; + public static final String SERVICE_NAME = "cluster_servers"; + public static final String SERVICE_PAYLOAD_DESCRIPTION = "description"; + + private static class SingletonHolder { + private static final KylinServerDiscovery INSTANCE = new KylinServerDiscovery(); + } + + public static KylinServerDiscovery getInstance() { + return SingletonHolder.INSTANCE; + } + + private final KylinConfig kylinConfig; + private final CuratorFramework curator; + private final ServiceDiscovery<LinkedHashMap> serviceDiscovery; + private final ServiceCache<LinkedHashMap> serviceCache; + + private KylinServerDiscovery() { + this(KylinConfig.getInstanceFromEnv()); + } + + @VisibleForTesting + protected KylinServerDiscovery(KylinConfig kylinConfig) { + this.kylinConfig = kylinConfig; + this.curator = ZKUtil.getZookeeperClient(kylinConfig); + try { + final JsonInstanceSerializer<LinkedHashMap> serializer = new JsonInstanceSerializer<>(LinkedHashMap.class); + serviceDiscovery = ServiceDiscoveryBuilder.builder(LinkedHashMap.class).client(curator) + .basePath(SERVICE_PATH).serializer(serializer).build(); + serviceDiscovery.start(); + + serviceCache = serviceDiscovery.serviceCacheBuilder().name(SERVICE_NAME) + .threadFactory( + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("KylinServerTracker-%d").build()) + .build(); + + final AtomicBoolean isFinishInit = new AtomicBoolean(false); + serviceCache.addListener(new ServiceCacheListener() { + @Override + public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) { + } + + @Override + public void cacheChanged() { + logger.info("Service discovery get cacheChanged notification"); + final List<ServiceInstance<LinkedHashMap>> instances = serviceCache.getInstances(); + Map<String, String> instanceNodes = Maps.newHashMapWithExpectedSize(instances.size()); + for (ServiceInstance<LinkedHashMap> entry : instances) { + instanceNodes.put(entry.getAddress() + ":" + entry.getPort(), + (String) entry.getPayload().get(SERVICE_PAYLOAD_DESCRIPTION)); + } + + logger.info("kylin.server.cluster-servers update to " + instanceNodes); + // update cluster servers + System.setProperty("kylin.server.cluster-servers", StringUtil.join(instanceNodes.keySet(), ",")); + + // get servers and its mode(query, job, all) + final String restServersInClusterWithMode = StringUtil.join(instanceNodes.entrySet().stream() + .map(input -> input.getKey() + ":" + input.getValue()).collect(Collectors.toList()), ","); + logger.info("kylin.server.cluster-servers-with-mode update to " + restServersInClusterWithMode); + System.setProperty("kylin.server.cluster-servers-with-mode", restServersInClusterWithMode); + isFinishInit.set(true); + } + }); + serviceCache.start(); + + registerSelf(); + while (!isFinishInit.get()) { + logger.info("Haven't registered, waiting ..."); + Thread.sleep(100L); + } + } catch (Exception e) { + throw new RuntimeException("Fail to initialize due to ", e); + } + } + + private void registerSelf() throws Exception { + String hostAddr = kylinConfig.getServerRestAddress(); + String[] hostAddrInfo = hostAddr.split(":"); + if (hostAddrInfo.length < 2) { + logger.error("kylin.server.host-address {} is not qualified ", hostAddr); + throw new RuntimeException("kylin.server.host-address " + hostAddr + " is not qualified"); + } + String host = hostAddrInfo[0]; + int port = Integer.parseInt(hostAddrInfo[1]); + + String serverMode = kylinConfig.getServerMode(); + registerServer(host, port, serverMode); + } + + private void registerServer(String host, int port, String mode) throws Exception { + final LinkedHashMap<String, String> instanceDetail = new LinkedHashMap<>(); + instanceDetail.put(SERVICE_PAYLOAD_DESCRIPTION, mode); + + ServiceInstance<LinkedHashMap> thisInstance = ServiceInstance.<LinkedHashMap> builder().name(SERVICE_NAME) + .payload(instanceDetail).port(port).address(host).build(); + + for (ServiceInstance<LinkedHashMap> instance : serviceCache.getInstances()) { + // Check for registered instances to avoid being double registered + if (instance.getAddress().equals(thisInstance.getAddress()) + && instance.getPort().equals(thisInstance.getPort())) { + serviceDiscovery.unregisterService(instance); + } + } + serviceDiscovery.registerService(thisInstance); + } + + @Override + public void close() throws IOException { + IOUtils.closeQuietly(serviceCache); + IOUtils.closeQuietly(serviceDiscovery); + } +} diff --git a/core-job/src/test/java/org/apache/kylin/job/impl/curator/ExampleServer.java b/core-common/src/test/java/org/apache/kylin/common/zookeeper/ExampleServer.java similarity index 61% rename from core-job/src/test/java/org/apache/kylin/job/impl/curator/ExampleServer.java rename to core-common/src/test/java/org/apache/kylin/common/zookeeper/ExampleServer.java index 66e3832..9e9fe95 100644 --- a/core-job/src/test/java/org/apache/kylin/job/impl/curator/ExampleServer.java +++ b/core-common/src/test/java/org/apache/kylin/common/zookeeper/ExampleServer.java @@ -16,38 +16,28 @@ * limitations under the License. */ -package org.apache.kylin.job.impl.curator; +package org.apache.kylin.common.zookeeper; import java.io.Closeable; import java.io.IOException; -import org.apache.curator.framework.CuratorFramework; import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.ZKUtil; -import org.apache.kylin.job.engine.JobEngineConfig; -import org.apache.kylin.job.exception.SchedulerException; -import org.apache.kylin.job.lock.MockJobLock; /** */ public class ExampleServer implements Closeable { private String address; - private CuratorScheduler scheduler; + private KylinServerDiscovery discovery; - public ExampleServer(String address) throws Exception { + public ExampleServer(String address) { this.address = address; KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); KylinConfig kylinConfig1 = KylinConfig.createKylinConfig(kylinConfig); kylinConfig1.setProperty("kylin.server.host-address", address); - CuratorFramework client = ZKUtil.newZookeeperClient(kylinConfig1); - scheduler = new CuratorScheduler(client); - scheduler.init(new JobEngineConfig(kylinConfig1), new MockJobLock()); - if (!scheduler.hasStarted()) { - throw new RuntimeException("scheduler has not been started"); - } + discovery = new KylinServerDiscovery(kylinConfig1); } public String getAddress() { @@ -56,13 +46,7 @@ public class ExampleServer implements Closeable { @Override public void close() throws IOException { - - if (scheduler!= null) - try { - scheduler.shutdown(); - } catch (SchedulerException e) { - // - } + discovery.close(); } } diff --git a/core-job/src/test/java/org/apache/kylin/job/impl/curator/CuratorSchedulerTest.java b/core-common/src/test/java/org/apache/kylin/common/zookeeper/KylinServerDiscoveryTest.java similarity index 71% rename from core-job/src/test/java/org/apache/kylin/job/impl/curator/CuratorSchedulerTest.java rename to core-common/src/test/java/org/apache/kylin/common/zookeeper/KylinServerDiscoveryTest.java index 4cf1410..efcb2ef 100644 --- a/core-job/src/test/java/org/apache/kylin/job/impl/curator/CuratorSchedulerTest.java +++ b/core-common/src/test/java/org/apache/kylin/common/zookeeper/KylinServerDiscoveryTest.java @@ -15,13 +15,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kylin.job.impl.curator; +package org.apache.kylin.common.zookeeper; import java.util.Collection; import java.util.LinkedHashMap; import java.util.List; - -import javax.annotation.Nullable; +import java.util.stream.Collectors; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.retry.ExponentialBackoffRetry; @@ -30,9 +29,9 @@ import org.apache.curator.utils.CloseableUtils; import org.apache.curator.x.discovery.ServiceDiscovery; import org.apache.curator.x.discovery.ServiceDiscoveryBuilder; import org.apache.curator.x.discovery.ServiceInstance; +import org.apache.curator.x.discovery.details.JsonInstanceSerializer; import org.apache.kylin.common.util.LocalFileMetadataTestCase; import org.apache.kylin.common.util.ZKUtil; -import org.apache.kylin.job.execution.ExecutableManager; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -40,19 +39,16 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.kylin.shaded.com.google.common.base.Function; import org.apache.kylin.shaded.com.google.common.collect.Lists; /** */ -public class CuratorSchedulerTest extends LocalFileMetadataTestCase { +public class KylinServerDiscoveryTest extends LocalFileMetadataTestCase { - private static final Logger logger = LoggerFactory.getLogger(CuratorSchedulerTest.class); + private static final Logger logger = LoggerFactory.getLogger(KylinServerDiscoveryTest.class); private TestingServer zkTestServer; - protected ExecutableManager jobService; - @Before public void setup() throws Exception { zkTestServer = new TestingServer(); @@ -80,10 +76,8 @@ public class CuratorSchedulerTest extends LocalFileMetadataTestCase { ServiceDiscovery<LinkedHashMap> serviceDiscovery = null; CuratorFramework curatorClient = null; try { - - final CuratorScheduler.JsonInstanceSerializer<LinkedHashMap> serializer = new CuratorScheduler.JsonInstanceSerializer<>( - LinkedHashMap.class); - String servicePath = CuratorScheduler.KYLIN_SERVICE_PATH; + String servicePath = KylinServerDiscovery.SERVICE_PATH; + final JsonInstanceSerializer<LinkedHashMap> serializer = new JsonInstanceSerializer<>(LinkedHashMap.class); curatorClient = ZKUtil.newZookeeperClient(zkString, new ExponentialBackoffRetry(3000, 3)); serviceDiscovery = ServiceDiscoveryBuilder.builder(LinkedHashMap.class).client(curatorClient) .basePath(servicePath).serializer(serializer).build(); @@ -94,36 +88,32 @@ public class CuratorSchedulerTest extends LocalFileMetadataTestCase { Collection<String> serviceNames = serviceDiscovery.queryForNames(); Assert.assertTrue(serviceNames.size() == 1); - Assert.assertTrue(CuratorScheduler.SERVICE_NAME.equals(serviceNames.iterator().next())); + Assert.assertTrue(KylinServerDiscovery.SERVICE_NAME.equals(serviceNames.iterator().next())); Collection<ServiceInstance<LinkedHashMap>> instances = serviceDiscovery - .queryForInstances(CuratorScheduler.SERVICE_NAME); + .queryForInstances(KylinServerDiscovery.SERVICE_NAME); Assert.assertTrue(instances.size() == 2); List<ServiceInstance<LinkedHashMap>> instancesList = Lists.newArrayList(instances); - final List<String> instanceNodes = Lists.transform(instancesList, - new Function<ServiceInstance<LinkedHashMap>, String>() { - - @Nullable - @Override - public String apply(@Nullable ServiceInstance<LinkedHashMap> stringServiceInstance) { - return (String) stringServiceInstance.getPayload() - .get(CuratorScheduler.SERVICE_PAYLOAD_DESCRIPTION); - } - }); + final List<String> instanceNodes = instancesList.stream() + .map(input -> input.getAddress() + ":" + input.getPort() + ":" + + input.getPayload().get(KylinServerDiscovery.SERVICE_PAYLOAD_DESCRIPTION)) + .collect(Collectors.toList()); Assert.assertTrue(instanceNodes.contains(server1.getAddress() + ":query")); Assert.assertTrue(instanceNodes.contains(server2.getAddress() + ":query")); // stop one server server1.close(); - instances = serviceDiscovery.queryForInstances(CuratorScheduler.SERVICE_NAME); + instances = serviceDiscovery.queryForInstances(KylinServerDiscovery.SERVICE_NAME); + ServiceInstance<LinkedHashMap> existingInstance = instances.iterator().next(); Assert.assertTrue(instances.size() == 1); Assert.assertEquals(server2.getAddress() + ":query", - instances.iterator().next().getPayload().get(CuratorScheduler.SERVICE_PAYLOAD_DESCRIPTION)); + existingInstance.getAddress() + ":" + existingInstance.getPort() + ":" + + existingInstance.getPayload().get(KylinServerDiscovery.SERVICE_PAYLOAD_DESCRIPTION)); // all stop server2.close(); - instances = serviceDiscovery.queryForInstances(CuratorScheduler.SERVICE_NAME); + instances = serviceDiscovery.queryForInstances(KylinServerDiscovery.SERVICE_NAME); Assert.assertTrue(instances.size() == 0); } finally { diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/curator/CuratorScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/curator/CuratorScheduler.java index faa7d71..7e829ef 100644 --- a/core-job/src/main/java/org/apache/kylin/job/impl/curator/CuratorScheduler.java +++ b/core-job/src/main/java/org/apache/kylin/job/impl/curator/CuratorScheduler.java @@ -21,27 +21,16 @@ package org.apache.kylin.job.impl.curator; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.net.InetAddress; -import java.util.LinkedHashMap; -import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; - -import javax.annotation.Nullable; import org.apache.commons.io.IOUtils; import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.state.ConnectionState; -import org.apache.curator.x.discovery.ServiceCache; -import org.apache.curator.x.discovery.ServiceDiscovery; -import org.apache.curator.x.discovery.ServiceDiscoveryBuilder; import org.apache.curator.x.discovery.ServiceInstance; import org.apache.curator.x.discovery.details.InstanceSerializer; -import org.apache.curator.x.discovery.details.ServiceCacheListener; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.ServerMode; -import org.apache.kylin.common.util.StringUtil; import org.apache.kylin.common.util.ZKUtil; import org.apache.kylin.job.Scheduler; import org.apache.kylin.job.engine.JobEngineConfig; @@ -54,9 +43,7 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.kylin.shaded.com.google.common.annotations.VisibleForTesting; -import org.apache.kylin.shaded.com.google.common.base.Function; -import org.apache.kylin.shaded.com.google.common.collect.Lists; +import com.google.common.annotations.VisibleForTesting; public class CuratorScheduler implements Scheduler<AbstractExecutable> { @@ -64,15 +51,10 @@ public class CuratorScheduler implements Scheduler<AbstractExecutable> { private boolean started = false; private CuratorFramework curatorClient = null; private static CuratorLeaderSelector jobClient = null; - private ServiceDiscovery<LinkedHashMap> serviceDiscovery = null; - private ServiceCache<LinkedHashMap> serviceCache = null; private KylinConfig kylinConfig; private AtomicInteger count = new AtomicInteger(); static final String JOB_ENGINE_LEADER_PATH = "/job_engine/leader"; - static final String KYLIN_SERVICE_PATH = "/service"; - static final String SERVICE_NAME = "kylin"; - static final String SERVICE_PAYLOAD_DESCRIPTION = "description"; // the default constructor should exist for reflection initialization public CuratorScheduler() { @@ -100,13 +82,7 @@ public class CuratorScheduler implements Scheduler<AbstractExecutable> { curatorClient = ZKUtil.getZookeeperClient(kylinConfig); } - final String serverMode = jobEngineConfig.getConfig().getServerMode(); - final String restAddress = kylinConfig.getServerRestAddress(); - try { - registerInstance(restAddress, serverMode); - } catch (Exception e) { - throw new SchedulerException(e); - } + String restAddress = kylinConfig.getServerRestAddress(); String jobEnginePath = JOB_ENGINE_LEADER_PATH; @@ -120,79 +96,13 @@ public class CuratorScheduler implements Scheduler<AbstractExecutable> { throw new SchedulerException(e); } } else { - logger.info("server mode: " + jobEngineConfig.getConfig().getServerMode() + ", no need to run job scheduler"); + logger.info("server mode: " + jobEngineConfig.getConfig().getServerMode() + + ", no need to run job scheduler"); } started = true; } } - private void registerInstance(String restAddress, String mode) throws Exception { - final String host = restAddress.substring(0, restAddress.indexOf(":")); - final String port = restAddress.substring(restAddress.indexOf(":") + 1); - - final JsonInstanceSerializer<LinkedHashMap> serializer = new JsonInstanceSerializer<>(LinkedHashMap.class); - final String servicePath = KYLIN_SERVICE_PATH; - serviceDiscovery = ServiceDiscoveryBuilder.builder(LinkedHashMap.class).client(curatorClient) - .basePath(servicePath).serializer(serializer).build(); - serviceDiscovery.start(); - - serviceCache = serviceDiscovery.serviceCacheBuilder().name(SERVICE_NAME) - .threadFactory(Executors.defaultThreadFactory()).build(); - - serviceCache.addListener(new ServiceCacheListener() { - @Override - public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) { - } - - @Override - public void cacheChanged() { - logger.info("Service discovery get cacheChanged notification"); - final List<ServiceInstance<LinkedHashMap>> instances = serviceCache.getInstances(); - final List<String> instanceNodes = Lists.transform(instances, - new Function<ServiceInstance<LinkedHashMap>, String>() { - - @Nullable - @Override - public String apply(@Nullable ServiceInstance<LinkedHashMap> stringServiceInstance) { - return (String) stringServiceInstance.getPayload().get(SERVICE_PAYLOAD_DESCRIPTION); - } - }); - - final String restServersInCluster = // - StringUtil.join(instanceNodes.stream().map(input -> { // - String[] split = input.split(":"); // - return split[0] + ":" + split[1]; // - }).collect(Collectors.toList()), ","); // - - - logger.info("kylin.server.cluster-servers update to " + restServersInCluster); - // update cluster servers - System.setProperty("kylin.server.cluster-servers", restServersInCluster); - - // get servers and its mode(query, job, all) - final String restServersInClusterWithMode = StringUtil.join(instanceNodes, ","); - logger.info("kylin.server.cluster-servers-with-mode update to " + restServersInClusterWithMode); - System.setProperty("kylin.server.cluster-servers-with-mode", restServersInClusterWithMode); - } - }); - serviceCache.start(); - - final LinkedHashMap<String, String> instanceDetail = new LinkedHashMap<>(); - - instanceDetail.put(SERVICE_PAYLOAD_DESCRIPTION, restAddress + ":" + mode); - ServiceInstance<LinkedHashMap> thisInstance = ServiceInstance.<LinkedHashMap> builder().name(SERVICE_NAME) - .payload(instanceDetail).port(Integer.valueOf(port)).address(host).build(); - - for (ServiceInstance<LinkedHashMap> instance : serviceCache.getInstances()) { - // Check for registered instances to avoid being double registered - if (instance.getAddress().equals(thisInstance.getAddress()) - && instance.getPort().equals(thisInstance.getPort())) { - serviceDiscovery.unregisterService(instance); - } - } - serviceDiscovery.registerService(thisInstance); - } - private void monitorJobEngine() { logger.info("Start collect monitor ZK Participants"); Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(new Runnable() { @@ -220,8 +130,6 @@ public class CuratorScheduler implements Scheduler<AbstractExecutable> { @Override public void shutdown() throws SchedulerException { - IOUtils.closeQuietly(serviceCache); - IOUtils.closeQuietly(serviceDiscovery); IOUtils.closeQuietly(curatorClient); IOUtils.closeQuietly(jobClient); started = false; diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java index d1ec4cf..2f42410 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java @@ -37,6 +37,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.lock.DistributedLock; import org.apache.kylin.common.util.Pair; +import org.apache.kylin.common.zookeeper.KylinServerDiscovery; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; @@ -134,6 +135,11 @@ public class JobService extends BasicService implements InitializingBean { final Scheduler<AbstractExecutable> scheduler = (Scheduler<AbstractExecutable>) SchedulerFactory .scheduler(kylinConfig.getSchedulerType()); + if (kylinConfig.getServerSelfDiscoveryEnabled()) { + KylinServerDiscovery.getInstance(); + } + logger.info("Cluster servers: {}", Lists.newArrayList(kylinConfig.getRestServers())); + scheduler.init(new JobEngineConfig(kylinConfig), new ZookeeperJobLock()); Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {