KYLIN-2811, refine spark cubing
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/2d939a59
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/2d939a59
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/2d939a59

Branch: refs/heads/ranger
Commit: 2d939a59fdf385bb20b4724e8a6f87879e26bdd7
Parents: ecf4819
Author: Cheng Wang <cheng.w...@kyligence.io>
Authored: Tue Sep 5 19:10:47 2017 +0800
Committer: Roger Shi <rogershijich...@gmail.com>
Committed: Tue Sep 5 21:14:02 2017 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/common/KylinConfig.java    | 23 ++---
 .../engine/mr/common/AbstractHadoopJob.java     | 26 ++++++
 .../kylin/engine/mr/common/BatchConstants.java  |  1 +
 .../spark/SparkBatchCubingJobBuilder2.java      | 10 +--
 .../kylin/engine/spark/SparkCubingByLayer.java  | 95 +++++++++-----------
 .../kylin/engine/spark/SparkExecutable.java     | 30 +++++--
 6 files changed, 108 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/2d939a59/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
index a003638..c7c7f60 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -18,15 +18,6 @@
 
 package org.apache.kylin.common;
 
-import com.google.common.base.Preconditions;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.common.restclient.RestClient;
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.common.util.OrderedProperties;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileInputStream;
@@ -41,6 +32,16 @@ import java.nio.charset.Charset;
 import java.util.Map;
 import java.util.Properties;
 
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.restclient.RestClient;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.OrderedProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
 /**
  */
 public class KylinConfig extends KylinConfigBase {
@@ -58,7 +59,7 @@ public class KylinConfig extends KylinConfigBase {
 
     // thread-local instances, will override SYS_ENV_INSTANCE
     private static transient ThreadLocal<KylinConfig> THREAD_ENV_INSTANCE = new ThreadLocal<>();
-    
+
     static {
         /*
          * Make Calcite to work with Unicode.
@@ -226,7 +227,7 @@ public class KylinConfig extends KylinConfigBase {
         }
     }
 
-    private static Properties streamToProps(InputStream is) throws IOException {
+    public static Properties streamToProps(InputStream is) throws IOException {
         Properties prop = new Properties();
         prop.load(is);
         IOUtils.closeQuietly(is);


http://git-wip-us.apache.org/repos/asf/kylin/blob/2d939a59/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index 081ac93..292c57d 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -27,10 +27,12 @@ import static org.apache.hadoop.util.StringUtils.formatTime;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.HashMap;
 import java.util.LinkedHashSet;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Properties;
 import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -464,6 +466,30 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
         }
     }
 
+    public static KylinConfig loadKylinConfigFromHdfs(String uri) {
+        if (uri == null)
+            throw new IllegalArgumentException("meta url should not be null");
+
+        if (!uri.contains("@hdfs"))
+            throw new IllegalArgumentException("meta url should like @hdfs schema");
+
+        logger.info("Ready to load KylinConfig from uri: {}", uri);
+        KylinConfig config;
+        FileSystem fs;
+        int cut = uri.indexOf('@');
+        String realHdfsPath = uri.substring(0, cut) + "/" + KylinConfig.KYLIN_CONF_PROPERTIES_FILE;
+        try {
+            fs = HadoopUtil.getFileSystem(realHdfsPath);
+            InputStream is = fs.open(new Path(realHdfsPath));
+            Properties prop = KylinConfig.streamToProps(is);
+            config = KylinConfig.createKylinConfig(prop);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        KylinConfig.setKylinConfigThreadLocal(config);
+        return config;
+    }
+
     protected void attachTableMetadata(TableDesc table, Configuration conf) throws IOException {
         Set<String> dumpList = new LinkedHashSet<>();
         dumpList.add(table.getResourcePath());


http://git-wip-us.apache.org/repos/asf/kylin/blob/2d939a59/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
index 84ca006..bbf38e5 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@ -65,6 +65,7 @@ public interface BatchConstants {
     String CFG_OUTPUT_STATISTICS = "statistics";
     String CFG_OUTPUT_PARTITION = "partition";
     String CFG_MR_SPARK_JOB = "mr.spark.job";
+    String CFG_SPARK_META_URL = "spark.meta.url";
 
     /**
      * command line ARGuments


http://git-wip-us.apache.org/repos/asf/kylin/blob/2d939a59/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
index 779f340..2773f97 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
@@ -47,9 +47,10 @@ public class SparkBatchCubingJobBuilder2 extends BatchCubingJobBuilder2 {
         sparkExecutable.setClassName(SparkCubingByLayer.class.getName());
         sparkExecutable.setParam(SparkCubingByLayer.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName());
         sparkExecutable.setParam(SparkCubingByLayer.OPTION_SEGMENT_ID.getOpt(), seg.getUuid());
-        sparkExecutable.setParam(SparkCubingByLayer.OPTION_INPUT_TABLE.getOpt(), seg.getConfig().getHiveDatabaseForIntermediateTable() + "." + flatTableDesc.getTableName());
-        sparkExecutable.setParam(SparkCubingByLayer.OPTION_META_URL.getOpt(), getSegmentMetadataUrl(seg.getConfig(), seg.getUuid()));
-        sparkExecutable.setParam(SparkCubingByLayer.OPTION_CONF_PATH.getOpt(), KylinConfig.getKylinConfDir().getAbsolutePath());
+        sparkExecutable.setParam(SparkCubingByLayer.OPTION_INPUT_TABLE.getOpt(),
+                seg.getConfig().getHiveDatabaseForIntermediateTable() + "." + flatTableDesc.getTableName());
+        sparkExecutable.setParam(SparkCubingByLayer.OPTION_META_URL.getOpt(),
+                getSegmentMetadataUrl(seg.getConfig(), seg.getUuid()));
         sparkExecutable.setParam(SparkCubingByLayer.OPTION_OUTPUT_PATH.getOpt(), cuboidRootPath);
 
         StringBuilder jars = new StringBuilder();
@@ -57,9 +58,6 @@ public class SparkBatchCubingJobBuilder2 extends BatchCubingJobBuilder2 {
         StringUtil.appendWithSeparator(jars, findJar("org.htrace.HTraceConfiguration", null)); // htrace-core.jar
         StringUtil.appendWithSeparator(jars, findJar("org.apache.htrace.Trace", null)); // htrace-core.jar
         StringUtil.appendWithSeparator(jars, findJar("org.cloudera.htrace.HTraceConfiguration", null)); // htrace-core.jar
-        StringUtil.appendWithSeparator(jars, findJar("org.apache.hadoop.hbase.client.HConnection", null)); // hbase-client.jar
-        StringUtil.appendWithSeparator(jars, findJar("org.apache.hadoop.hbase.HBaseConfiguration", null)); // hbase-common.jar
-        StringUtil.appendWithSeparator(jars, findJar("org.apache.hadoop.hbase.util.ByteStringer", null)); // hbase-protocol.jar
         StringUtil.appendWithSeparator(jars, findJar("com.yammer.metrics.core.Gauge", null)); // metrics-core.jar
         StringUtil.appendWithSeparator(jars, findJar("com.google.common.collect.Maps", "guava")); //guava.jar


http://git-wip-us.apache.org/repos/asf/kylin/blob/2d939a59/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index dab5fb7..94435f5 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -17,7 +17,6 @@
  */
 package org.apache.kylin.engine.spark;
 
-import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
@@ -54,6 +53,7 @@ import org.apache.kylin.engine.EngineFactory;
 import org.apache.kylin.engine.mr.BatchCubingJobBuilder2;
 import org.apache.kylin.engine.mr.IMROutput2;
 import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BaseCuboidBuilder;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.common.CubeStatsReader;
@@ -63,7 +63,6 @@ import org.apache.kylin.measure.MeasureAggregators;
 import org.apache.kylin.measure.MeasureIngester;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.spark.SparkConf;
-import org.apache.spark.SparkFiles;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function;
@@ -74,7 +73,6 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.hive.HiveContext;
 import org.apache.spark.storage.StorageLevel;
-import org.apache.spark.util.SizeEstimator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -97,8 +95,6 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
             .isRequired(true).withDescription("Cube output path").create(BatchConstants.ARG_OUTPUT);
     public static final Option OPTION_INPUT_TABLE = OptionBuilder.withArgName("hiveTable").hasArg().isRequired(true)
             .withDescription("Hive Intermediate Table").create("hiveTable");
-    public static final Option OPTION_CONF_PATH = OptionBuilder.withArgName("confPath").hasArg().isRequired(true)
-            .withDescription("Configuration Path").create("confPath");
 
     private Options options;
 
@@ -109,7 +105,6 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         options.addOption(OPTION_SEGMENT_ID);
         options.addOption(OPTION_META_URL);
         options.addOption(OPTION_OUTPUT_PATH);
-        options.addOption(OPTION_CONF_PATH);
     }
 
     @Override
@@ -117,22 +112,12 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         return options;
     }
 
-    public static KylinConfig getKylinConfigForExecutor(String metaUrl) {
-        File file = new File(SparkFiles.get(KylinConfig.KYLIN_CONF_PROPERTIES_FILE));
-        String confPath = file.getParentFile().getAbsolutePath();
-        System.setProperty(KylinConfig.KYLIN_CONF, confPath);
-        final KylinConfig config = KylinConfig.getInstanceFromEnv();
-        config.setMetadataUrl(metaUrl);
-        return config;
-    }
-
     @Override
     protected void execute(OptionsHelper optionsHelper) throws Exception {
         String metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
         String hiveTable = optionsHelper.getOptionValue(OPTION_INPUT_TABLE);
         String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
         String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
-        String confPath = optionsHelper.getOptionValue(OPTION_CONF_PATH);
         String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
 
         Class[] kryoClassArray = new Class[] { org.apache.hadoop.io.Text.class,
@@ -147,14 +132,19 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         JavaSparkContext sc = new JavaSparkContext(conf);
         HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));
 
-        sc.addFile(confPath + File.separator + KylinConfig.KYLIN_CONF_PROPERTIES_FILE);
-        System.setProperty(KylinConfig.KYLIN_CONF, confPath);
-        KylinConfig envConfig = KylinConfig.getInstanceFromEnv();
+        KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(metaUrl);
 
         final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
         final CubeDesc cubeDesc = cubeInstance.getDescriptor();
         final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
 
+        Configuration confOverwrite = new Configuration(sc.hadoopConfiguration());
+        confOverwrite.set("dfs.replication", "2"); // cuboid intermediate files, replication=2
+        final Job job = Job.getInstance(confOverwrite);
+
+        logger.info("RDD Output path: {}", outputPath);
+        setHadoopConf(job);
+
         int countMeasureIndex = 0;
         for (MeasureDesc measureDesc : cubeDesc.getMeasures()) {
             if (measureDesc.getFunction().isCount() == true) {
@@ -194,26 +184,22 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         final int totalLevels = cubeDesc.getBuildLevel();
         JavaPairRDD<ByteArray, Object[]>[] allRDDs = new JavaPairRDD[totalLevels + 1];
         int level = 0;
-        long baseRDDSize = SizeEstimator.estimate(encodedBaseRDD) / (1024 * 1024);
-        int partition = estimateRDDPartitionNum(level, cubeStatsReader, envConfig, (int) baseRDDSize);
+        int partition = estimateRDDPartitionNum(level, cubeStatsReader, envConfig);
 
         // aggregate to calculate base cuboid
         allRDDs[0] = encodedBaseRDD.reduceByKey(baseCuboidReducerFunction, partition).persist(storageLevel);
-        Configuration confOverwrite = new Configuration(sc.hadoopConfiguration());
-        confOverwrite.set("dfs.replication", "2"); // cuboid intermediate files, replication=2
-        saveToHDFS(allRDDs[0], cubeName, metaUrl, cubeSegment, outputPath, 0, confOverwrite);
+
+        saveToHDFS(allRDDs[0], metaUrl, cubeName, cubeSegment, outputPath, 0, job);
 
         // aggregate to ND cuboids
         for (level = 1; level <= totalLevels; level++) {
-            long levelRddSize = SizeEstimator.estimate(allRDDs[level - 1]) / (1024 * 1024);
-            partition = estimateRDDPartitionNum(level, cubeStatsReader, envConfig, (int) levelRddSize);
+            partition = estimateRDDPartitionNum(level, cubeStatsReader, envConfig);
 
             allRDDs[level] = allRDDs[level - 1].flatMapToPair(new CuboidFlatMap(cubeName, segmentId, metaUrl))
                     .reduceByKey(reducerFunction2, partition).persist(storageLevel);
             if (envConfig.isSparkSanityCheckEnabled() == true) {
                 sanityCheck(allRDDs[level], totalCount, level, cubeStatsReader, countMeasureIndex);
             }
-            saveToHDFS(allRDDs[level], cubeName, metaUrl, cubeSegment, outputPath, level, confOverwrite);
+            saveToHDFS(allRDDs[level], metaUrl, cubeName, cubeSegment, outputPath, level, job);
             allRDDs[level - 1].unpersist();
         }
         allRDDs[totalLevels].unpersist();
@@ -221,9 +207,13 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         deleteHDFSMeta(metaUrl);
     }
 
-    private static int estimateRDDPartitionNum(int level, CubeStatsReader statsReader, KylinConfig kylinConfig,
-            int rddSize) {
-        int baseCuboidSize = (int) Math.min(rddSize, statsReader.estimateLayerSize(level));
+    protected void setHadoopConf(Job job) throws Exception {
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(Text.class);
+    }
+
+    protected int estimateRDDPartitionNum(int level, CubeStatsReader statsReader, KylinConfig kylinConfig) {
+        double baseCuboidSize = statsReader.estimateLayerSize(level);
         float rddCut = kylinConfig.getSparkRDDPartitionCutMB();
         int partition = (int) (baseCuboidSize / rddCut);
         partition = Math.max(kylinConfig.getSparkMinPartition(), partition);
@@ -231,20 +221,13 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         return partition;
     }
 
-    private void saveToHDFS(final JavaPairRDD<ByteArray, Object[]> rdd, final String cubeName, final String metaUrl,
-            final CubeSegment cubeSeg, final String hdfsBaseLocation, int level, Configuration conf) throws Exception {
+    protected void saveToHDFS(final JavaPairRDD<ByteArray, Object[]> rdd, final String metaUrl, final String cubeName,
+            final CubeSegment cubeSeg, final String hdfsBaseLocation, int level, Job job) throws Exception {
         final String cuboidOutputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(hdfsBaseLocation, level);
 
-        Job job = Job.getInstance(conf);
         IMROutput2.IMROutputFormat outputFormat = MRUtil.getBatchCubingOutputSide2(cubeSeg).getOuputFormat();
         outputFormat.configureJobOutput(job, cuboidOutputPath, cubeSeg, level);
 
-        job.setOutputKeyClass(Text.class);
-        job.setOutputValueClass(Text.class);
-        job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeSeg.getCubeInstance().getName());
-        job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, cubeSeg.getUuid());
-        job.getConfiguration().set(BatchConstants.CFG_MR_SPARK_JOB, "spark");
-
         rdd.mapToPair(
                 new PairFunction<Tuple2<ByteArray, Object[]>, org.apache.hadoop.io.Text, org.apache.hadoop.io.Text>() {
                     private volatile transient boolean initialized = false;
@@ -253,10 +236,11 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
                     @Override
                     public Tuple2<org.apache.hadoop.io.Text, org.apache.hadoop.io.Text> call(
                            Tuple2<ByteArray, Object[]> tuple2) throws Exception {
-                        if (!initialized) {
+
+                        if (initialized == false) {
                             synchronized (SparkCubingByLayer.class) {
-                                if (!initialized) {
-                                    KylinConfig kylinConfig = getKylinConfigForExecutor(metaUrl);
+                                if (initialized == false) {
+                                    KylinConfig kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(metaUrl);
                                     CubeDesc desc = CubeDescManager.getInstance(kylinConfig).getCubeDesc(cubeName);
                                     codec = new BufferedMeasureCodec(desc.getMeasures());
                                     initialized = true;
@@ -269,21 +253,22 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
                        return new Tuple2<>(new org.apache.hadoop.io.Text(tuple2._1().array()),
                                new org.apache.hadoop.io.Text(encodedBytes));
                    }
-                }).sortByKey().saveAsNewAPIHadoopDataset(job.getConfiguration());
+
+                }).saveAsNewAPIHadoopDataset(job.getConfiguration());
         logger.info("Persisting RDD for level " + level + " into " + cuboidOutputPath);
     }
 
-    static class EncodeBaseCuboid implements PairFunction<Row, ByteArray, Object[]> {
+    static public class EncodeBaseCuboid implements PairFunction<Row, ByteArray, Object[]> {
         private volatile transient boolean initialized = false;
         private BaseCuboidBuilder baseCuboidBuilder = null;
         private String cubeName;
         private String segmentId;
-        private String metaurl;
+        private String metaUrl;
 
         public EncodeBaseCuboid(String cubeName, String segmentId, String metaurl) {
             this.cubeName = cubeName;
             this.segmentId = segmentId;
-            this.metaurl = metaurl;
+            this.metaUrl = metaurl;
         }
 
         @Override
@@ -291,7 +276,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
             if (initialized == false) {
                 synchronized (SparkCubingByLayer.class) {
                     if (initialized == false) {
-                        KylinConfig kConfig = getKylinConfigForExecutor(metaurl);
+                        KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(metaUrl);
                         CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
                         CubeDesc cubeDesc = cubeInstance.getDescriptor();
                         CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
@@ -327,7 +312,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         }
     }
 
-    static class BaseCuboidReducerFunction2 implements Function2<Object[], Object[], Object[]> {
+    static public class BaseCuboidReducerFunction2 implements Function2<Object[], Object[], Object[]> {
         protected String cubeName;
         protected String metaUrl;
         protected CubeDesc cubeDesc;
@@ -341,7 +326,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         }
 
         public void init() {
-            KylinConfig kConfig = getKylinConfigForExecutor(metaUrl);
+            KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(metaUrl);
             CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
             cubeDesc = cubeInstance.getDescriptor();
             aggregators = new MeasureAggregators(cubeDesc.getMeasures());
@@ -364,7 +349,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         }
     }
 
-    static class CuboidReducerFunction2 extends BaseCuboidReducerFunction2 {
+    static public class CuboidReducerFunction2 extends BaseCuboidReducerFunction2 {
         private boolean[] needAggr;
 
         public CuboidReducerFunction2(String cubeName, String metaUrl, boolean[] needAggr) {
@@ -390,7 +375,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
 
     private static final java.lang.Iterable<Tuple2<ByteArray, Object[]>> EMTPY_ITERATOR = new ArrayList(0);
 
-    static class CuboidFlatMap implements PairFlatMapFunction<Tuple2<ByteArray, Object[]>, ByteArray, Object[]> {
+    static public class CuboidFlatMap implements PairFlatMapFunction<Tuple2<ByteArray, Object[]>, ByteArray, Object[]> {
 
         private String cubeName;
         private String segmentId;
@@ -409,11 +394,12 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         }
 
         public void init() {
-            KylinConfig kConfig = getKylinConfigForExecutor(metaUrl);
+            KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(metaUrl);
             CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
             this.cubeSegment = cubeInstance.getSegmentById(segmentId);
             this.cubeDesc = cubeInstance.getDescriptor();
             this.cuboidScheduler = cubeDesc.getCuboidScheduler();
+
             this.ndCuboidBuilder = new NDCuboidBuilder(cubeSegment, new RowKeyEncoderProvider(cubeSegment));
             this.rowKeySplitter = new RowKeySplitter(cubeSegment, 65, 256);
         }
@@ -456,8 +442,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         }
     }
 
-    //sanity check
-    private void sanityCheck(JavaPairRDD<ByteArray, Object[]> rdd, Long totalCount, int thisLevel,
+    protected void sanityCheck(JavaPairRDD<ByteArray, Object[]> rdd, Long totalCount, int thisLevel,
            CubeStatsReader cubeStatsReader, final int countMeasureIndex) {
         int thisCuboidNum = cubeStatsReader.getCuboidsByLayer(thisLevel).size();
         Long count2 = getRDDCountSum(rdd, countMeasureIndex);
@@ -487,7 +472,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         return count;
     }
 
-    private void deleteHDFSMeta(String metaUrl) throws IOException {
+    protected void deleteHDFSMeta(String metaUrl) throws IOException {
         int cut = metaUrl.indexOf('@');
         String path = metaUrl.substring(0, cut);
         HadoopUtil.getFileSystem(path).delete(new Path(path), true);


http://git-wip-us.apache.org/repos/asf/kylin/blob/2d939a59/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index d369e3d..7f4b377 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -34,11 +34,13 @@ import org.apache.kylin.common.util.CliCommandExecutor;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.engine.mr.common.JobRelatedMetaUtil;
 import org.apache.kylin.job.common.PatternedLogger;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.job.execution.ExecuteResult;
 import org.slf4j.LoggerFactory;
 
@@ -50,11 +52,16 @@ public class SparkExecutable extends AbstractExecutable {
 
     private static final String CLASS_NAME = "className";
     private static final String JARS = "jars";
+    private static final String JOB_ID = "jobId";
 
     public void setClassName(String className) {
         this.setParam(CLASS_NAME, className);
     }
 
+    public void setJobId(String jobId) {
+        this.setParam(JOB_ID, jobId);
+    }
+
     public void setJars(String jars) {
         this.setParam(JARS, jars);
     }
@@ -66,7 +73,7 @@ public class SparkExecutable extends AbstractExecutable {
             tmp.append("-").append(entry.getKey()).append(" ").append(entry.getValue()).append(" ");
             if (entry.getKey().equals(CLASS_NAME)) {
                 stringBuilder.insert(0, tmp);
-            } else if (entry.getKey().equals(JARS)) {
+            } else if (entry.getKey().equals(JARS) || entry.getKey().equals(JOB_ID)) {
                 // JARS is for spark-submit, not for app
                 continue;
             } else {
@@ -86,6 +93,8 @@ public class SparkExecutable extends AbstractExecutable {
         CubeInstance cube = CubeManager.getInstance(context.getConfig()).getCube(cubeName);
         final KylinConfig config = cube.getConfig();
 
+        setAlgorithmLayer();
+
         if (KylinConfig.getSparkHome() == null) {
             throw new NullPointerException();
         }
@@ -99,7 +108,8 @@ public class SparkExecutable extends AbstractExecutable {
         hadoopConf = System.getProperty("kylin.hadoop.conf.dir");
 
         if (StringUtils.isEmpty(hadoopConf)) {
-            throw new RuntimeException("kylin_hadoop_conf_dir is empty, check if there's error in the output of 'kylin.sh start'");
+            throw new RuntimeException(
+                    "kylin_hadoop_conf_dir is empty, check if there's error in the output of 'kylin.sh start'");
         }
 
         File hiveConfFile = new File(hadoopConf, "hive-site.xml");
@@ -124,7 +134,8 @@ public class SparkExecutable extends AbstractExecutable {
         }
 
         StringBuilder stringBuilder = new StringBuilder();
-        stringBuilder.append("export HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class org.apache.kylin.common.util.SparkEntry ");
+        stringBuilder.append(
+                "export HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class org.apache.kylin.common.util.SparkEntry ");
 
         Map<String, String> sparkConfs = config.getSparkConfigOverride();
         for (Map.Entry<String, String> entry : sparkConfs.entrySet()) {
@@ -133,7 +144,8 @@ public class SparkExecutable extends AbstractExecutable {
         stringBuilder.append("--jars %s %s %s");
 
         try {
-            String cmd = String.format(stringBuilder.toString(), hadoopConf, KylinConfig.getSparkHome(), jars, jobJar, formatArgs());
+            String cmd = String.format(stringBuilder.toString(), hadoopConf, KylinConfig.getSparkHome(), jars, jobJar,
+                    formatArgs());
             logger.info("cmd: " + cmd);
             CliCommandExecutor exec = new CliCommandExecutor();
             PatternedLogger patternedLogger = new PatternedLogger(logger);
@@ -146,6 +158,13 @@ public class SparkExecutable extends AbstractExecutable {
         }
     }
 
+    // Spark Cubing can only work in layer algorithm
+    private void setAlgorithmLayer() {
+        ExecutableManager execMgr = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv());
+        CubingJob cubingJob = (CubingJob) execMgr.getJob(this.getParam(JOB_ID));
+        cubingJob.setAlgorithm(CubingJob.AlgorithmEnum.LAYER);
+    }
+
     private void attachSegmentMetadataWithDict(CubeSegment segment) throws IOException {
         Set<String> dumpList = new LinkedHashSet<>();
         dumpList.addAll(JobRelatedMetaUtil.collectCubeMetadata(segment.getCubeInstance()));
@@ -154,7 +173,8 @@ public class SparkExecutable extends AbstractExecutable {
         dumpAndUploadKylinPropsAndMetadata(dumpList, (KylinConfigExt) segment.getConfig());
     }
 
-    private void dumpAndUploadKylinPropsAndMetadata(Set<String> dumpList, KylinConfigExt kylinConfig) throws IOException {
+    private void dumpAndUploadKylinPropsAndMetadata(Set<String> dumpList, KylinConfigExt kylinConfig)
+            throws IOException {
         File tmp = File.createTempFile("kylin_job_meta", "");
         FileUtils.forceDelete(tmp); // we need a directory, so delete the file first
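
----------------------------------------------------------------------

For context on the metadata-loading change above: instead of shipping
kylin.properties to executors via sc.addFile() and rebuilding the config
from a local path (the removed getKylinConfigForExecutor), driver and
executors now call AbstractHadoopJob.loadKylinConfigFromHdfs() with the
segment metadata URL and read kylin.properties straight from HDFS. A
minimal caller sketch, assuming a hypothetical metadata URL and cube name
(the real URL comes from getSegmentMetadataUrl() in
SparkBatchCubingJobBuilder2):

    import org.apache.kylin.common.KylinConfig;
    import org.apache.kylin.cube.CubeInstance;
    import org.apache.kylin.cube.CubeManager;
    import org.apache.kylin.engine.mr.common.AbstractHadoopJob;

    public class MetaUrlSketch {
        public static void main(String[] args) {
            // Hypothetical value; the prefix before '@' is an HDFS directory
            // holding kylin.properties plus the dumped job metadata.
            String metaUrl = "hdfs:///kylin/kylin_metadata/my_job_id/meta@hdfs";

            // Opens <prefix>/kylin.properties on HDFS, builds a KylinConfig
            // and pins it to the current thread, as the patch above does.
            KylinConfig config = AbstractHadoopJob.loadKylinConfigFromHdfs(metaUrl);
            CubeInstance cube = CubeManager.getInstance(config).getCube("my_cube");
            System.out.println(cube.getDescriptor().getName());
        }
    }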
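The Spark functions in the patch (the anonymous PairFunction in saveToHDFS,
EncodeBaseCuboid, the reducer functions, CuboidFlatMap) all share one
initialization idiom: the deserialized function instance lazily builds its
KylinConfig-backed state on first call, re-checking a flag under a
class-level lock so each executor JVM does the expensive setup only once.
A self-contained sketch of that idiom, using a hypothetical holder class
(the patch itself uses a per-instance volatile "initialized" flag with the
same locking shape):

    import org.apache.kylin.common.KylinConfig;
    import org.apache.kylin.engine.mr.common.AbstractHadoopJob;

    public class ExecutorConfigHolder {
        // volatile so a fully built config is visible to all task threads
        private static volatile KylinConfig config;

        public static KylinConfig get(String metaUrl) {
            if (config == null) {                         // first check, no lock
                synchronized (ExecutorConfigHolder.class) {
                    if (config == null) {                 // second check, locked
                        config = AbstractHadoopJob.loadKylinConfigFromHdfs(metaUrl);
                    }
                }
            }
            return config;
        }
    }

Note that in the patch each function class performs this initialization
independently, so an executor may read kylin.properties from HDFS more
than once (once per function class), which is still far cheaper than the
per-record work it guards.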
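estimateRDDPartitionNum also changed shape: the SizeEstimator-based
sampling of the live RDD is gone, and the partition count now comes purely
from the CubeStatsReader layer-size estimate divided by the configured
partition cut, clamped below by getSparkMinPartition() (and, presumably in
the context elided from the hunk, above by the max-partition setting). A
worked sketch with assumed, illustrative numbers:

    // Assumed inputs, not defaults:
    double layerSizeMB = 1024.0; // statsReader.estimateLayerSize(level)
    float rddCutMB = 10.0f;      // kylinConfig.getSparkRDDPartitionCutMB()
    int minPartition = 1;        // kylinConfig.getSparkMinPartition()
    int maxPartition = 5000;     // max clamp, per the surrounding code

    int partition = (int) (layerSizeMB / rddCutMB); // 102
    partition = Math.max(minPartition, partition);  // 102
    partition = Math.min(maxPartition, partition);  // 102 -> reduceByKey(..., 102)

One consequence worth noting: the estimate no longer depends on the actual
in-memory RDD size, so it is cheaper and deterministic, at the cost of
trusting the cube statistics.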
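Finally, the new JOB_ID parameter on SparkExecutable is consumed on the
job-server side only: formatArgs() skips it when assembling the
spark-submit argument list, and setAlgorithmLayer() uses it to look up the
enclosing CubingJob and force the LAYER algorithm, the only one Spark
cubing supports. A hypothetical wiring sketch for a caller; the actual
call site is outside this diff:

    SparkExecutable sparkExecutable = new SparkExecutable();
    sparkExecutable.setClassName(SparkCubingByLayer.class.getName());
    // jobId is the id of the enclosing CubingJob; hypothetical variable here.
    sparkExecutable.setJobId(jobId);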