Repository: kylin Updated Branches: refs/heads/KYLIN-1826-2 [created] 3ac6490b3
KYLIN-1826 support multiple hive env by extending KylinConfig and HiveClient Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/3ac6490b Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/3ac6490b Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/3ac6490b Branch: refs/heads/KYLIN-1826-2 Commit: 3ac6490b36e897c6ad853f0d1eba3f9c97c88382 Parents: 309593b Author: shaofengshi <shaofeng...@apache.org> Authored: Wed Nov 9 16:23:19 2016 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Wed Nov 9 16:23:19 2016 +0800 ---------------------------------------------------------------------- .../apache/kylin/common/KylinConfigBase.java | 8 ++ .../kylin/metadata/project/ProjectInstance.java | 23 ++++- .../apache/kylin/source/hive/CLIHiveClient.java | 35 +++++++ .../source/hive/CreateFlatHiveTableStep.java | 2 +- .../kylin/source/hive/HiveClientFactory.java | 14 ++- .../kylin/source/hive/HiveCmdBuilder.java | 17 +++- .../apache/kylin/source/hive/HiveMRInput.java | 47 ++++++++-- .../source/hive/HiveSourceTableLoader.java | 2 +- .../apache/kylin/source/hive/HiveTableMeta.java | 96 ++++++++++++++++++++ 9 files changed, 227 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/3ac6490b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index ee9f57c..9b3a592 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -830,4 +830,12 @@ abstract public class KylinConfigBase implements Serializable { public void setMaxBuildingSegments(int 
maxBuildingSegments) { setProperty("kylin.cube.building.segment.max", String.valueOf(maxBuildingSegments)); } + + public String getHiveHome() { + return getOptional("kylin.hive.home", ""); + } + + public void setHiveHome(String hiveHome) { + setProperty("kylin.hive.home", hiveHome); + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/3ac6490b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java index 1afc603..f9d220f 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java @@ -19,6 +19,7 @@ package org.apache.kylin.metadata.project; import java.util.ArrayList; +import java.util.LinkedHashMap; import java.util.List; import java.util.Set; import java.util.TreeSet; @@ -26,6 +27,8 @@ import java.util.TreeSet; import javax.annotation.Nullable; import org.apache.commons.lang3.StringUtils; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.KylinConfigExt; import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.persistence.RootPersistentEntity; import org.apache.kylin.metadata.realization.RealizationType; @@ -78,6 +81,11 @@ public class ProjectInstance extends RootPersistentEntity { @JsonProperty("ext_filters") private Set<String> extFilters = new TreeSet<String>(); + @JsonProperty("override_kylin_properties") + private LinkedHashMap<String, String> overrideKylinProps = new LinkedHashMap<>(); + + private KylinConfigExt config; + public String getResourcePath() { return concatResourcePath(name); } @@ -285,19 +293,30 @@ public class ProjectInstance extends RootPersistentEntity { } } + public KylinConfig 
getConfig() { + return config; + } + + private void setConfig(KylinConfigExt config) { + this.config = config; + } + + public void init() { if (name == null) name = ProjectInstance.DEFAULT_PROJECT_NAME; if (realizationEntries == null) { - realizationEntries = new ArrayList<RealizationEntry>(); + realizationEntries = new ArrayList<>(); } if (tables == null) - tables = new TreeSet<String>(); + tables = new TreeSet<>(); if (StringUtils.isBlank(this.name)) throw new IllegalStateException("Project name must not be blank"); + + this.config = KylinConfigExt.createInstance(config, overrideKylinProps); } @Override http://git-wip-us.apache.org/repos/asf/kylin/blob/3ac6490b/source-hive/src/main/java/org/apache/kylin/source/hive/CLIHiveClient.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/CLIHiveClient.java b/source-hive/src/main/java/org/apache/kylin/source/hive/CLIHiveClient.java index 5a17f1f..7538444 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/CLIHiveClient.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/CLIHiveClient.java @@ -19,9 +19,12 @@ package org.apache.kylin.source.hive; import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; import java.util.List; import java.util.Map; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.hive.cli.CliSessionState; import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.conf.HiveConf; @@ -33,6 +36,7 @@ import org.apache.hadoop.hive.ql.CommandNeedRetryException; import org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.kylin.common.KylinConfig; import com.google.common.collect.Lists; @@ -46,11 +50,42 @@ public class CLIHiveClient implements IHiveClient { protected HiveConf hiveConf = null; 
protected Driver driver = null; protected HiveMetaStoreClient metaStoreClient = null; + private final static String LOCAL_FS_SCHEMA = "file://"; + public static final String HIVE_CONFIG_FILE_LOCATION = "conf/hive-site.xml"; + public static final String HIVE_COMMAND_LOCATION = "bin/hive"; public CLIHiveClient() { hiveConf = new HiveConf(CLIHiveClient.class); } + public CLIHiveClient(KylinConfig kylinConfig) { + + String hiveHome = kylinConfig.getHiveHome(); + if (StringUtils.isNotEmpty(hiveHome)) { + if (hiveHome.endsWith("/") == false) { + hiveHome = hiveHome + "/"; + } + String configFileLocation = hiveHome + HIVE_CONFIG_FILE_LOCATION; + URL uri = null; + try { + uri = new URL(LOCAL_FS_SCHEMA + configFileLocation); + } catch (MalformedURLException e) { + throw new IllegalArgumentException("Can not find hive config file " + configFileLocation); + } + /** + * In HiveConf, hiveSiteURL is a static variable, so we should use a global lock. + * If uri is null, HiveConf will use the file from java classpath. 
+ */ + synchronized (CLIHiveClient.class) { + hiveConf.setHiveSiteLocation(uri); + hiveConf = new HiveConf(CLIHiveClient.class); + } + + } else { + hiveConf = new HiveConf(CLIHiveClient.class); + } + } + /** * only used by Deploy Util */ http://git-wip-us.apache.org/repos/asf/kylin/blob/3ac6490b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java index 025fd94..6074d1b 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java @@ -40,7 +40,7 @@ public class CreateFlatHiveTableStep extends AbstractExecutable { private final BufferedLogger stepLogger = new BufferedLogger(logger); private void createFlatHiveTable(KylinConfig config) throws IOException { - final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(); + final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(config); hiveCmdBuilder.addStatement(getInitStatement()); hiveCmdBuilder.addStatement(getCreateTableStatement()); final String cmd = hiveCmdBuilder.toString(); http://git-wip-us.apache.org/repos/asf/kylin/blob/3ac6490b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveClientFactory.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveClientFactory.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveClientFactory.java index 8c883af..4f8755a 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveClientFactory.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveClientFactory.java @@ -21,13 +21,19 @@ package org.apache.kylin.source.hive; import 
org.apache.kylin.common.KylinConfig; public class HiveClientFactory { - public static IHiveClient getHiveClient() { - if ("cli".equals(KylinConfig.getInstanceFromEnv().getHiveClientMode())) { - return new CLIHiveClient(); + + public static IHiveClient getHiveClientByConfig(KylinConfig kylinConfig) { + if ("cli".equals(kylinConfig.getHiveClientMode())) { + return new CLIHiveClient(kylinConfig); } else if ("beeline".equals(KylinConfig.getInstanceFromEnv().getHiveClientMode())) { - return new BeelineHiveClient(KylinConfig.getInstanceFromEnv().getHiveBeelineParams()); + return new BeelineHiveClient(kylinConfig.getHiveBeelineParams()); } else { throw new RuntimeException("cannot recognize hive client mode"); } } + + public static IHiveClient getHiveClient() { + return getHiveClientByConfig(KylinConfig.getInstanceFromEnv()); + + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/3ac6490b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveCmdBuilder.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveCmdBuilder.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveCmdBuilder.java index 844cf12..9707ac1 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveCmdBuilder.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveCmdBuilder.java @@ -26,6 +26,7 @@ import java.io.IOException; import java.util.ArrayList; import org.apache.commons.io.IOUtils; +import org.apache.commons.lang.StringUtils; import org.apache.kylin.common.KylinConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,7 +45,11 @@ public class HiveCmdBuilder { final private ArrayList<String> statements = Lists.newArrayList(); public HiveCmdBuilder() { - kylinConfig = KylinConfig.getInstanceFromEnv(); + this(KylinConfig.getInstanceFromEnv()); + } + + public HiveCmdBuilder(KylinConfig kylinConfig) { + this.kylinConfig = kylinConfig; clientMode = 
HiveClientMode.valueOf(kylinConfig.getHiveClientMode().toUpperCase()); } @@ -53,7 +58,15 @@ public class HiveCmdBuilder { switch (clientMode) { case CLI: - buf.append("hive -e \""); + String hiveHome = kylinConfig.getHiveHome(); + if (StringUtils.isNotEmpty(hiveHome)) { + if (hiveHome.endsWith("/") == false) { + hiveHome = hiveHome + "/"; + } + buf.append(hiveHome).append(CLIHiveClient.HIVE_COMMAND_LOCATION).append(" -e \""); + } else { + buf.append("hive -e \""); + } for (String statement : statements) { buf.append(statement).append("\n"); } http://git-wip-us.apache.org/repos/asf/kylin/blob/3ac6490b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java index 9e9dc25..1cee29c 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java @@ -59,6 +59,8 @@ import com.google.common.collect.Sets; public class HiveMRInput implements IMRInput { + private static final Logger logger = LoggerFactory.getLogger(HiveMRInput.class); + @Override public IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) { return new BatchCubingInputSide(flatDesc); @@ -133,10 +135,41 @@ public class HiveMRInput implements IMRInput { if (kylinConfig.isHiveRedistributeEnabled() == true) { jobFlow.addTask(createRedistributeFlatHiveTableStep(conf, flatDesc, jobFlow.getId(), cubeName)); } + AbstractExecutable task = createLookupHiveViewMaterializationStep(jobFlow.getId()); if (task != null) { jobFlow.addTask(task); } + + if (StringUtils.isNotEmpty(kylinConfig.getHiveHome()) == true) { + // copy from another hive cluster to current cluster + task = createCopyHiveDataStep(kylinConfig, flatDesc, jobFlow.getId()); + if (task != null) { 
+ jobFlow.addTask(task); + } + } + } + + protected AbstractExecutable createCopyHiveDataStep(KylinConfig kylinConfig, IJoinedFlatTableDesc flatDesc, String jobId) { + IHiveClient hiveClient = HiveClientFactory.getHiveClientByConfig(kylinConfig); + String input = ""; + try { + input = hiveClient.getHiveTableMeta(kylinConfig.getHiveDatabaseForIntermediateTable(), flatDesc.getTableName()).getSdLocation(); + } catch (Exception e) { + logger.error("Error when get intermediate table location", e); + throw new IllegalArgumentException(e); + } + + if(input.startsWith("/") || input.startsWith(HadoopUtil.getCurrentConfiguration().get(FileSystem.FS_DEFAULT_NAME_KEY))) { + // in the same cluster + return null; + } + String output = JoinedFlatTable.getTableDir(flatDesc, JobBuilderSupport.getJobWorkingDir(conf, jobId)); + String cmd = String.format("hadoop distcp -overwrite %s %s", input, output); + ShellExecutable task = new ShellExecutable(); + task.setName("Copy Intermediate Table To Local DFS"); + task.setCmd(cmd); + return task; } public static AbstractExecutable createRedistributeFlatHiveTableStep(JobEngineConfig conf, IJoinedFlatTableDesc flatTableDesc, String jobId, String cubeName) { @@ -158,9 +191,9 @@ public class HiveMRInput implements IMRInput { public ShellExecutable createLookupHiveViewMaterializationStep(String jobId) { ShellExecutable step = new ShellExecutable(); step.setName(ExecutableConstants.STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP); - HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(); - KylinConfig kylinConfig = ((CubeSegment) flatDesc.getSegment()).getConfig(); + HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(kylinConfig); + MetadataManager metadataManager = MetadataManager.getInstance(kylinConfig); final Set<TableDesc> lookupViewsTables = Sets.newHashSet(); @@ -237,13 +270,13 @@ public class HiveMRInput implements IMRInput { public static class RedistributeFlatHiveTableStep extends AbstractExecutable { private final BufferedLogger stepLogger = 
new BufferedLogger(logger); - private long computeRowCount(String database, String table) throws Exception { - IHiveClient hiveClient = HiveClientFactory.getHiveClient(); + private long computeRowCount(KylinConfig config, String database, String table) throws Exception { + IHiveClient hiveClient = HiveClientFactory.getHiveClientByConfig(config); return hiveClient.getHiveTableRows(database, table); } private void redistributeTable(KylinConfig config, int numReducers) throws IOException { - final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(); + final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(config); hiveCmdBuilder.addStatement(getInitStatement()); hiveCmdBuilder.addStatement("set mapreduce.job.reduces=" + numReducers + ";\n"); hiveCmdBuilder.addStatement("set hive.merge.mapredfiles=false;\n"); @@ -280,7 +313,7 @@ public class HiveMRInput implements IMRInput { } try { - long rowCount = computeRowCount(database, tableName); + long rowCount = computeRowCount(config, database, tableName); logger.debug("Row count of table '" + intermediateTable + "' is " + rowCount); if (rowCount == 0) { if (!config.isEmptySegmentAllowed()) { @@ -358,7 +391,7 @@ public class HiveMRInput implements IMRInput { StringBuffer output = new StringBuffer(); final String hiveTable = this.getIntermediateTableIdentity(); if (config.isHiveKeepFlatTable() == false && StringUtils.isNotEmpty(hiveTable)) { - final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(); + final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(config); hiveCmdBuilder.addStatement("USE " + config.getHiveDatabaseForIntermediateTable() + ";"); hiveCmdBuilder.addStatement("DROP TABLE IF EXISTS " + hiveTable + ";"); http://git-wip-us.apache.org/repos/asf/kylin/blob/3ac6490b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java ---------------------------------------------------------------------- diff --git 
a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java index 401e720..c5c7806 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java @@ -60,7 +60,7 @@ public class HiveSourceTableLoader { db2tables.put(parts[0], parts[1]); } - IHiveClient hiveClient = HiveClientFactory.getHiveClient(); + IHiveClient hiveClient = HiveClientFactory.getHiveClientByConfig(config); SchemaChecker checker = new SchemaChecker(hiveClient, MetadataManager.getInstance(config), CubeManager.getInstance(config)); for (Map.Entry<String, String> entry : db2tables.entries()) { SchemaChecker.CheckResult result = checker.allowReload(entry.getKey(), entry.getValue()); http://git-wip-us.apache.org/repos/asf/kylin/blob/3ac6490b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableMeta.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableMeta.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableMeta.java index 784a0bb..dcb79e1 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableMeta.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableMeta.java @@ -66,6 +66,102 @@ class HiveTableMeta { this.partitionColumns = partitionColumns; } + public String getTableName() { + return tableName; + } + + public void setTableName(String tableName) { + this.tableName = tableName; + } + + public String getSdLocation() { + return sdLocation; + } + + public void setSdLocation(String sdLocation) { + this.sdLocation = sdLocation; + } + + public String getSdInputFormat() { + return sdInputFormat; + } + + public void setSdInputFormat(String sdInputFormat) { + this.sdInputFormat = sdInputFormat; + } + + public String 
getSdOutputFormat() { + return sdOutputFormat; + } + + public void setSdOutputFormat(String sdOutputFormat) { + this.sdOutputFormat = sdOutputFormat; + } + + public String getOwner() { + return owner; + } + + public void setOwner(String owner) { + this.owner = owner; + } + + public String getTableType() { + return tableType; + } + + public void setTableType(String tableType) { + this.tableType = tableType; + } + + public int getLastAccessTime() { + return lastAccessTime; + } + + public void setLastAccessTime(int lastAccessTime) { + this.lastAccessTime = lastAccessTime; + } + + public long getFileSize() { + return fileSize; + } + + public void setFileSize(long fileSize) { + this.fileSize = fileSize; + } + + public long getFileNum() { + return fileNum; + } + + public void setFileNum(long fileNum) { + this.fileNum = fileNum; + } + + public boolean isNative() { + return isNative; + } + + public void setNative(boolean aNative) { + isNative = aNative; + } + + public List<HiveTableColumnMeta> getAllColumns() { + return allColumns; + } + + public void setAllColumns(List<HiveTableColumnMeta> allColumns) { + this.allColumns = allColumns; + } + + public List<HiveTableColumnMeta> getPartitionColumns() { + return partitionColumns; + } + + public void setPartitionColumns(List<HiveTableColumnMeta> partitionColumns) { + this.partitionColumns = partitionColumns; + } + @Override public String toString() { return "HiveTableMeta{" + "tableName='" + tableName + '\'' + ", sdLocation='" + sdLocation + '\'' + ", sdInputFormat='" + sdInputFormat + '\'' + ", sdOutputFormat='" + sdOutputFormat + '\'' + ", owner='" + owner + '\'' + ", tableType='" + tableType + '\'' + ", lastAccessTime=" + lastAccessTime + ", fileSize=" + fileSize + ", fileNum=" + fileNum + ", isNative=" + isNative + ", allColumns=" + allColumns + ", partitionColumns=" + partitionColumns + '}';