shaofengshi closed pull request #210: KYLIN-3477 Save spark job counter to hdfs
URL: https://github.com/apache/kylin/pull/210
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
index 3cb43fc4c9..3aef34ad1e 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
@@ -23,6 +23,7 @@
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.Map;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -30,12 +31,17 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.StorageURL;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Maps;
+
 public class HadoopUtil {
     @SuppressWarnings("unused")
     private static final Logger logger = LoggerFactory.getLogger(HadoopUtil.class);
@@ -193,4 +199,34 @@ public static void deleteHDFSMeta(String metaUrl) throws IOException {
         HadoopUtil.getFileSystem(realHdfsPath).delete(new Path(realHdfsPath), true);
         logger.info("Delete metadata in HDFS for this job: " + realHdfsPath);
     }
+
+    @SuppressWarnings("deprecation")
+    public static void writeToSequenceFile(Configuration conf, String outputPath, Map<String, String> counterMap) throws IOException {
+        try (SequenceFile.Writer writer = SequenceFile.createWriter(getWorkingFileSystem(conf), conf, new Path(outputPath), Text.class, Text.class)) {
+            for (Map.Entry<String, String> counterEntry : counterMap.entrySet()) {
+                writer.append(new Text(counterEntry.getKey()), new Text(counterEntry.getValue()));
+            }
+        }
+    }
+
+    @SuppressWarnings("deprecation")
+    public static Map<String, String> readFromSequenceFile(Configuration conf, String inputPath) throws IOException {
+        try (SequenceFile.Reader reader = new SequenceFile.Reader(getWorkingFileSystem(conf), new Path(inputPath), conf)) {
+            Map<String, String> map = Maps.newHashMap();
+
+            Text key = (Text) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+            Text value = (Text) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+
+            while (reader.next(key, value)) {
+                map.put(key.toString(), value.toString());
+            }
+
+            return map;
+        }
+    }
+
+    public static Map<String, String> readFromSequenceFile(String inputPath) throws IOException {
+        return readFromSequenceFile(getCurrentConfiguration(), inputPath);
+    }
 }
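A minimal round-trip sketch of the two helpers added above, assuming a configured Kylin/Hadoop environment (getWorkingFileSystem and getCurrentConfiguration need one); the path and the counter keys/values are illustrative only:

    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.kylin.common.util.HadoopUtil;

    import com.google.common.collect.Maps;

    public class CounterFileRoundTrip {
        public static void main(String[] args) throws Exception {
            Configuration conf = HadoopUtil.getCurrentConfiguration();
            String path = "/tmp/kylin-counter-demo"; // illustrative path

            Map<String, String> counters = Maps.newHashMap();
            counters.put("sourceRecordCount", "12345"); // hypothetical entries
            counters.put("sourceSizeBytes", "67890");

            // write the map as a Text/Text SequenceFile on the working file system
            HadoopUtil.writeToSequenceFile(conf, path, counters);

            // read it back into a Map<String, String>
            Map<String, String> readBack = HadoopUtil.readFromSequenceFile(conf, path);
            System.out.println(readBack);
        }
    }

A SequenceFile keeps the format dependency-free: both the Spark job and the job server already have the Hadoop io classes on their classpath.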
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index 02e9fe550f..5b1f38c51c 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -334,7 +334,11 @@ public String getOptimizationCuboidPath(String jobId) {
     }
 
     public String getHBaseConfFilePath(String jobId) {
-        return getJobWorkingDir(jobId) + "/hbase-conf.xml";
+        return getJobWorkingDir(jobId) + "/hbase-conf.xml";
+    }
+
+    public String getCounterOuputPath(String jobId) {
+        return getRealizationRootPath(jobId) + "/counter";
     }
 
     // ============================================================================

diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
index 8c2ba7f33d..66da1b266a 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@ -106,6 +106,8 @@
     String ARG_META_URL = "metadataUrl";
     String ARG_HBASE_CONF_PATH = "hbaseConfPath";
     String ARG_SHRUNKEN_DICT_PATH = "shrunkenDictPath";
+    String ARG_COUNTER_OUPUT = "counterOutput";
+
     /**
      * logger and counter
     */

diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
index 5fd7213b2b..3f3c14da2a 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
@@ -103,7 +103,7 @@ public SparkExecutable createFactDistinctColumnsSparkStep(String jobId) {
         sparkExecutable.setJobId(jobId);
         sparkExecutable.setName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS);
-        sparkExecutable.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES);
+        sparkExecutable.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES, getCounterOuputPath(jobId));
 
         StringBuilder jars = new StringBuilder();

diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index 612239741f..d85337e624 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -35,11 +35,13 @@
 import org.apache.kylin.common.KylinConfigExt;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.util.CliCommandExecutor;
+import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.common.JobRelatedMetaUtil;
 import org.apache.kylin.job.common.PatternedLogger;
 import org.apache.kylin.job.constant.ExecutableConstants;
@@ -80,6 +82,11 @@ public void setCounterSaveAs(String value) {
         this.setParam(COUNTER_SAVE_AS, value);
     }
 
+    public void setCounterSaveAs(String value, String counterOutputPath) {
+        this.setParam(COUNTER_SAVE_AS, value);
+        this.setParam(BatchConstants.ARG_COUNTER_OUPUT, counterOutputPath);
+    }
+
     public String getCounterSaveAs() {
         return getParam(COUNTER_SAVE_AS);
     }
@@ -286,6 +293,14 @@ public void onLogEvent(String infoKey, Map<String, String> info) {
             }
             // done, update all properties
             Map<String, String> joblogInfo = patternedLogger.getInfo();
+
+            // read counter from hdfs
+            String counterOutput = getParam(BatchConstants.ARG_COUNTER_OUPUT);
+            if (counterOutput != null){
+                Map<String, String> counterMap = HadoopUtil.readFromSequenceFile(counterOutput);
+                joblogInfo.putAll(counterMap);
+            }
+
             readCounters(joblogInfo);
             getManager().addJobInfo(getId(), joblogInfo);
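This read-back is the point of the PR: the previous approach parsed counters from the driver's console output, which, per the comments removed in the SparkFactDistinct and SparkCubeHFile hunks below, does not work when spark.submit.deployMode=cluster. A minimal sketch of the merge step in isolation, assuming a configured Kylin environment; only readFromSequenceFile and the "counterOutput" param name come from this PR, everything else is illustrative:

    import java.util.Map;

    import org.apache.kylin.common.util.HadoopUtil;

    import com.google.common.collect.Maps;

    public class MergeCountersSketch {
        public static void main(String[] args) throws Exception {
            Map<String, String> joblogInfo = Maps.newHashMap(); // normally built from PatternedLogger
            String counterOutput = args.length > 0 ? args[0] : null; // the "counterOutput" param

            // jobs that never set the param skip the read, keeping the old behavior
            if (counterOutput != null) {
                joblogInfo.putAll(HadoopUtil.readFromSequenceFile(counterOutput));
            }
            System.out.println(joblogInfo);
        }
    }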
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java
index 61e2e534e8..6188a56e1f 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java
@@ -73,6 +73,7 @@
 import org.apache.kylin.engine.mr.steps.FactDistinctColumnsReducerMapping;
 import org.apache.kylin.engine.mr.steps.SelfDefineSortableKey;
 import org.apache.kylin.job.JoinedFlatTable;
+import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.measure.BufferedMeasureCodec;
 import org.apache.kylin.measure.hllc.HLLCounter;
 import org.apache.kylin.measure.hllc.RegisterType;
@@ -119,6 +120,8 @@
             .withDescription("Hive Intermediate Table").create("hiveTable");
     public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg()
             .isRequired(true).withDescription("Hive Intermediate Table PATH").create(BatchConstants.ARG_INPUT);
+    public static final Option OPTION_COUNTER_PATH = OptionBuilder.withArgName(BatchConstants.ARG_COUNTER_OUPUT).hasArg()
+            .isRequired(true).withDescription("counter output path").create(BatchConstants.ARG_COUNTER_OUPUT);
 
     private Options options;
 
@@ -131,6 +134,7 @@ public SparkFactDistinct() {
         options.addOption(OPTION_INPUT_PATH);
         options.addOption(OPTION_SEGMENT_ID);
         options.addOption(OPTION_STATS_SAMPLING_PERCENT);
+        options.addOption(OPTION_COUNTER_PATH);
     }
 
     @Override
@@ -146,6 +150,7 @@ protected void execute(OptionsHelper optionsHelper) throws Exception {
         String hiveTable = optionsHelper.getOptionValue(OPTION_INPUT_TABLE);
         String inputPath = optionsHelper.getOptionValue(OPTION_INPUT_PATH);
         String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
+        String counterPath = optionsHelper.getOptionValue(OPTION_COUNTER_PATH);
         int samplingPercent = Integer.parseInt(optionsHelper.getOptionValue(OPTION_STATS_SAMPLING_PERCENT));
 
         Class[] kryoClassArray = new Class[] { Class.forName("scala.reflect.ClassTag$$anon$1"),
                 Class.forName("org.apache.kylin.engine.mr.steps.SelfDefineSortableKey") };
@@ -173,6 +178,7 @@ protected void execute(OptionsHelper optionsHelper) throws Exception {
         logger.info("RDD Output path: {}", outputPath);
         logger.info("getTotalReducerNum: {}", reducerMapping.getTotalReducerNum());
         logger.info("getCuboidRowCounterReducerNum: {}", reducerMapping.getCuboidRowCounterReducerNum());
+        logger.info("counter path {}", counterPath);
 
         boolean isSequenceFile = JoinedFlatTable.SEQUENCEFILE.equalsIgnoreCase(envConfig.getFlatTableStorageFormat());
@@ -202,13 +208,20 @@ protected void execute(OptionsHelper optionsHelper) throws Exception {
 
         multipleOutputsRDD.saveAsNewAPIHadoopDatasetWithMultipleOutputs(job.getConfiguration());
 
-        // only work for client mode, not work when spark.submit.deployMode=cluster
         logger.info("Map input records={}", recordRDD.count());
         logger.info("HDFS Read: {} HDFS Write", bytesWritten.value());
 
+        Map<String, String> counterMap = Maps.newHashMap();
+        counterMap.put(ExecutableConstants.SOURCE_RECORDS_COUNT, String.valueOf(recordRDD.count()));
+        counterMap.put(ExecutableConstants.SOURCE_RECORDS_SIZE, String.valueOf(bytesWritten.value()));
+
+        // save counter to hdfs
+        HadoopUtil.writeToSequenceFile(sc.hadoopConfiguration(), counterPath, counterMap);
+
         HadoopUtil.deleteHDFSMeta(metaUrl);
     }
 
     static class FlatOutputFucntion implements PairFlatMapFunction<Iterator<String[]>, SelfDefineSortableKey, Text> {
         private volatile transient boolean initialized = false;
         private String cubeName;
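The job-side pattern, then, is: compute the metric on the driver, put it in a map, and persist the map with writeToSequenceFile instead of only logging it. A condensed, self-contained sketch under assumed inputs (the app name and paths are hypothetical; only writeToSequenceFile and ExecutableConstants.SOURCE_RECORDS_COUNT are from the code above):

    import java.util.Map;

    import org.apache.kylin.common.util.HadoopUtil;
    import org.apache.kylin.job.constant.ExecutableConstants;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    import com.google.common.collect.Maps;

    public class PersistCountersSketch {
        public static void main(String[] args) throws Exception {
            String inputPath = args[0];   // hypothetical flat-table input
            String counterPath = args[1]; // value of the counterOutput argument
            try (JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("demo"))) {
                JavaRDD<String> records = sc.textFile(inputPath);

                Map<String, String> counterMap = Maps.newHashMap();
                counterMap.put(ExecutableConstants.SOURCE_RECORDS_COUNT, String.valueOf(records.count()));

                // persist to HDFS so the job server can read it after the job exits,
                // regardless of spark.submit.deployMode
                HadoopUtil.writeToSequenceFile(sc.hadoopConfiguration(), counterPath, counterMap);
            }
        }
    }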
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java
index be230f011e..ccab22f878 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java
@@ -71,7 +71,7 @@ public AbstractExecutable createConvertCuboidToHfileStep(String jobId) {
         sparkExecutable.setJars(jars.toString());
 
         sparkExecutable.setName(ExecutableConstants.STEP_NAME_CONVERT_CUBOID_TO_HFILE);
-        sparkExecutable.setCounterSaveAs(",," + CubingJob.CUBE_SIZE_BYTES);
+        sparkExecutable.setCounterSaveAs(",," + CubingJob.CUBE_SIZE_BYTES, getCounterOuputPath(jobId));
 
         return sparkExecutable;
     }

diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
index c87a739b9a..539f03b23e 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
@@ -23,8 +23,10 @@
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 
+import com.google.common.collect.Maps;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
@@ -55,6 +57,7 @@
 import org.apache.kylin.engine.mr.common.SerializableConfiguration;
 import org.apache.kylin.engine.spark.KylinSparkJobListener;
 import org.apache.kylin.engine.spark.SparkUtil;
+import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.measure.MeasureCodec;
 import org.apache.spark.Partitioner;
 import org.apache.spark.SparkConf;
@@ -88,6 +91,8 @@
             .isRequired(true).withDescription("Cuboid files PATH").create(BatchConstants.ARG_INPUT);
     public static final Option OPTION_PARTITION_FILE_PATH = OptionBuilder.withArgName(BatchConstants.ARG_PARTITION)
             .hasArg().isRequired(true).withDescription("Partition file path.").create(BatchConstants.ARG_PARTITION);
+    public static final Option OPTION_COUNTER_PATH = OptionBuilder.withArgName(BatchConstants.ARG_COUNTER_OUPUT).hasArg()
+            .isRequired(true).withDescription("counter output path").create(BatchConstants.ARG_COUNTER_OUPUT);
 
     private Options options;
 
@@ -100,6 +105,7 @@ public SparkCubeHFile() {
         options.addOption(OPTION_OUTPUT_PATH);
         options.addOption(OPTION_PARTITION_FILE_PATH);
         options.addOption(AbstractHadoopJob.OPTION_HBASE_CONF_PATH);
+        options.addOption(OPTION_COUNTER_PATH);
     }
 
     @Override
@@ -116,6 +122,7 @@ protected void execute(OptionsHelper optionsHelper) throws Exception {
         final String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
         final Path partitionFilePath = new Path(optionsHelper.getOptionValue(OPTION_PARTITION_FILE_PATH));
         final String hbaseConfFile = optionsHelper.getOptionValue(AbstractHadoopJob.OPTION_HBASE_CONF_PATH);
+        final String counterPath = optionsHelper.getOptionValue(OPTION_COUNTER_PATH);
 
         Class[] kryoClassArray = new Class[] { Class.forName("scala.reflect.ClassTag$$anon$1"),
                 KeyValueCreator.class, KeyValue.class, RowKeyWritable.class };
@@ -226,9 +233,14 @@ protected void execute(OptionsHelper optionsHelper) throws Exception {
             }
         }).saveAsNewAPIHadoopDataset(job.getConfiguration());
 
-        // output the data size to console, job engine will parse and save the metric
-        // please note: this mechanism won't work when spark.submit.deployMode=cluster
         logger.info("HDFS: Number of bytes written=" + jobListener.metrics.getBytesWritten());
+
+        Map<String, String> counterMap = Maps.newHashMap();
+        counterMap.put(ExecutableConstants.HDFS_BYTES_WRITTEN, String.valueOf(jobListener.metrics.getBytesWritten()));
+
+        // save counter to hdfs
+        HadoopUtil.writeToSequenceFile(sc.hadoopConfiguration(), counterPath, counterMap);
+
         //HadoopUtil.deleteHDFSMeta(metaUrl);
     }
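One detail worth noting: HBaseSparkSteps passes ",," + CubingJob.CUBE_SIZE_BYTES while SparkBatchCubingJobBuilder2 passes two keys. This suggests COUNTER_SAVE_AS is a positional, comma-separated list of target keys where an empty slot means "do not save that metric" — our reading of readCounters, which this diff does not show. A tiny illustration of that interpretation:

    import org.apache.kylin.engine.mr.CubingJob;

    public class CounterSaveAsDemo {
        public static void main(String[] args) {
            // the value HBaseSparkSteps passes: two empty slots, then the cube-size key
            String counterSaveAs = ",," + CubingJob.CUBE_SIZE_BYTES;
            String[] slots = counterSaveAs.split(",", -1); // -1 keeps the leading empty slots
            System.out.println("slot 0: '" + slots[0] + "'"); // empty: metric not saved
            System.out.println("slot 1: '" + slots[1] + "'"); // empty: metric not saved
            System.out.println("slot 2: '" + slots[2] + "'"); // cube size bytes key
        }
    }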
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services