This is an automated email from the ASF dual-hosted git repository. nic pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push: new b72781c KYLIN-4249:DistributedScheduler can selectively assign task nodes according to cube extra configuration b72781c is described below commit b72781c3d89e3c5fd5bc1f9363d1dc4839e3c3d2 Author: zhangxiang17 <zhangxian...@58.com> AuthorDate: Mon Nov 11 13:02:30 2019 +0800 KYLIN-4249:DistributedScheduler can selectively assign task nodes according to cube extra configuration --- .../org/apache/kylin/common/KylinConfigBase.java | 32 ++++++++++++++++- .../org/apache/kylin/common}/util/ToolUtil.java | 38 +++++++++++++++++++- .../org/apache/kylin/common/util/ToolUtilTest.java | 41 ++++++++++++++++++++++ .../job/impl/threadpool/DistributedScheduler.java | 9 ++++- .../apache/kylin/tool/AbstractInfoExtractor.java | 2 +- .../org/apache/kylin/tool/DiagnosisInfoCLI.java | 2 +- .../org/apache/kylin/tool/JobDiagnosisInfoCLI.java | 2 +- .../org/apache/kylin/tool/KylinLogExtractor.java | 2 +- 8 files changed, 121 insertions(+), 7 deletions(-) diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 91ac9ff..73b8f01 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -731,6 +731,36 @@ public abstract class KylinConfigBase implements Serializable { return Integer.parseInt(getOptional("kylin.cube.cubeplanner.algorithm-threshold-genetic", "23")); } + /** + * get assigned server array, which a empty string array in default + * @return + */ + public String[] getAssignedServers() { + return getOptionalStringArray("kylin.cube.schedule.assigned-servers", new String[] {}); + } + + /** + * Determine if the target node is in the assigned node + * @param targetServers target task servers + * @return + */ + public boolean isOnAssignedServer(String... targetServers) { + + String[] servers = this.getAssignedServers(); + if (null == servers || servers.length == 0) { + return true; + } + + for (String s : servers) { + for (String ts : targetServers) { + if (s.equalsIgnoreCase(ts)) { + return true; + } + } + } + return false; + } + // ============================================================================ // JOB // ============================================================================ @@ -1526,7 +1556,7 @@ public abstract class KylinConfigBase implements Serializable { return Boolean.parseBoolean(getOptional("kylin.engine.spark-fact-distinct", "false")); } - public boolean isSparkCardinalityEnabled(){ + public boolean isSparkCardinalityEnabled() { return Boolean.parseBoolean(getOptional("kylin.engine.spark-cardinality", "false")); } diff --git a/tool/src/main/java/org/apache/kylin/tool/util/ToolUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/ToolUtil.java similarity index 71% rename from tool/src/main/java/org/apache/kylin/tool/util/ToolUtil.java rename to core-common/src/main/java/org/apache/kylin/common/util/ToolUtil.java index 842beb2..35075a6 100644 --- a/tool/src/main/java/org/apache/kylin/tool/util/ToolUtil.java +++ b/core-common/src/main/java/org/apache/kylin/common/util/ToolUtil.java @@ -17,7 +17,7 @@ * */ -package org.apache.kylin.tool.util; +package org.apache.kylin.common.util; import com.google.common.collect.Maps; import org.apache.commons.lang.StringUtils; @@ -26,8 +26,13 @@ import org.apache.kylin.common.persistence.ResourceStore; import java.io.File; import java.io.IOException; +import java.net.Inet4Address; +import java.net.Inet6Address; import java.net.InetAddress; +import java.net.NetworkInterface; +import java.net.SocketException; import java.net.UnknownHostException; +import java.util.Enumeration; import java.util.Map; public class ToolUtil { @@ -90,4 +95,35 @@ public class ToolUtil { } return hostname; } + + public static InetAddress getFirstNonLoopbackAddress(boolean preferIpv4, boolean preferIPv6) + throws SocketException { + Enumeration en = NetworkInterface.getNetworkInterfaces(); + while (en.hasMoreElements()) { + NetworkInterface element = (NetworkInterface) en.nextElement(); + for (Enumeration en2 = element.getInetAddresses(); en2.hasMoreElements();) { + InetAddress addr = (InetAddress) en2.nextElement(); + if (!addr.isLoopbackAddress()) { + if (addr instanceof Inet4Address) { + if (preferIPv6) { + continue; + } + return addr; + } + if (addr instanceof Inet6Address) { + if (preferIpv4) { + continue; + } + return addr; + } + } + } + } + return null; + } + + public static InetAddress getFirstIPV4NonLoopBackAddress() throws SocketException { + return getFirstNonLoopbackAddress(true, false); + } + } diff --git a/core-common/src/test/java/org/apache/kylin/common/util/ToolUtilTest.java b/core-common/src/test/java/org/apache/kylin/common/util/ToolUtilTest.java new file mode 100644 index 0000000..8161d43 --- /dev/null +++ b/core-common/src/test/java/org/apache/kylin/common/util/ToolUtilTest.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.kylin.common.util; + +import java.net.Inet4Address; +import java.net.Inet6Address; +import java.net.InetAddress; + +import org.junit.Assert; +import org.junit.Test; + +public class ToolUtilTest { + + @Test + public void testGetFirstIPV4NonLoopBackAddress() throws Exception { + + InetAddress localIp = ToolUtil.getFirstIPV4NonLoopBackAddress(); + boolean isLoopBackIp = localIp.isLoopbackAddress(); + Assert.assertNotNull("localIp is null", localIp); + Assert.assertEquals(false, isLoopBackIp); + Assert.assertEquals(true, localIp instanceof Inet4Address); + Assert.assertEquals(false, localIp instanceof Inet6Address); + } +} diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java index 8ea1533..4df9221 100644 --- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java +++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java @@ -34,6 +34,7 @@ import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.lock.DistributedLock; import org.apache.kylin.common.util.SetThreadName; import org.apache.kylin.common.util.StringUtil; +import org.apache.kylin.common.util.ToolUtil; import org.apache.kylin.job.Scheduler; import org.apache.kylin.job.engine.JobEngineConfig; import org.apache.kylin.job.exception.ExecuteException; @@ -102,7 +103,13 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable> { public void run() { try (SetThreadName ignored = new SetThreadName("Scheduler %s Job %s", System.identityHashCode(DistributedScheduler.this), executable.getId())) { - if (jobLock.lock(getLockPath(executable.getId()))) { + + KylinConfig config = executable.getCubeSpecificConfig(); + boolean isAssigned = config.isOnAssignedServer(ToolUtil.getHostName(), + ToolUtil.getFirstIPV4NonLoopBackAddress().getHostAddress()); + logger.debug("cube = " + executable.getCubeName() + "; jobId=" + executable.getId() + + (isAssigned ? " is " : " is not ") + "assigned on this server : " + ToolUtil.getHostName()); + if (isAssigned && jobLock.lock(getLockPath(executable.getId()))) { logger.info(executable.toString() + " scheduled in server: " + serverName); context.addRunningJob(executable); diff --git a/tool/src/main/java/org/apache/kylin/tool/AbstractInfoExtractor.java b/tool/src/main/java/org/apache/kylin/tool/AbstractInfoExtractor.java index 994e4d6..f55e11d 100644 --- a/tool/src/main/java/org/apache/kylin/tool/AbstractInfoExtractor.java +++ b/tool/src/main/java/org/apache/kylin/tool/AbstractInfoExtractor.java @@ -36,7 +36,7 @@ import org.apache.kylin.common.KylinVersion; import org.apache.kylin.common.util.AbstractApplication; import org.apache.kylin.common.util.OptionsHelper; import org.apache.kylin.common.util.ZipFileUtils; -import org.apache.kylin.tool.util.ToolUtil; +import org.apache.kylin.common.util.ToolUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/tool/src/main/java/org/apache/kylin/tool/DiagnosisInfoCLI.java b/tool/src/main/java/org/apache/kylin/tool/DiagnosisInfoCLI.java index 9063e9e..5d85308 100644 --- a/tool/src/main/java/org/apache/kylin/tool/DiagnosisInfoCLI.java +++ b/tool/src/main/java/org/apache/kylin/tool/DiagnosisInfoCLI.java @@ -36,7 +36,7 @@ import org.apache.kylin.common.util.ClassUtil; import org.apache.kylin.common.util.OptionsHelper; import org.apache.kylin.metadata.project.ProjectInstance; import org.apache.kylin.metadata.project.ProjectManager; -import org.apache.kylin.tool.util.ToolUtil; +import org.apache.kylin.common.util.ToolUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/tool/src/main/java/org/apache/kylin/tool/JobDiagnosisInfoCLI.java b/tool/src/main/java/org/apache/kylin/tool/JobDiagnosisInfoCLI.java index 8fec48e..e71c5f6 100644 --- a/tool/src/main/java/org/apache/kylin/tool/JobDiagnosisInfoCLI.java +++ b/tool/src/main/java/org/apache/kylin/tool/JobDiagnosisInfoCLI.java @@ -36,7 +36,7 @@ import org.apache.kylin.common.util.StringUtil; import org.apache.kylin.job.constant.ExecutableConstants; import org.apache.kylin.job.dao.ExecutableDao; import org.apache.kylin.job.dao.ExecutablePO; -import org.apache.kylin.tool.util.ToolUtil; +import org.apache.kylin.common.util.ToolUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/tool/src/main/java/org/apache/kylin/tool/KylinLogExtractor.java b/tool/src/main/java/org/apache/kylin/tool/KylinLogExtractor.java index a84345b..cb9fcd0 100644 --- a/tool/src/main/java/org/apache/kylin/tool/KylinLogExtractor.java +++ b/tool/src/main/java/org/apache/kylin/tool/KylinLogExtractor.java @@ -32,7 +32,7 @@ import org.apache.kylin.cube.CubeDescManager; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.metadata.model.DataModelManager; import org.apache.kylin.metadata.project.ProjectManager; -import org.apache.kylin.tool.util.ToolUtil; +import org.apache.kylin.common.util.ToolUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory;