This is an automated email from the ASF dual-hosted git repository. nic pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push: new ae6120b KYLIN-4224 Create flat table with spark sql ae6120b is described below commit ae6120b849c499956a11814d817fb9506e6dd2d6 Author: weibin0516 <codingfor...@126.com> AuthorDate: Sun Nov 17 13:15:10 2019 +0800 KYLIN-4224 Create flat table with spark sql --- .../kylin/job/constant/ExecutableConstants.java | 1 + .../kylin/engine/mr/common/BatchConstants.java | 3 + .../kylin/engine/spark/SparkCreatingFlatTable.java | 43 +++++++ .../apache/kylin/engine/spark/SparkExecutable.java | 7 +- .../apache/kylin/engine/spark/SparkSqlBatch.java | 131 +++++++++++++++++++++ .../apache/kylin/source/hive/HiveInputBase.java | 73 +++++++++++- webapp/app/partials/jobs/job_steps.html | 2 +- 7 files changed, 254 insertions(+), 6 deletions(-) diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java index 2d16cdc..73c5ecc 100644 --- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java +++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java @@ -42,6 +42,7 @@ public final class ExecutableConstants { public static final String STEP_NAME_BUILD_UHC_DICTIONARY = "Build UHC Dictionary"; public static final String STEP_NAME_BUILD_SPARK_UHC_DICTIONARY = "Build UHC Dictionary with spark"; public static final String STEP_NAME_CREATE_FLAT_HIVE_TABLE = "Create Intermediate Flat Hive Table"; + public static final String STEP_NAME_CREATE_FLAT_TABLE_WITH_SPARK = "Create Intermediate Flat Table With Spark"; public static final String STEP_NAME_SQOOP_TO_FLAT_HIVE_TABLE = "Sqoop To Flat Hive Table"; public static final String STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP = "Materialize Hive View in Lookup Tables"; public static final String STEP_NAME_FACT_DISTINCT_COLUMNS = "Extract Fact Table Distinct Columns"; diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java index d20de2c..3fffad2 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java @@ -109,6 +109,9 @@ public interface BatchConstants { String ARG_HBASE_CONF_PATH = "hbaseConfPath"; String ARG_SHRUNKEN_DICT_PATH = "shrunkenDictPath"; String ARG_COUNTER_OUTPUT = "counterOutput"; + String ARG_BASE64_ENCODED_STEP_NAME = "base64StepName"; + String ARG_SQL_COUNT = "sqlCount"; + String ARG_BASE64_ENCODED_SQL = "base64EncodedSql"; /** * logger and counter diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCreatingFlatTable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCreatingFlatTable.java new file mode 100644 index 0000000..2bddd16 --- /dev/null +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCreatingFlatTable.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.engine.spark; + +import org.apache.commons.cli.Option; +import org.apache.commons.cli.OptionBuilder; +import org.apache.kylin.engine.mr.common.BatchConstants; + +public class SparkCreatingFlatTable extends SparkSqlBatch { + public static final int SQL_COUNT = 5; + + public SparkCreatingFlatTable() { + super(); + + for (int i = 0; i < SQL_COUNT; i++) { + getOptions().addOption(getSqlOption(i)); + } + } + + public static Option getSqlOption(int index) { + return OptionBuilder.withArgName(BatchConstants.ARG_SQL_COUNT + String.valueOf(index)) + .hasArg() + .isRequired(true) + .withDescription("Sql0") + .create(BatchConstants.ARG_BASE64_ENCODED_SQL + String.valueOf(index)); + } +} diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java index c2045c0..1eeab04 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java @@ -246,7 +246,8 @@ public class SparkExecutable extends AbstractExecutable { if (StringUtils.isEmpty(jars)) { jars = jobJar; } - if (cube != null) { + + if (cube != null && !isCreateFlatTable()) { setAlgorithmLayer(); String segmentID = this.getParam(SparkCubingByLayer.OPTION_SEGMENT_ID.getOpt()); CubeSegment segment = cube.getSegmentById(segmentID); @@ -535,4 +536,8 @@ public class SparkExecutable extends AbstractExecutable { info.put(saveAsNames[i].trim(), counter); } } + + private boolean isCreateFlatTable() { + return ExecutableConstants.STEP_NAME_CREATE_FLAT_TABLE_WITH_SPARK.equals(getName()); + } } diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkSqlBatch.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkSqlBatch.java new file mode 100644 index 0000000..1881dc8 --- /dev/null +++ 
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkSqlBatch.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.engine.spark; + +import org.apache.commons.cli.Option; +import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.cli.Options; + +import org.apache.kylin.common.util.AbstractApplication; +import org.apache.kylin.common.util.OptionsHelper; +import org.apache.kylin.engine.mr.common.BatchConstants; + +import org.apache.spark.sql.SparkSession; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.io.UnsupportedEncodingException; +import java.nio.charset.StandardCharsets; +import java.util.Base64; + +/** + * Execute a batch of spark sql in order, if a sql execution fails, abort and throw an exception, + * no longer execute the left sqls. 
+ */ +public class SparkSqlBatch extends AbstractApplication implements Serializable { + private final Logger logger = LoggerFactory.getLogger(SparkSqlBatch.class); + private Options options; + + public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME) + .hasArg() + .isRequired(true) + .withDescription("Cube Name") + .create(BatchConstants.ARG_CUBE_NAME); + public static final Option OPTION_STEP_NAME = OptionBuilder.withArgName(BatchConstants.ARG_BASE64_ENCODED_STEP_NAME) + .hasArg() + .isRequired(true) + .withDescription("Step Name") + .create(BatchConstants.ARG_BASE64_ENCODED_STEP_NAME); + public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName(BatchConstants.ARG_SEGMENT_ID) + .hasArg() + .isRequired(true) + .withDescription("Cube Segment Id") + .create(BatchConstants.ARG_SEGMENT_ID); + public static final Option OPTION_SQL_COUNT = OptionBuilder.withArgName(BatchConstants.ARG_SQL_COUNT) + .hasArg() + .isRequired(true) + .withDescription("Sql count") + .create(BatchConstants.ARG_SQL_COUNT); + + public SparkSqlBatch() { + options = new Options(); + options.addOption(OPTION_CUBE_NAME); + options.addOption(OPTION_STEP_NAME); + options.addOption(OPTION_SEGMENT_ID); + options.addOption(OPTION_SQL_COUNT); + } + + @Override + protected Options getOptions() { + return options; + } + + @Override + protected void execute(OptionsHelper optionsHelper) throws Exception { + String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME); + String stepName = base64Decode(optionsHelper.getOptionValue(OPTION_STEP_NAME)); + String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID); + String sqlCountStr = optionsHelper.getOptionValue(OPTION_SQL_COUNT); + logger.info("start execute sql batch job, cubeName: " + cubeName + ", stepName: " + + stepName + ", segmentId: " + segmentId + ", sqlCount: " + sqlCountStr); + + int sqlCount = Integer.valueOf(sqlCountStr); + if (sqlCount <= 0) { + throw new 
RuntimeException("Count of sqls to execute must be greater than 0, " + + "in fact is " + sqlCountStr); + } + + SparkSession sparkSession = getSparkSession(stepName + " for cube: " + + cubeName + ", segment " + segmentId); + for (int i = 0; i < sqlCount; i++) { + String argName = BatchConstants.ARG_BASE64_ENCODED_SQL + String.valueOf(i); + Option optionSqlText = OptionBuilder.withArgName(argName) + .hasArg() + .isRequired(true) + .create(argName); + String encodedSql = optionsHelper.getOptionValue(optionSqlText); + String sqlText = base64Decode(encodedSql).trim(); + if (null != sqlText && sqlText.endsWith(";")) { + sqlText = sqlText.substring(0, sqlText.length() - 1); + } + logger.info("execute spark sql: " + sqlText); + if (i == sqlCount - 1) { + sparkSession.sql(sqlText).count(); + } else { + sparkSession.sql(sqlText); + } + } + } + + private SparkSession getSparkSession(String appName) { + return SparkSession.builder() + .appName(appName) + .enableHiveSupport() + .getOrCreate(); + } + + private String base64Decode(String str) throws UnsupportedEncodingException { + return new String( + Base64.getDecoder().decode(str.getBytes(StandardCharsets.UTF_8)), + StandardCharsets.UTF_8 + ); + } +} diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java index 469722a..881c8d5 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java @@ -19,14 +19,16 @@ package org.apache.kylin.source.hive; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.List; -import java.util.Locale; import java.util.Map; +import java.util.Base64; import java.util.Objects; import java.util.Set; +import java.util.Locale; +import java.util.Collections; import 
com.google.common.collect.Lists; import org.apache.hadoop.fs.FileSystem; @@ -41,6 +43,10 @@ import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.engine.mr.IInput; import org.apache.kylin.engine.mr.JobBuilderSupport; import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; +import org.apache.kylin.engine.spark.SparkCreatingFlatTable; +import org.apache.kylin.engine.spark.SparkExecutable; +import org.apache.kylin.engine.spark.SparkExecutableFactory; +import org.apache.kylin.engine.spark.SparkSqlBatch; import org.apache.kylin.job.JoinedFlatTable; import org.apache.kylin.job.common.ShellExecutable; import org.apache.kylin.job.constant.ExecutableConstants; @@ -254,8 +260,14 @@ public class HiveInputBase { final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName); - if (kylinConfig.isLivyEnabled() && cubeInstance.getEngineType() == IEngineAware.ID_SPARK) { - jobFlow.addTask(createFlatHiveTableByLivyStep(hiveInitStatements, jobWorkingDir, cubeName, flatDesc)); + if (cubeInstance.getEngineType() == IEngineAware.ID_SPARK) { + if (kylinConfig.isLivyEnabled()) { + jobFlow.addTask(createFlatHiveTableByLivyStep(hiveInitStatements, + jobWorkingDir, cubeName, flatDesc)); + } else { + jobFlow.addTask(createFlatHiveTableBySparkSql(hiveInitStatements, + jobWorkingDir, cubeName, flatDesc)); + } } else { jobFlow.addTask(createFlatHiveTableStep(hiveInitStatements, jobWorkingDir, cubeName, flatDesc)); } @@ -342,6 +354,59 @@ public class HiveInputBase { return step; } + protected static AbstractExecutable createFlatHiveTableBySparkSql(String hiveInitStatements, + String jobWorkingDir, String cubeName, IJoinedFlatTableDesc flatDesc) { + final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatDesc); + final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatDesc, + jobWorkingDir); + String insertDataHqls = 
JoinedFlatTable.generateInsertDataStatement(flatDesc); + + KylinConfig config = flatDesc.getSegment().getConfig(); + final SparkExecutable sparkExecutable = SparkExecutableFactory.instance(config); + sparkExecutable.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_TABLE_WITH_SPARK); + sparkExecutable.setClassName(SparkCreatingFlatTable.class.getName()); + + sparkExecutable.setParam(SparkSqlBatch.OPTION_CUBE_NAME.getOpt(), cubeName); + sparkExecutable.setParam(SparkSqlBatch.OPTION_STEP_NAME.getOpt(), + base64EncodeStr(ExecutableConstants.STEP_NAME_CREATE_FLAT_TABLE_WITH_SPARK)); + sparkExecutable.setParam(SparkSqlBatch.OPTION_SEGMENT_ID.getOpt(), + flatDesc.getSegment().getName()); + sparkExecutable.setParam(SparkSqlBatch.OPTION_SQL_COUNT.getOpt(), + String.valueOf(SparkCreatingFlatTable.SQL_COUNT)); + + sparkExecutable.setParam(SparkCreatingFlatTable.getSqlOption(0).getOpt(), + base64EncodeStr(hiveInitStatements)); + sparkExecutable.setParam(SparkCreatingFlatTable.getSqlOption(1).getOpt(), + base64EncodeStr(dropTableHql)); + + // createTableHql include create table sql and alter table sql + String[] sqlArr = createTableHql.trim().split(";"); + if (2 != sqlArr.length) { + throw new RuntimeException("create table hql should combined by a create table sql " + + "and a alter sql, but got: " + createTableHql); + } + sparkExecutable.setParam(SparkCreatingFlatTable.getSqlOption(2).getOpt(), + base64EncodeStr(sqlArr[0])); + sparkExecutable.setParam(SparkCreatingFlatTable.getSqlOption(3).getOpt(), + base64EncodeStr(sqlArr[1])); + + sparkExecutable.setParam(SparkCreatingFlatTable.getSqlOption(4).getOpt(), + base64EncodeStr(insertDataHqls)); + + StringBuilder jars = new StringBuilder(); + StringUtil.appendWithSeparator(jars, config.getSparkAdditionalJars()); + sparkExecutable.setJars(jars.toString()); + + return sparkExecutable; + } + + private static String base64EncodeStr(String str) { + return new String( + Base64.getEncoder().encode(str.getBytes(StandardCharsets.UTF_8)), 
+ StandardCharsets.UTF_8 + ); + } + protected static AbstractExecutable createRedistributeFlatHiveTableStep(String hiveInitStatements, String cubeName, IJoinedFlatTableDesc flatDesc, CubeDesc cubeDesc) { RedistributeFlatHiveTableStep step = new RedistributeFlatHiveTableStep(); diff --git a/webapp/app/partials/jobs/job_steps.html b/webapp/app/partials/jobs/job_steps.html index e26c4c3..993b302 100644 --- a/webapp/app/partials/jobs/job_steps.html +++ b/webapp/app/partials/jobs/job_steps.html @@ -134,7 +134,7 @@ <a ng-if="step.info.yarn_application_tracking_url" href="{{step.info.yarn_application_tracking_url}}" target="_blank" - tooltip="MRJob"> + tooltip="Job"> <i class="ace-icon fa fa-tasks yellow bigger-110"></i> </a> <a ng-if="config.reference_links && config.reference_links['diagnostic'].link" href="{{config.reference_links['diagnostic'].link + step.info.mr_job_id}}" target="_blank"