This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit c5d1a398e355631f3135d2a1a47661dfeb13144f
Author: Zhong, Yanghong <nju_y...@apache.org>
AuthorDate: Mon May 18 09:48:49 2020 +0800

    KYLIN-4499 Extract kylin server self discovery service from CuratorScheduler
---
 core-common/pom.xml                                |  11 ++
 .../java/org/apache/kylin/common/KConstants.java   |  23 +++
 .../java/org/apache/kylin/common/KylinConfig.java  |  14 +-
 .../org/apache/kylin/common/KylinConfigBase.java   |   4 +
 .../common/zookeeper/KylinServerDiscovery.java     | 164 +++++++++++++++++++++
 .../kylin/common/zookeeper}/ExampleServer.java     |  26 +---
 .../common/zookeeper/KylinServerDiscoveryTest.java |  46 +++---
 .../kylin/job/impl/curator/CuratorScheduler.java   | 100 +------------
 .../org/apache/kylin/rest/service/JobService.java  |   6 +
 9 files changed, 247 insertions(+), 147 deletions(-)

diff --git a/core-common/pom.xml b/core-common/pom.xml
index fe3af13..407ab9c 100644
--- a/core-common/pom.xml
+++ b/core-common/pom.xml
@@ -50,6 +50,12 @@
             <artifactId>jackson-databind</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-x-discovery</artifactId>
+            <!-- this jar was absent from hbase lib, so compile it  -->
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
             <groupId>org.freemarker</groupId>
             <artifactId>freemarker</artifactId>
         </dependency>
@@ -104,6 +110,11 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>org.apache.kylin</groupId>
             <artifactId>kylin-shaded-guava</artifactId>
         </dependency>
diff --git a/core-common/src/main/java/org/apache/kylin/common/KConstants.java 
b/core-common/src/main/java/org/apache/kylin/common/KConstants.java
new file mode 100644
index 0000000..5e1723c
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/KConstants.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common;
+
+public class KConstants {
+    public static final int DEFAULT_SERVICE_PORT = 7070;
+}
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java 
b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
index 6297bd1..7b0888b 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -18,7 +18,6 @@
 
 package org.apache.kylin.common;
 
-import org.apache.kylin.shaded.com.google.common.base.Preconditions;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.restclient.RestClient;
@@ -48,6 +47,9 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.kylin.shaded.com.google.common.base.Strings;
+import org.apache.kylin.shaded.com.google.common.base.Preconditions;
+
 /**
  */
 public class KylinConfig extends KylinConfigBase {
@@ -525,7 +527,15 @@ public class KylinConfig extends KylinConfigBase {
             String value = entry.getValue().toString();
             orderedProperties.setProperty(key, value);
         }
-
+        // Reset some properties which might be overriden by system properties
+        String[] systemProps = { "kylin.server.cluster-servers", 
"kylin.server.cluster-servers-with-mode" };
+        for (String sysProp : systemProps) {
+            String sysPropValue = System.getProperty(sysProp);
+            if (!Strings.isNullOrEmpty(sysPropValue)) {
+                orderedProperties.setProperty(sysProp, sysPropValue);
+            }
+        }
+        
         final StringBuilder sb = new StringBuilder();
         for (Map.Entry<String, String> entry : orderedProperties.entrySet()) {
             sb.append(entry.getKey() + "=" + entry.getValue()).append('\n');
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java 
b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index ff6138a..62021a9 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -2140,6 +2140,10 @@ public abstract class KylinConfigBase implements 
Serializable {
         return getOptional("kylin.server.host-address", "localhost:7070");
     }
 
+    public boolean getServerSelfDiscoveryEnabled() {
+        return 
Boolean.parseBoolean(getOptional("kylin.server.self-discovery-enabled", FALSE));
+    }
+
     public String getClusterName() {
         String key = "kylin.server.cluster-name";
         String clusterName = this.getOptional(key, getMetadataUrlPrefix());
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/zookeeper/KylinServerDiscovery.java
 
b/core-common/src/main/java/org/apache/kylin/common/zookeeper/KylinServerDiscovery.java
new file mode 100644
index 0000000..5b67d3e
--- /dev/null
+++ 
b/core-common/src/main/java/org/apache/kylin/common/zookeeper/KylinServerDiscovery.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.zookeeper;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.x.discovery.ServiceCache;
+import org.apache.curator.x.discovery.ServiceDiscovery;
+import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
+import org.apache.curator.x.discovery.ServiceInstance;
+import org.apache.curator.x.discovery.details.JsonInstanceSerializer;
+import org.apache.curator.x.discovery.details.ServiceCacheListener;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.common.util.ZKUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+public class KylinServerDiscovery implements Closeable {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(KylinServerDiscovery.class);
+
+    public static final String SERVICE_PATH = "/service";
+    public static final String SERVICE_NAME = "cluster_servers";
+    public static final String SERVICE_PAYLOAD_DESCRIPTION = "description";
+
+    private static class SingletonHolder {
+        private static final KylinServerDiscovery INSTANCE = new 
KylinServerDiscovery();
+    }
+
+    public static KylinServerDiscovery getInstance() {
+        return SingletonHolder.INSTANCE;
+    }
+
+    private final KylinConfig kylinConfig;
+    private final CuratorFramework curator;
+    private final ServiceDiscovery<LinkedHashMap> serviceDiscovery;
+    private final ServiceCache<LinkedHashMap> serviceCache;
+
+    private KylinServerDiscovery() {
+        this(KylinConfig.getInstanceFromEnv());
+    }
+
+    @VisibleForTesting
+    protected KylinServerDiscovery(KylinConfig kylinConfig) {
+        this.kylinConfig = kylinConfig;
+        this.curator = ZKUtil.getZookeeperClient(kylinConfig);
+        try {
+            final JsonInstanceSerializer<LinkedHashMap> serializer = new 
JsonInstanceSerializer<>(LinkedHashMap.class);
+            serviceDiscovery = 
ServiceDiscoveryBuilder.builder(LinkedHashMap.class).client(curator)
+                    .basePath(SERVICE_PATH).serializer(serializer).build();
+            serviceDiscovery.start();
+
+            serviceCache = 
serviceDiscovery.serviceCacheBuilder().name(SERVICE_NAME)
+                    .threadFactory(
+                            new 
ThreadFactoryBuilder().setDaemon(true).setNameFormat("KylinServerTracker-%d").build())
+                    .build();
+
+            final AtomicBoolean isFinishInit = new AtomicBoolean(false);
+            serviceCache.addListener(new ServiceCacheListener() {
+                @Override
+                public void stateChanged(CuratorFramework curatorFramework, 
ConnectionState connectionState) {
+                }
+
+                @Override
+                public void cacheChanged() {
+                    logger.info("Service discovery get cacheChanged 
notification");
+                    final List<ServiceInstance<LinkedHashMap>> instances = 
serviceCache.getInstances();
+                    Map<String, String> instanceNodes = 
Maps.newHashMapWithExpectedSize(instances.size());
+                    for (ServiceInstance<LinkedHashMap> entry : instances) {
+                        instanceNodes.put(entry.getAddress() + ":" + 
entry.getPort(),
+                                (String) 
entry.getPayload().get(SERVICE_PAYLOAD_DESCRIPTION));
+                    }
+
+                    logger.info("kylin.server.cluster-servers update to " + 
instanceNodes);
+                    // update cluster servers
+                    System.setProperty("kylin.server.cluster-servers", 
StringUtil.join(instanceNodes.keySet(), ","));
+
+                    // get servers and its mode(query, job, all)
+                    final String restServersInClusterWithMode = 
StringUtil.join(instanceNodes.entrySet().stream()
+                            .map(input -> input.getKey() + ":" + 
input.getValue()).collect(Collectors.toList()), ",");
+                    logger.info("kylin.server.cluster-servers-with-mode update 
to " + restServersInClusterWithMode);
+                    
System.setProperty("kylin.server.cluster-servers-with-mode", 
restServersInClusterWithMode);
+                    isFinishInit.set(true);
+                }
+            });
+            serviceCache.start();
+
+            registerSelf();
+            while (!isFinishInit.get()) {
+                logger.info("Haven't registered, waiting ...");
+                Thread.sleep(100L);
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("Fail to initialize due to ", e);
+        }
+    }
+
+    private void registerSelf() throws Exception {
+        String hostAddr = kylinConfig.getServerRestAddress();
+        String[] hostAddrInfo = hostAddr.split(":");
+        if (hostAddrInfo.length < 2) {
+            logger.error("kylin.server.host-address {} is not qualified ", 
hostAddr);
+            throw new RuntimeException("kylin.server.host-address " + hostAddr 
+ " is not qualified");
+        }
+        String host = hostAddrInfo[0];
+        int port = Integer.parseInt(hostAddrInfo[1]);
+
+        String serverMode = kylinConfig.getServerMode();
+        registerServer(host, port, serverMode);
+    }
+
+    private void registerServer(String host, int port, String mode) throws 
Exception {
+        final LinkedHashMap<String, String> instanceDetail = new 
LinkedHashMap<>();
+        instanceDetail.put(SERVICE_PAYLOAD_DESCRIPTION, mode);
+
+        ServiceInstance<LinkedHashMap> thisInstance = 
ServiceInstance.<LinkedHashMap> builder().name(SERVICE_NAME)
+                .payload(instanceDetail).port(port).address(host).build();
+
+        for (ServiceInstance<LinkedHashMap> instance : 
serviceCache.getInstances()) {
+            // Check for registered instances to avoid being double registered
+            if (instance.getAddress().equals(thisInstance.getAddress())
+                    && instance.getPort().equals(thisInstance.getPort())) {
+                serviceDiscovery.unregisterService(instance);
+            }
+        }
+        serviceDiscovery.registerService(thisInstance);
+    }
+
+    @Override
+    public void close() throws IOException {
+        IOUtils.closeQuietly(serviceCache);
+        IOUtils.closeQuietly(serviceDiscovery);
+    }
+}
diff --git 
a/core-job/src/test/java/org/apache/kylin/job/impl/curator/ExampleServer.java 
b/core-common/src/test/java/org/apache/kylin/common/zookeeper/ExampleServer.java
similarity index 61%
rename from 
core-job/src/test/java/org/apache/kylin/job/impl/curator/ExampleServer.java
rename to 
core-common/src/test/java/org/apache/kylin/common/zookeeper/ExampleServer.java
index 66e3832..9e9fe95 100644
--- 
a/core-job/src/test/java/org/apache/kylin/job/impl/curator/ExampleServer.java
+++ 
b/core-common/src/test/java/org/apache/kylin/common/zookeeper/ExampleServer.java
@@ -16,38 +16,28 @@
  * limitations under the License.
  */
 
-package org.apache.kylin.job.impl.curator;
+package org.apache.kylin.common.zookeeper;
 
 import java.io.Closeable;
 import java.io.IOException;
 
-import org.apache.curator.framework.CuratorFramework;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.ZKUtil;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.exception.SchedulerException;
-import org.apache.kylin.job.lock.MockJobLock;
 
 /**
  */
 public class ExampleServer implements Closeable {
 
     private String address;
-    private CuratorScheduler scheduler;
+    private KylinServerDiscovery discovery;
 
-    public ExampleServer(String address) throws Exception {
+    public ExampleServer(String address) {
         this.address = address;
 
         KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
         KylinConfig kylinConfig1 = KylinConfig.createKylinConfig(kylinConfig);
         kylinConfig1.setProperty("kylin.server.host-address", address);
 
-        CuratorFramework client = ZKUtil.newZookeeperClient(kylinConfig1);
-        scheduler = new CuratorScheduler(client);
-        scheduler.init(new JobEngineConfig(kylinConfig1), new MockJobLock());
-        if (!scheduler.hasStarted()) {
-            throw new RuntimeException("scheduler has not been started");
-        }
+        discovery = new KylinServerDiscovery(kylinConfig1);
     }
 
     public String getAddress() {
@@ -56,13 +46,7 @@ public class ExampleServer implements Closeable {
 
     @Override
     public void close() throws IOException {
-
-        if (scheduler!= null)
-            try {
-                scheduler.shutdown();
-            } catch (SchedulerException e) {
-               //
-            }
+        discovery.close();
     }
 
 }
diff --git 
a/core-job/src/test/java/org/apache/kylin/job/impl/curator/CuratorSchedulerTest.java
 
b/core-common/src/test/java/org/apache/kylin/common/zookeeper/KylinServerDiscoveryTest.java
similarity index 71%
rename from 
core-job/src/test/java/org/apache/kylin/job/impl/curator/CuratorSchedulerTest.java
rename to 
core-common/src/test/java/org/apache/kylin/common/zookeeper/KylinServerDiscoveryTest.java
index 4cf1410..efcb2ef 100644
--- 
a/core-job/src/test/java/org/apache/kylin/job/impl/curator/CuratorSchedulerTest.java
+++ 
b/core-common/src/test/java/org/apache/kylin/common/zookeeper/KylinServerDiscoveryTest.java
@@ -15,13 +15,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kylin.job.impl.curator;
+package org.apache.kylin.common.zookeeper;
 
 import java.util.Collection;
 import java.util.LinkedHashMap;
 import java.util.List;
-
-import javax.annotation.Nullable;
+import java.util.stream.Collectors;
 
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.retry.ExponentialBackoffRetry;
@@ -30,9 +29,9 @@ import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.x.discovery.ServiceDiscovery;
 import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
 import org.apache.curator.x.discovery.ServiceInstance;
+import org.apache.curator.x.discovery.details.JsonInstanceSerializer;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.common.util.ZKUtil;
-import org.apache.kylin.job.execution.ExecutableManager;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -40,19 +39,16 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.kylin.shaded.com.google.common.base.Function;
 import org.apache.kylin.shaded.com.google.common.collect.Lists;
 
 /**
  */
-public class CuratorSchedulerTest extends LocalFileMetadataTestCase {
+public class KylinServerDiscoveryTest extends LocalFileMetadataTestCase {
 
-    private static final Logger logger = 
LoggerFactory.getLogger(CuratorSchedulerTest.class);
+    private static final Logger logger = 
LoggerFactory.getLogger(KylinServerDiscoveryTest.class);
 
     private TestingServer zkTestServer;
 
-    protected ExecutableManager jobService;
-
     @Before
     public void setup() throws Exception {
         zkTestServer = new TestingServer();
@@ -80,10 +76,8 @@ public class CuratorSchedulerTest extends 
LocalFileMetadataTestCase {
         ServiceDiscovery<LinkedHashMap> serviceDiscovery = null;
         CuratorFramework curatorClient = null;
         try {
-
-            final CuratorScheduler.JsonInstanceSerializer<LinkedHashMap> 
serializer = new CuratorScheduler.JsonInstanceSerializer<>(
-                    LinkedHashMap.class);
-            String servicePath = CuratorScheduler.KYLIN_SERVICE_PATH;
+            String servicePath = KylinServerDiscovery.SERVICE_PATH;
+            final JsonInstanceSerializer<LinkedHashMap> serializer = new 
JsonInstanceSerializer<>(LinkedHashMap.class);
             curatorClient = ZKUtil.newZookeeperClient(zkString, new 
ExponentialBackoffRetry(3000, 3));
             serviceDiscovery = 
ServiceDiscoveryBuilder.builder(LinkedHashMap.class).client(curatorClient)
                     .basePath(servicePath).serializer(serializer).build();
@@ -94,36 +88,32 @@ public class CuratorSchedulerTest extends 
LocalFileMetadataTestCase {
 
             Collection<String> serviceNames = serviceDiscovery.queryForNames();
             Assert.assertTrue(serviceNames.size() == 1);
-            
Assert.assertTrue(CuratorScheduler.SERVICE_NAME.equals(serviceNames.iterator().next()));
+            
Assert.assertTrue(KylinServerDiscovery.SERVICE_NAME.equals(serviceNames.iterator().next()));
             Collection<ServiceInstance<LinkedHashMap>> instances = 
serviceDiscovery
-                    .queryForInstances(CuratorScheduler.SERVICE_NAME);
+                    .queryForInstances(KylinServerDiscovery.SERVICE_NAME);
             Assert.assertTrue(instances.size() == 2);
             List<ServiceInstance<LinkedHashMap>> instancesList = 
Lists.newArrayList(instances);
 
-            final List<String> instanceNodes = Lists.transform(instancesList,
-                    new Function<ServiceInstance<LinkedHashMap>, String>() {
-
-                        @Nullable
-                        @Override
-                        public String apply(@Nullable 
ServiceInstance<LinkedHashMap> stringServiceInstance) {
-                            return (String) stringServiceInstance.getPayload()
-                                    
.get(CuratorScheduler.SERVICE_PAYLOAD_DESCRIPTION);
-                        }
-                    });
+            final List<String> instanceNodes = instancesList.stream()
+                    .map(input -> input.getAddress() + ":" + input.getPort() + 
":"
+                            + 
input.getPayload().get(KylinServerDiscovery.SERVICE_PAYLOAD_DESCRIPTION))
+                    .collect(Collectors.toList());
 
             Assert.assertTrue(instanceNodes.contains(server1.getAddress() + 
":query"));
             Assert.assertTrue(instanceNodes.contains(server2.getAddress() + 
":query"));
 
             // stop one server
             server1.close();
-            instances = 
serviceDiscovery.queryForInstances(CuratorScheduler.SERVICE_NAME);
+            instances = 
serviceDiscovery.queryForInstances(KylinServerDiscovery.SERVICE_NAME);
+            ServiceInstance<LinkedHashMap> existingInstance = 
instances.iterator().next();
             Assert.assertTrue(instances.size() == 1);
             Assert.assertEquals(server2.getAddress() + ":query",
-                    
instances.iterator().next().getPayload().get(CuratorScheduler.SERVICE_PAYLOAD_DESCRIPTION));
+                    existingInstance.getAddress() + ":" + 
existingInstance.getPort() + ":"
+                            + 
existingInstance.getPayload().get(KylinServerDiscovery.SERVICE_PAYLOAD_DESCRIPTION));
 
             // all stop
             server2.close();
-            instances = 
serviceDiscovery.queryForInstances(CuratorScheduler.SERVICE_NAME);
+            instances = 
serviceDiscovery.queryForInstances(KylinServerDiscovery.SERVICE_NAME);
             Assert.assertTrue(instances.size() == 0);
 
         } finally {
diff --git 
a/core-job/src/main/java/org/apache/kylin/job/impl/curator/CuratorScheduler.java
 
b/core-job/src/main/java/org/apache/kylin/job/impl/curator/CuratorScheduler.java
index faa7d71..7e829ef 100644
--- 
a/core-job/src/main/java/org/apache/kylin/job/impl/curator/CuratorScheduler.java
+++ 
b/core-job/src/main/java/org/apache/kylin/job/impl/curator/CuratorScheduler.java
@@ -21,27 +21,16 @@ package org.apache.kylin.job.impl.curator;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.net.InetAddress;
-import java.util.LinkedHashMap;
-import java.util.List;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
-
-import javax.annotation.Nullable;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.state.ConnectionState;
-import org.apache.curator.x.discovery.ServiceCache;
-import org.apache.curator.x.discovery.ServiceDiscovery;
-import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
 import org.apache.curator.x.discovery.ServiceInstance;
 import org.apache.curator.x.discovery.details.InstanceSerializer;
-import org.apache.curator.x.discovery.details.ServiceCacheListener;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.ServerMode;
-import org.apache.kylin.common.util.StringUtil;
 import org.apache.kylin.common.util.ZKUtil;
 import org.apache.kylin.job.Scheduler;
 import org.apache.kylin.job.engine.JobEngineConfig;
@@ -54,9 +43,7 @@ import org.slf4j.LoggerFactory;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.JavaType;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.kylin.shaded.com.google.common.annotations.VisibleForTesting;
-import org.apache.kylin.shaded.com.google.common.base.Function;
-import org.apache.kylin.shaded.com.google.common.collect.Lists;
+import com.google.common.annotations.VisibleForTesting;
 
 public class CuratorScheduler implements Scheduler<AbstractExecutable> {
 
@@ -64,15 +51,10 @@ public class CuratorScheduler implements 
Scheduler<AbstractExecutable> {
     private boolean started = false;
     private CuratorFramework curatorClient = null;
     private static CuratorLeaderSelector jobClient = null;
-    private ServiceDiscovery<LinkedHashMap> serviceDiscovery = null;
-    private ServiceCache<LinkedHashMap> serviceCache = null;
     private KylinConfig kylinConfig;
     private AtomicInteger count = new AtomicInteger();
 
     static final String JOB_ENGINE_LEADER_PATH = "/job_engine/leader";
-    static final String KYLIN_SERVICE_PATH = "/service";
-    static final String SERVICE_NAME = "kylin";
-    static final String SERVICE_PAYLOAD_DESCRIPTION = "description";
 
     // the default constructor should exist for reflection initialization
     public CuratorScheduler() {
@@ -100,13 +82,7 @@ public class CuratorScheduler implements 
Scheduler<AbstractExecutable> {
                 curatorClient = ZKUtil.getZookeeperClient(kylinConfig);
             }
 
-            final String serverMode = 
jobEngineConfig.getConfig().getServerMode();
-            final String restAddress = kylinConfig.getServerRestAddress();
-            try {
-                registerInstance(restAddress, serverMode);
-            } catch (Exception e) {
-                throw new SchedulerException(e);
-            }
+            String restAddress = kylinConfig.getServerRestAddress();
 
             String jobEnginePath = JOB_ENGINE_LEADER_PATH;
 
@@ -120,79 +96,13 @@ public class CuratorScheduler implements 
Scheduler<AbstractExecutable> {
                     throw new SchedulerException(e);
                 }
             } else {
-                logger.info("server mode: " + 
jobEngineConfig.getConfig().getServerMode() + ", no need to run job scheduler");
+                logger.info("server mode: " + 
jobEngineConfig.getConfig().getServerMode()
+                        + ", no need to run job scheduler");
             }
             started = true;
         }
     }
 
-    private void registerInstance(String restAddress, String mode) throws 
Exception {
-        final String host = restAddress.substring(0, restAddress.indexOf(":"));
-        final String port = restAddress.substring(restAddress.indexOf(":") + 
1);
-
-        final JsonInstanceSerializer<LinkedHashMap> serializer = new 
JsonInstanceSerializer<>(LinkedHashMap.class);
-        final String servicePath = KYLIN_SERVICE_PATH;
-        serviceDiscovery = 
ServiceDiscoveryBuilder.builder(LinkedHashMap.class).client(curatorClient)
-                .basePath(servicePath).serializer(serializer).build();
-        serviceDiscovery.start();
-
-        serviceCache = 
serviceDiscovery.serviceCacheBuilder().name(SERVICE_NAME)
-                .threadFactory(Executors.defaultThreadFactory()).build();
-
-        serviceCache.addListener(new ServiceCacheListener() {
-            @Override
-            public void stateChanged(CuratorFramework curatorFramework, 
ConnectionState connectionState) {
-            }
-
-            @Override
-            public void cacheChanged() {
-                logger.info("Service discovery get cacheChanged notification");
-                final List<ServiceInstance<LinkedHashMap>> instances = 
serviceCache.getInstances();
-                final List<String> instanceNodes = Lists.transform(instances,
-                        new Function<ServiceInstance<LinkedHashMap>, String>() 
{
-
-                            @Nullable
-                            @Override
-                            public String apply(@Nullable 
ServiceInstance<LinkedHashMap> stringServiceInstance) {
-                                return (String) 
stringServiceInstance.getPayload().get(SERVICE_PAYLOAD_DESCRIPTION);
-                            }
-                        });
-
-                final String restServersInCluster = //
-                        StringUtil.join(instanceNodes.stream().map(input -> { 
//
-                            String[] split = input.split(":"); //
-                            return split[0] + ":" + split[1]; //
-                        }).collect(Collectors.toList()), ","); //
-
-
-                logger.info("kylin.server.cluster-servers update to " + 
restServersInCluster);
-                // update cluster servers
-                System.setProperty("kylin.server.cluster-servers", 
restServersInCluster);
-
-                // get servers and its mode(query, job, all)
-                final String restServersInClusterWithMode = 
StringUtil.join(instanceNodes, ",");
-                logger.info("kylin.server.cluster-servers-with-mode update to 
" + restServersInClusterWithMode);
-                System.setProperty("kylin.server.cluster-servers-with-mode", 
restServersInClusterWithMode);
-            }
-        });
-        serviceCache.start();
-
-        final LinkedHashMap<String, String> instanceDetail = new 
LinkedHashMap<>();
-
-        instanceDetail.put(SERVICE_PAYLOAD_DESCRIPTION, restAddress + ":" + 
mode);
-        ServiceInstance<LinkedHashMap> thisInstance = 
ServiceInstance.<LinkedHashMap> builder().name(SERVICE_NAME)
-                
.payload(instanceDetail).port(Integer.valueOf(port)).address(host).build();
-
-        for (ServiceInstance<LinkedHashMap> instance : 
serviceCache.getInstances()) {
-            // Check for registered instances to avoid being double registered
-            if (instance.getAddress().equals(thisInstance.getAddress())
-                    && instance.getPort().equals(thisInstance.getPort())) {
-                serviceDiscovery.unregisterService(instance);
-            }
-        }
-        serviceDiscovery.registerService(thisInstance);
-    }
-
     private void monitorJobEngine() {
         logger.info("Start collect monitor ZK Participants");
         Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(new 
Runnable() {
@@ -220,8 +130,6 @@ public class CuratorScheduler implements 
Scheduler<AbstractExecutable> {
 
     @Override
     public void shutdown() throws SchedulerException {
-        IOUtils.closeQuietly(serviceCache);
-        IOUtils.closeQuietly(serviceDiscovery);
         IOUtils.closeQuietly(curatorClient);
         IOUtils.closeQuietly(jobClient);
         started = false;
diff --git 
a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java 
b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index d1ec4cf..2f42410 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -37,6 +37,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.lock.DistributedLock;
 import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.common.zookeeper.KylinServerDiscovery;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
@@ -134,6 +135,11 @@ public class JobService extends BasicService implements 
InitializingBean {
         final Scheduler<AbstractExecutable> scheduler = 
(Scheduler<AbstractExecutable>) SchedulerFactory
                 .scheduler(kylinConfig.getSchedulerType());
 
+        if (kylinConfig.getServerSelfDiscoveryEnabled()) {
+            KylinServerDiscovery.getInstance();
+        }
+        logger.info("Cluster servers: {}", 
Lists.newArrayList(kylinConfig.getRestServers()));
+        
         scheduler.init(new JobEngineConfig(kylinConfig), new 
ZookeeperJobLock());
 
         Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {

Reply via email to