Repository: kylin
Updated Branches:
  refs/heads/master 4662adab7 -> 5e13bba08
http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
index 0a6c123..0b45795 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
@@ -22,12 +22,8 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
 import java.util.Map;
 
-import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -38,7 +34,6 @@ import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.Bytes;
@@ -52,16 +47,12 @@ import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
 
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 public class MergeStatisticsStep extends AbstractExecutable {
 
-    private static final String CUBE_NAME = "cubeName";
-    private static final String SEGMENT_ID = "segmentId";
-    private static final String MERGING_SEGMENT_IS = "mergingSegmentIds";
-    private static final String MERGED_STATISTICS_PATH = "mergedStatisticsPath";
     protected Map<Long, HyperLogLogPlusCounter> cuboidHLLMap = Maps.newHashMap();
 
     public MergeStatisticsStep() {
@@ -73,16 +64,16 @@ public class MergeStatisticsStep extends AbstractExecutable {
     protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
         KylinConfig kylinConf = context.getConfig();
         final CubeManager mgr = CubeManager.getInstance(kylinConf);
-        final CubeInstance cube = mgr.getCube(getCubeName());
-        final CubeSegment newSegment = cube.getSegmentById(getSegmentId());
+        final CubeInstance cube = mgr.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
+        final CubeSegment newSegment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
 
         Configuration conf = HadoopUtil.getCurrentConfiguration();
         ResourceStore rs = ResourceStore.getStore(kylinConf);
         try {
 
             int averageSamplingPercentage = 0;
-            for (String segmentId : this.getMergingSegmentIds()) {
-                String fileKey = CubeSegment.getStatisticsResourcePath(getCubeName(), segmentId);
+            for (String segmentId : CubingExecutableUtil.getMergingSegmentIds(this.getParams())) {
+                String fileKey = CubeSegment.getStatisticsResourcePath(CubingExecutableUtil.getCubeName(this.getParams()), segmentId);
                 InputStream is = rs.getResource(fileKey).inputStream;
                 File tempFile = null;
                 FileOutputStream tempFileStream = null;
@@ -126,9 +117,9 @@ public class MergeStatisticsStep extends AbstractExecutable {
                     tempFile.delete();
                 }
             }
-            averageSamplingPercentage = averageSamplingPercentage / this.getMergingSegmentIds().size();
-            CuboidStatsUtil.writeCuboidStatistics(conf, new Path(getMergedStatisticsPath()), cuboidHLLMap, averageSamplingPercentage);
-            Path statisticsFilePath = new Path(getMergedStatisticsPath(), BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION);
+            averageSamplingPercentage = averageSamplingPercentage / CubingExecutableUtil.getMergingSegmentIds(this.getParams()).size();
+            CuboidStatsUtil.writeCuboidStatistics(conf, new Path(CubingExecutableUtil.getMergedStatisticsPath(this.getParams())), cuboidHLLMap, averageSamplingPercentage);
+            Path statisticsFilePath = new Path(CubingExecutableUtil.getMergedStatisticsPath(this.getParams()), BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
             FileSystem fs = statisticsFilePath.getFileSystem(conf);
             FSDataInputStream is = fs.open(statisticsFilePath);
             try {
@@ -146,45 +137,4 @@ public class MergeStatisticsStep extends AbstractExecutable {
         }
     }
 
-    public void setCubeName(String cubeName) {
-        this.setParam(CUBE_NAME, cubeName);
-    }
-
-    private String getCubeName() {
-        return getParam(CUBE_NAME);
-    }
-
-    public void setSegmentId(String segmentId) {
-        this.setParam(SEGMENT_ID, segmentId);
-    }
-
-    private String getSegmentId() {
-        return getParam(SEGMENT_ID);
-    }
-
-    public void setMergingSegmentIds(List<String> ids) {
-        setParam(MERGING_SEGMENT_IS, StringUtils.join(ids, ","));
-    }
-
-    private List<String> getMergingSegmentIds() {
-        final String ids = getParam(MERGING_SEGMENT_IS);
-        if (ids != null) {
-            final String[] splitted = StringUtils.split(ids, ",");
-            ArrayList<String> result = Lists.newArrayListWithExpectedSize(splitted.length);
-            for (String id : splitted) {
-                result.add(id);
-            }
-            return result;
-        } else {
-            return Collections.emptyList();
-        }
-    }
-
-    public void setMergedStatisticsPath(String path) {
-        setParam(MERGED_STATISTICS_PATH, path);
-    }
-
-    private String getMergedStatisticsPath() {
-        return getParam(MERGED_STATISTICS_PATH);
-    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
index 1dbce8e..ff9be44 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
@@ -131,7 +131,7 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
         if (myChildren == null || myChildren.size() == 0) {
             context.getCounter(BatchConstants.MAPREDUCE_COUNTER_GROUP_NAME, "Skipped records").increment(1L);
             skipCounter++;
-            if (skipCounter % BatchConstants.COUNTER_MAX == 0) {
+            if (skipCounter % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
                 logger.info("Skipped " + skipCounter + " records!");
             }
             return;
@@ -140,7 +140,7 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
 
         context.getCounter(BatchConstants.MAPREDUCE_COUNTER_GROUP_NAME, "Processed records").increment(1L);
 
         handleCounter++;
-        if (handleCounter % BatchConstants.COUNTER_MAX == 0) {
+        if (handleCounter % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
             logger.info("Handled " + handleCounter + " records!");
         }
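The CubingExecutableUtil class that these call sites rely on is not itself shown in this excerpt of the commit. Judging from the call sites and the per-step accessors removed above, a minimal sketch of the helper would look roughly like the following; the parameter keys and the comma-joined segment-id encoding are taken from the removed code, while the exact method bodies (and the matching set* methods, omitted here) are inferred rather than quoted:

    // Sketch only: inferred from the call sites and the removed per-step
    // accessors; the actual CubingExecutableUtil in this commit may differ.
    package org.apache.kylin.engine.mr.steps;

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;

    import org.apache.commons.lang.StringUtils;

    import com.google.common.collect.Lists;

    public class CubingExecutableUtil {

        public static final String CUBE_NAME = "cubeName";
        public static final String SEGMENT_ID = "segmentId";
        public static final String MERGING_SEGMENT_IDS = "mergingSegmentIds";
        public static final String MERGED_STATISTICS_PATH = "mergedStatisticsPath";
        public static final String STATISTICS_PATH = "statisticsPath";
        public static final String CUBING_JOB_ID = "cubingJobId";

        // Every step passes its own param map, so the lookup logic lives in one place.
        public static String getCubeName(Map<String, String> params) {
            return params.get(CUBE_NAME);
        }

        public static String getSegmentId(Map<String, String> params) {
            return params.get(SEGMENT_ID);
        }

        // The comma-joined list format is carried over from the removed accessors.
        public static List<String> getMergingSegmentIds(Map<String, String> params) {
            final String ids = params.get(MERGING_SEGMENT_IDS);
            if (ids == null) {
                return Collections.emptyList();
            }
            final String[] splitted = StringUtils.split(ids, ",");
            List<String> result = Lists.newArrayListWithExpectedSize(splitted.length);
            Collections.addAll(result, splitted);
            return result;
        }

        public static String getMergedStatisticsPath(Map<String, String> params) {
            return params.get(MERGED_STATISTICS_PATH);
        }

        public static String getStatisticsPath(Map<String, String> params) {
            return params.get(STATISTICS_PATH);
        }

        public static String getCubingJobId(Map<String, String> params) {
            return params.get(CUBING_JOB_ID);
        }
    }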
http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
index 9314b88..288ca6a 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
@@ -47,11 +47,6 @@ import org.apache.kylin.metadata.model.MeasureDesc;
  */
 public class SaveStatisticsStep extends AbstractExecutable {
 
-    private static final String CUBE_NAME = "cubeName";
-    private static final String SEGMENT_ID = "segmentId";
-    private static final String STATISTICS_PATH = "statisticsPath";
-    private static final String CUBING_JOB_ID = "cubingJobId";
-
     public SaveStatisticsStep() {
         super();
     }
@@ -60,15 +55,15 @@ public class SaveStatisticsStep extends AbstractExecutable {
     protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
         KylinConfig kylinConf = context.getConfig();
         final CubeManager mgr = CubeManager.getInstance(kylinConf);
-        final CubeInstance cube = mgr.getCube(getCubeName());
-        final CubeSegment newSegment = cube.getSegmentById(getSegmentId());
+        final CubeInstance cube = mgr.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
+        final CubeSegment newSegment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
 
         ResourceStore rs = ResourceStore.getStore(kylinConf);
         try {
-            Path statisticsFilePath = new Path(getStatisticsPath(), BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION);
+            Path statisticsFilePath = new Path(CubingExecutableUtil.getStatisticsPath(this.getParams()), BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
             FileSystem fs = FileSystem.get(HadoopUtil.getCurrentConfiguration());
             if (!fs.exists(statisticsFilePath))
-                throw new IOException("File " + statisticsFilePath + " does not exists;");
+                throw new IOException("File " + statisticsFilePath + " does not exists");
 
             FSDataInputStream is = fs.open(statisticsFilePath);
             try {
@@ -105,7 +100,7 @@ public class SaveStatisticsStep extends AbstractExecutable {
                 break;
             }
         }
-
+
         if (memoryHungry == true) {
             alg = AlgorithmEnum.LAYER;
         } else if ("random".equalsIgnoreCase(algPref)) { // for testing
@@ -120,40 +115,8 @@ public class SaveStatisticsStep extends AbstractExecutable {
         }
         logger.info("The cube algorithm for " + seg + " is " + alg);
 
-        CubingJob cubingJob = (CubingJob) executableManager.getJob(getCubingJobId());
+        CubingJob cubingJob = (CubingJob) executableManager.getJob(CubingExecutableUtil.getCubingJobId(this.getParams()));
         cubingJob.setAlgorithm(alg);
     }
 
-    public void setCubeName(String cubeName) {
-        this.setParam(CUBE_NAME, cubeName);
-    }
-
-    private String getCubeName() {
-        return getParam(CUBE_NAME);
-    }
-
-    public void setSegmentId(String segmentId) {
-        this.setParam(SEGMENT_ID, segmentId);
-    }
-
-    private String getSegmentId() {
-        return getParam(SEGMENT_ID);
-    }
-
-    public void setStatisticsPath(String path) {
-        this.setParam(STATISTICS_PATH, path);
-    }
-
-    private String getStatisticsPath() {
-        return getParam(STATISTICS_PATH);
-    }
-
-    public void setCubingJobId(String id) {
-        setParam(CUBING_JOB_ID, id);
-    }
-
-    private String getCubingJobId() {
-        return getParam(CUBING_JOB_ID);
-    }
-
 }
http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
index a10fef4..c41aaf1 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
@@ -33,50 +33,22 @@ import org.apache.kylin.job.execution.ExecuteResult;
  */
 public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
 
-    private static final String SEGMENT_ID = "segmentId";
-    private static final String CUBE_NAME = "cubeName";
-    private static final String CUBING_JOB_ID = "cubingJobId";
-
     public UpdateCubeInfoAfterBuildStep() {
         super();
     }
 
-    public void setCubeName(String cubeName) {
-        this.setParam(CUBE_NAME, cubeName);
-    }
-
-    private String getCubeName() {
-        return getParam(CUBE_NAME);
-    }
-
-    public void setSegmentId(String segmentId) {
-        this.setParam(SEGMENT_ID, segmentId);
-    }
-
-    private String getSegmentId() {
-        return getParam(SEGMENT_ID);
-    }
-
-    public void setCubingJobId(String id) {
-        setParam(CUBING_JOB_ID, id);
-    }
-
-    private String getCubingJobId() {
-        return getParam(CUBING_JOB_ID);
-    }
-
     @Override
     protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
         final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
-        final CubeInstance cube = cubeManager.getCube(getCubeName());
-        final CubeSegment segment = cube.getSegmentById(getSegmentId());
+        final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
+        final CubeSegment segment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
 
-        CubingJob cubingJob = (CubingJob) executableManager.getJob(getCubingJobId());
+        CubingJob cubingJob = (CubingJob) executableManager.getJob(CubingExecutableUtil.getCubingJobId(this.getParams()));
         long sourceCount = cubingJob.findSourceRecordCount();
         long sourceSizeBytes = cubingJob.findSourceSizeBytes();
         long cubeSizeBytes = cubingJob.findCubeSizeBytes();
 
-        segment.setLastBuildJobID(getCubingJobId());
+        segment.setLastBuildJobID(CubingExecutableUtil.getCubingJobId(this.getParams()));
         segment.setLastBuildTime(System.currentTimeMillis());
         segment.setSizeKB(cubeSizeBytes / 1024);
         segment.setInputRecords(sourceCount);
@@ -90,5 +62,5 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
             return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
         }
     }
-
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
index 527572b..d3ed68a 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
@@ -40,10 +40,6 @@ import com.google.common.collect.Lists;
  */
 public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
 
-    private static final String CUBE_NAME = "cubeName";
-    private static final String SEGMENT_ID = "segmentId";
-    private static final String MERGING_SEGMENT_IDS = "mergingSegmentIds";
-    private static final String CUBING_JOB_ID = "cubingJobId";
 
     private final CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
 
@@ -53,18 +49,18 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
 
     @Override
     protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
-        final CubeInstance cube = cubeManager.getCube(getCubeName());
+        final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
 
-        CubeSegment mergedSegment = cube.getSegmentById(getSegmentId());
+        CubeSegment mergedSegment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
         if (mergedSegment == null) {
-            return new ExecuteResult(ExecuteResult.State.FAILED, "there is no segment with id:" + getSegmentId());
+            return new ExecuteResult(ExecuteResult.State.FAILED, "there is no segment with id:" + CubingExecutableUtil.getSegmentId(this.getParams()));
         }
 
-        CubingJob cubingJob = (CubingJob) executableManager.getJob(getCubingJobId());
+        CubingJob cubingJob = (CubingJob) executableManager.getJob(CubingExecutableUtil.getCubingJobId(this.getParams()));
         long cubeSizeBytes = cubingJob.findCubeSizeBytes();
 
         // collect source statistics
-        List<String> mergingSegmentIds = getMergingSegmentIds();
+        List<String> mergingSegmentIds = CubingExecutableUtil.getMergingSegmentIds(this.getParams());
         if (mergingSegmentIds.isEmpty()) {
             return new ExecuteResult(ExecuteResult.State.FAILED, "there are no merging segments");
         }
@@ -80,7 +76,7 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
         mergedSegment.setSizeKB(cubeSizeBytes / 1024);
         mergedSegment.setInputRecords(sourceCount);
         mergedSegment.setInputRecordsSize(sourceSize);
-        mergedSegment.setLastBuildJobID(getCubingJobId());
+        mergedSegment.setLastBuildJobID(CubingExecutableUtil.getCubingJobId(this.getParams()));
         mergedSegment.setLastBuildTime(System.currentTimeMillis());
 
         try {
@@ -92,45 +88,4 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
         }
     }
 
-    public void setSegmentId(String segmentId) {
-        this.setParam(SEGMENT_ID, segmentId);
-    }
-
-    private String getSegmentId() {
-        return getParam(SEGMENT_ID);
-    }
-
-    public void setCubeName(String cubeName) {
-        this.setParam(CUBE_NAME, cubeName);
-    }
-
-    private String getCubeName() {
-        return getParam(CUBE_NAME);
-    }
-
-    public void setMergingSegmentIds(List<String> ids) {
-        setParam(MERGING_SEGMENT_IDS, StringUtils.join(ids, ","));
-    }
-
-    private List<String> getMergingSegmentIds() {
-        final String ids = getParam(MERGING_SEGMENT_IDS);
-        if (ids != null) {
-            final String[] splitted = StringUtils.split(ids, ",");
-            ArrayList<String> result = Lists.newArrayListWithExpectedSize(splitted.length);
-            for (String id : splitted) {
-                result.add(id);
-            }
-            return result;
-        } else {
-            return Collections.emptyList();
-        }
-    }
-
-    public void setCubingJobId(String id) {
-        setParam(CUBING_JOB_ID, id);
-    }
-
-    private String getCubingJobId() {
-        return getParam(CUBING_JOB_ID);
-    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
index 285729f..981dac3 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.ClassUtil;
@@ -103,7 +104,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 public class SparkCubing extends AbstractApplication {
 
     private static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Hive Intermediate Table").create("hiveTable");
-    private static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName("cube").hasArg().isRequired(true).withDescription("Cube Name").create("cubeName");
+    private static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg().isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
     private static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segment").hasArg().isRequired(true).withDescription("Cube Segment Id").create("segmentId");
     private static final Option OPTION_CONF_PATH = OptionBuilder.withArgName("confPath").hasArg().isRequired(true).withDescription("Configuration Path").create("confPath");
     private static final Option OPTION_COPROCESSOR = OptionBuilder.withArgName("coprocessor").hasArg().isRequired(true).withDescription("Coprocessor Jar Path").create("coprocessor");

http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingJobBuilder.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingJobBuilder.java
index 9f17d60..05246f4 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingJobBuilder.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingJobBuilder.java
@@ -18,12 +18,12 @@ package org.apache.kylin.engine.spark;
 
 import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
 import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.engine.mr.IMRInput;
 import org.apache.kylin.engine.mr.IMROutput2;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
 import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.slf4j.Logger;
@@ -48,7 +48,7 @@ public class SparkCubingJobBuilder extends JobBuilderSupport {
     }
 
     public DefaultChainedExecutable build() {
-        final CubingJob result = CubingJob.createBuildJob((CubeSegment)seg, submitter, config);
+        final CubingJob result = CubingJob.createBuildJob((CubeSegment) seg, submitter, config);
         final String jobId = result.getId();
 
         inputSide.addStepPhase1_CreateFlatTable(result);
@@ -59,7 +59,7 @@ public class SparkCubingJobBuilder extends JobBuilderSupport {
         final SparkExecutable sparkExecutable = new SparkExecutable();
         sparkExecutable.setClassName(SparkCubing.class.getName());
         sparkExecutable.setParam("hiveTable", tableName);
-        sparkExecutable.setParam("cubeName", seg.getRealization().getName());
+        sparkExecutable.setParam(CubingExecutableUtil.CUBE_NAME, seg.getRealization().getName());
         sparkExecutable.setParam("segmentId", seg.getUuid());
         sparkExecutable.setParam("confPath", confPath);
         sparkExecutable.setParam("coprocessor", coprocessor);

http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/server/src/main/java/org/apache/kylin/rest/service/BasicService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/BasicService.java b/server/src/main/java/org/apache/kylin/rest/service/BasicService.java
index 1c3e71a..5fea710 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/BasicService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/BasicService.java
@@ -28,6 +28,7 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeDescManager;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.engine.streaming.StreamingManager;
 import org.apache.kylin.invertedindex.IIDescManager;
 import org.apache.kylin.invertedindex.IIManager;
@@ -100,40 +101,7 @@ public abstract class BasicService {
     }
 
     protected List<CubingJob> listAllCubingJobs(final String cubeName, final String projectName, final Set<ExecutableState> statusList, final Map<String, Output> allOutputs) {
-        List<CubingJob> results = Lists.newArrayList(FluentIterable.from(getExecutableManager().getAllExecutables()).filter(new Predicate<AbstractExecutable>() {
-            @Override
-            public boolean apply(AbstractExecutable executable) {
-                if (executable instanceof CubingJob) {
-                    if (cubeName == null) {
-                        return true;
-                    }
-                    return ((CubingJob) executable).getCubeName().equalsIgnoreCase(cubeName);
-                } else {
-                    return false;
-                }
-            }
-        }).transform(new Function<AbstractExecutable, CubingJob>() {
-            @Override
-            public CubingJob apply(AbstractExecutable executable) {
-                return (CubingJob) executable;
-            }
-        }).filter(new Predicate<CubingJob>() {
-            @Override
-            public boolean apply(CubingJob executable) {
-                if (null == projectName || null == getProjectManager().getProject(projectName)) {
-                    return true;
-                } else {
-                    ProjectInstance project = getProjectManager().getProject(projectName);
-                    return project.containsRealization(RealizationType.CUBE, executable.getCubeName());
-                }
-            }
-        }).filter(new Predicate<CubingJob>() {
-            @Override
-            public boolean apply(CubingJob executable) {
-                return statusList.contains(allOutputs.get(executable.getId()).getState());
-            }
-        }));
-        return results;
+        return listAllCubingJobs(cubeName, projectName, statusList, -1L, -1L, allOutputs);
     }
 
     protected List<CubingJob> listAllCubingJobs(final String cubeName, final String projectName, final Set<ExecutableState> statusList, long timeStartInMillis, long timeEndInMillis, final Map<String, Output> allOutputs) {
@@ -144,7 +112,7 @@ public abstract class BasicService {
                     if (cubeName == null) {
                         return true;
                     }
-                    return ((CubingJob) executable).getCubeName().equalsIgnoreCase(cubeName);
+                    return CubingExecutableUtil.getCubeName(executable.getParams()).equalsIgnoreCase(cubeName);
                 } else {
                     return false;
                 }
@@ -161,7 +129,7 @@ public abstract class BasicService {
                     return true;
                 } else {
                     ProjectInstance project = getProjectManager().getProject(projectName);
-                    return project.containsRealization(RealizationType.CUBE, executable.getCubeName());
+                    return project.containsRealization(RealizationType.CUBE, CubingExecutableUtil.getCubeName(executable.getParams()));
                 }
             }
         }).filter(new Predicate<CubingJob>() {

http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/server/src/main/java/org/apache/kylin/rest/service/JobService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/JobService.java b/server/src/main/java/org/apache/kylin/rest/service/JobService.java
index 155593a..cd8eef9 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -19,7 +19,13 @@ package org.apache.kylin.rest.service;
 
 import java.io.IOException;
-import java.util.*;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.Date;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.cube.CubeInstance;
@@ -30,6 +36,7 @@ import org.apache.kylin.engine.EngineFactory;
 import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
 import org.apache.kylin.engine.mr.common.MapReduceExecutable;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.job.JobInstance;
 import org.apache.kylin.job.common.ShellExecutable;
 import org.apache.kylin.job.constant.JobStatusEnum;
@@ -84,7 +91,7 @@ public class JobService extends BasicService {
     }
 
     public List<JobInstance> listAllJobs(final String cubeName, final String projectName, final List<JobStatusEnum> statusList, final JobTimeFilterEnum timeFilter) {
-        Calendar calendar= Calendar.getInstance();
+        Calendar calendar = Calendar.getInstance();
         calendar.setTime(new Date());
         long currentTimeMillis = calendar.getTimeInMillis();
         long timeStartInMillis = getTimeStartInMillis(calendar, timeFilter);
@@ -153,22 +160,22 @@ public class JobService extends BasicService {
 
     private long getTimeStartInMillis(Calendar calendar, JobTimeFilterEnum timeFilter) {
         switch (timeFilter) {
-            case LAST_ONE_DAY:
-                calendar.add(Calendar.DAY_OF_MONTH, -1);
-                return calendar.getTimeInMillis();
-            case LAST_ONE_WEEK:
-                calendar.add(Calendar.WEEK_OF_MONTH, -1);
-                return calendar.getTimeInMillis();
-            case LAST_ONE_MONTH:
-                calendar.add(Calendar.MONTH, -1);
-                return calendar.getTimeInMillis();
-            case LAST_ONE_YEAR:
-                calendar.add(Calendar.YEAR, -1);
-                return calendar.getTimeInMillis();
-            case ALL:
-                return 0;
-            default:
-                throw new RuntimeException("illegal timeFilter for job history:" + timeFilter);
+        case LAST_ONE_DAY:
+            calendar.add(Calendar.DAY_OF_MONTH, -1);
+            return calendar.getTimeInMillis();
+        case LAST_ONE_WEEK:
+            calendar.add(Calendar.WEEK_OF_MONTH, -1);
+            return calendar.getTimeInMillis();
+        case LAST_ONE_MONTH:
+            calendar.add(Calendar.MONTH, -1);
+            return calendar.getTimeInMillis();
+        case LAST_ONE_YEAR:
+            calendar.add(Calendar.YEAR, -1);
+            return calendar.getTimeInMillis();
+        case ALL:
+            return 0;
+        default:
+            throw new RuntimeException("illegal timeFilter for job history:" + timeFilter);
         }
     }
 
@@ -263,8 +270,8 @@ public class JobService extends BasicService {
         CubingJob cubeJob = (CubingJob) job;
         final JobInstance result = new JobInstance();
         result.setName(job.getName());
-        result.setRelatedCube(cubeJob.getCubeName());
-        result.setRelatedSegment(cubeJob.getSegmentIds());
+        result.setRelatedCube(CubingExecutableUtil.getCubeName(cubeJob.getParams()));
+        result.setRelatedSegment(CubingExecutableUtil.getSegmentId(cubeJob.getParams()));
         result.setLastModified(cubeJob.getLastModified());
         result.setSubmitter(cubeJob.getSubmitter());
         result.setUuid(cubeJob.getId());
@@ -288,8 +295,8 @@ public class JobService extends BasicService {
         Output output = outputs.get(job.getId());
         final JobInstance result = new JobInstance();
         result.setName(job.getName());
-        result.setRelatedCube(cubeJob.getCubeName());
-        result.setRelatedSegment(cubeJob.getSegmentIds());
+        result.setRelatedCube(CubingExecutableUtil.getCubeName(cubeJob.getParams()));
+        result.setRelatedSegment(CubingExecutableUtil.getSegmentId(cubeJob.getParams()));
         result.setLastModified(output.getLastModified());
         result.setSubmitter(cubeJob.getSubmitter());
         result.setUuid(cubeJob.getId());

http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java
index cbfd8c3..68b6ae4 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java
@@ -60,7 +60,7 @@ public class ColumnCardinalityMapper<T> extends KylinMapper<T, Object, IntWritable, BytesWritable> {
         bindCurrentConfiguration(conf);
         KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
 
-        String tableName = conf.get(BatchConstants.TABLE_NAME);
+        String tableName = conf.get(BatchConstants.CFG_TABLE_NAME);
         tableDesc = MetadataManager.getInstance(config).getTableDesc(tableName);
         tableInputFormat = MRUtil.getTableInputFormat(tableDesc);
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java
index 70286ab..9162208 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java
@@ -36,7 +36,6 @@ import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.DataModelDesc;
 import org.apache.kylin.metadata.model.TableDesc;
 
 /**
@@ -79,7 +78,7 @@ public class HiveColumnCardinalityJob extends AbstractHadoopJob {
             setJobClasspath(job);
 
             String table = getOptionValue(OPTION_TABLE);
-            job.getConfiguration().set(BatchConstants.TABLE_NAME, table);
+            job.getConfiguration().set(BatchConstants.CFG_TABLE_NAME, table);
 
             Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
             FileOutputFormat.setOutputPath(job, output);
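The four-argument listAllCubingJobs in BasicService above now simply delegates to the time-ranged overload, with -1L on both bounds meaning "no time filter". A hypothetical caller (illustration only; this helper is not part of the commit, and its name and the getAllOutputs() lookup are assumptions) could use the new overload like this:

    // Hypothetical helper inside a BasicService subclass -- illustration only.
    protected List<CubingJob> listRecentCubingJobs(String cubeName, String projectName) {
        Set<ExecutableState> states = EnumSet.allOf(ExecutableState.class);
        Map<String, Output> outputs = getExecutableManager().getAllOutputs();
        long end = System.currentTimeMillis();
        long start = end - 24L * 60 * 60 * 1000; // last 24 hours
        // Passing -1L for both bounds would reproduce the old unfiltered behaviour.
        return listAllCubingJobs(cubeName, projectName, states, start, end, outputs);
    }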
http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
index c2e2e64..6d77240 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
@@ -154,30 +154,7 @@ public class HBaseResourceStore extends ResourceStore {
 
     @Override
     protected List<RawResource> getAllResources(String rangeStart, String rangeEnd) throws IOException {
-        byte[] startRow = Bytes.toBytes(rangeStart);
-        byte[] endRow = plusZero(Bytes.toBytes(rangeEnd));
-
-        Scan scan = new Scan(startRow, endRow);
-        scan.addColumn(B_FAMILY, B_COLUMN_TS);
-        scan.addColumn(B_FAMILY, B_COLUMN);
-        tuneScanParameters(scan);
-
-        HTableInterface table = getConnection().getTable(getAllInOneTableName());
-        List<RawResource> result = Lists.newArrayList();
-        try {
-            ResultScanner scanner = table.getScanner(scan);
-            for (Result r : scanner) {
-                result.add(new RawResource(getInputStream(Bytes.toString(r.getRow()), r), getTimestamp(r)));
-            }
-        } catch (IOException e) {
-            for (RawResource rawResource : result) {
-                IOUtils.closeQuietly(rawResource.inputStream);
-            }
-            throw e;
-        } finally {
-            IOUtils.closeQuietly(table);
-        }
-        return result;
+        return getAllResources(rangeStart, rangeEnd, -1L, -1L);
     }
 
     @Override
@@ -188,7 +165,10 @@ public class HBaseResourceStore extends ResourceStore {
         Scan scan = new Scan(startRow, endRow);
         scan.addColumn(B_FAMILY, B_COLUMN_TS);
         scan.addColumn(B_FAMILY, B_COLUMN);
-        scan.setFilter(generateTimeFilterList(timeStartInMillis, timeEndInMillis));
+        FilterList filterList = generateTimeFilterList(timeStartInMillis, timeEndInMillis);
+        if (filterList != null) {
+            scan.setFilter(filterList);
+        }
         tuneScanParameters(scan);
 
         HTableInterface table = getConnection().getTable(getAllInOneTableName());
@@ -218,11 +198,15 @@ public class HBaseResourceStore extends ResourceStore {
 
     private FilterList generateTimeFilterList(long timeStartInMillis, long timeEndInMillis) {
         FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
-        SingleColumnValueFilter timeStartFilter = new SingleColumnValueFilter(B_FAMILY, B_COLUMN_TS, CompareFilter.CompareOp.GREATER, Bytes.toBytes(timeStartInMillis));
-        filterList.addFilter(timeStartFilter);
-        SingleColumnValueFilter timeEndFilter = new SingleColumnValueFilter(B_FAMILY, B_COLUMN_TS, CompareFilter.CompareOp.LESS_OR_EQUAL, Bytes.toBytes(timeEndInMillis));
-        filterList.addFilter(timeEndFilter);
-        return filterList;
+        if (timeStartInMillis != -1L) {
+            SingleColumnValueFilter timeStartFilter = new SingleColumnValueFilter(B_FAMILY, B_COLUMN_TS, CompareFilter.CompareOp.GREATER, Bytes.toBytes(timeStartInMillis));
+            filterList.addFilter(timeStartFilter);
+        }
+        if (timeEndInMillis != -1L) {
+            SingleColumnValueFilter timeEndFilter = new SingleColumnValueFilter(B_FAMILY, B_COLUMN_TS, CompareFilter.CompareOp.LESS_OR_EQUAL, Bytes.toBytes(timeEndInMillis));
+            filterList.addFilter(timeEndFilter);
+        }
+        return filterList.getFilters().size() == 0 ? null : filterList;
     }
 
     private InputStream getInputStream(String resPath, Result r) throws IOException {
@@ -325,7 +309,7 @@ public class HBaseResourceStore extends ResourceStore {
         byte[] rowkey = Bytes.toBytes(path);
 
         Get get = new Get(rowkey);
-
+
         if (!fetchContent && !fetchTimestamp) {
             get.setCheckExistenceOnly(true);
         } else {

http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
index a828728..f71d0f8 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
@@ -7,15 +7,16 @@ import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
 import org.apache.kylin.engine.mr.common.MapReduceExecutable;
-import org.apache.kylin.storage.hbase.ii.IIBulkLoadJob;
-import org.apache.kylin.storage.hbase.ii.IICreateHFileJob;
-import org.apache.kylin.storage.hbase.ii.IICreateHTableJob;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.metadata.realization.IRealizationSegment;
 import org.apache.kylin.storage.hbase.HBaseConnection;
+import org.apache.kylin.storage.hbase.ii.IIBulkLoadJob;
+import org.apache.kylin.storage.hbase.ii.IICreateHFileJob;
+import org.apache.kylin.storage.hbase.ii.IICreateHTableJob;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -41,16 +42,16 @@ public class HBaseMRSteps extends JobBuilderSupport {
 
     public MapReduceExecutable createRangeRowkeyDistributionStep(String cuboidRootPath, String jobId) {
         String inputPath = cuboidRootPath + (cuboidRootPath.endsWith("/") ? "" : "/") + "*";
-
+
         MapReduceExecutable rowkeyDistributionStep = new MapReduceExecutable();
         rowkeyDistributionStep.setName(ExecutableConstants.STEP_NAME_GET_CUBOID_KEY_DISTRIBUTION);
         StringBuilder cmd = new StringBuilder();
 
         appendMapReduceParameters(cmd, seg.getRealization().getDataModelDesc());
-        appendExecCmdParameters(cmd, "input", inputPath);
-        appendExecCmdParameters(cmd, "output", getRowkeyDistributionOutputPath(jobId));
-        appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName());
-        appendExecCmdParameters(cmd, "jobname", "Kylin_Region_Splits_Calculator_" + seg.getRealization().getName() + "_Step");
+        appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, inputPath);
+        appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, getRowkeyDistributionOutputPath(jobId));
+        appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Region_Splits_Calculator_" + seg.getRealization().getName() + "_Step");
 
         rowkeyDistributionStep.setMapReduceParams(cmd.toString());
         rowkeyDistributionStep.setMapReduceJobClass(RangeKeyDistributionJob.class);
@@ -69,10 +70,10 @@
         HadoopShellExecutable createHtableStep = new HadoopShellExecutable();
         createHtableStep.setName(ExecutableConstants.STEP_NAME_CREATE_HBASE_TABLE);
         StringBuilder cmd = new StringBuilder();
-        appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName());
-        appendExecCmdParameters(cmd, "segmentname", seg.getName());
-        appendExecCmdParameters(cmd, "partitions", getRowkeyDistributionOutputPath(jobId) + "/part-r-00000");
-        appendExecCmdParameters(cmd, "statisticsenabled", String.valueOf(withStats));
+        appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_PARTITION, getRowkeyDistributionOutputPath(jobId) + "/part-r-00000");
+        appendExecCmdParameters(cmd, BatchConstants.ARG_STATS_ENABLED, String.valueOf(withStats));
 
         createHtableStep.setJobParams(cmd.toString());
         createHtableStep.setJobClass(CreateHTableJob.class);
@@ -82,18 +83,18 @@
 
     public MapReduceExecutable createConvertCuboidToHfileStep(String cuboidRootPath, String jobId) {
         String inputPath = cuboidRootPath + (cuboidRootPath.endsWith("/") ? "" : "/") + "*";
-
+
         MapReduceExecutable createHFilesStep = new MapReduceExecutable();
         createHFilesStep.setName(ExecutableConstants.STEP_NAME_CONVERT_CUBOID_TO_HFILE);
         StringBuilder cmd = new StringBuilder();
 
         appendMapReduceParameters(cmd, seg.getRealization().getDataModelDesc());
-        appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName());
-        appendExecCmdParameters(cmd, "partitions", getRowkeyDistributionOutputPath(jobId) + "/part-r-00000_hfile");
-        appendExecCmdParameters(cmd, "input", inputPath);
-        appendExecCmdParameters(cmd, "output", getHFilePath(jobId));
-        appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
-        appendExecCmdParameters(cmd, "jobname", "Kylin_HFile_Generator_" + seg.getRealization().getName() + "_Step");
+        appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_PARTITION, getRowkeyDistributionOutputPath(jobId) + "/part-r-00000_hfile");
+        appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, inputPath);
+        appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, getHFilePath(jobId));
+        appendExecCmdParameters(cmd, BatchConstants.ARG_HTABLE_NAME, seg.getStorageLocationIdentifier());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_HFile_Generator_" + seg.getRealization().getName() + "_Step");
 
         createHFilesStep.setMapReduceParams(cmd.toString());
         createHFilesStep.setMapReduceJobClass(CubeHFileJob.class);
@@ -107,9 +108,9 @@
         bulkLoadStep.setName(ExecutableConstants.STEP_NAME_BULK_LOAD_HFILE);
 
         StringBuilder cmd = new StringBuilder();
-        appendExecCmdParameters(cmd, "input", getHFilePath(jobId));
-        appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
-        appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, getHFilePath(jobId));
+        appendExecCmdParameters(cmd, BatchConstants.ARG_HTABLE_NAME, seg.getStorageLocationIdentifier());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
 
         bulkLoadStep.setJobParams(cmd.toString());
         bulkLoadStep.setJobClass(BulkLoadJob.class);
@@ -125,7 +126,7 @@
     }
 
     public List<String> getMergingHTables() {
-        final List<CubeSegment> mergingSegments = ((CubeInstance)seg.getRealization()).getMergingSegments((CubeSegment)seg);
+        final List<CubeSegment> mergingSegments = ((CubeInstance) seg.getRealization()).getMergingSegments((CubeSegment) seg);
         Preconditions.checkState(mergingSegments.size() > 1, "there should be more than 2 segments to merge");
         final List<String> mergingHTables = Lists.newArrayList();
         for (CubeSegment merging : mergingSegments) {
@@ -135,7 +136,7 @@
     }
 
     public List<String> getMergingHDFSPaths() {
-        final List<CubeSegment> mergingSegments = ((CubeInstance)seg.getRealization()).getMergingSegments((CubeSegment)seg);
+        final List<CubeSegment> mergingSegments = ((CubeInstance) seg.getRealization()).getMergingSegments((CubeSegment) seg);
         Preconditions.checkState(mergingSegments.size() > 1, "there should be more than 2 segments to merge");
         final List<String> mergingHDFSPaths = Lists.newArrayList();
         for (CubeSegment merging : mergingSegments) {
@@ -187,15 +188,14 @@
         jobFlow.addTask(createCreateIIHTableStep(seg));
 
         final String iiPath = rootPath + "*";
-
+
         // generate hfiles step
         jobFlow.addTask(createConvertIIToHfileStep(seg, iiPath, jobFlow.getId()));
 
         // bulk load step
         jobFlow.addTask(createIIBulkLoadStep(seg, jobFlow.getId()));
-
-    }
+    }
 
     public void addInvertedIndexGarbageCollectionSteps(DefaultChainedExecutable jobFlow) {
         String jobId = jobFlow.getId();
@@ -211,14 +211,12 @@
         jobFlow.addTask(step);
     }
 
-
-
     private HadoopShellExecutable createCreateIIHTableStep(IRealizationSegment seg) {
         HadoopShellExecutable createHtableStep = new HadoopShellExecutable();
         createHtableStep.setName(ExecutableConstants.STEP_NAME_CREATE_HBASE_TABLE);
         StringBuilder cmd = new StringBuilder();
-        appendExecCmdParameters(cmd, "iiname", seg.getRealization().getName());
-        appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_II_NAME, seg.getRealization().getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_HTABLE_NAME, seg.getStorageLocationIdentifier());
 
         createHtableStep.setJobParams(cmd.toString());
         createHtableStep.setJobClass(IICreateHTableJob.class);
@@ -232,11 +230,11 @@
 
         StringBuilder cmd = new StringBuilder();
         appendMapReduceParameters(cmd, seg.getRealization().getDataModelDesc());
-        appendExecCmdParameters(cmd, "iiname", seg.getRealization().getName());
-        appendExecCmdParameters(cmd, "input", inputPath);
-        appendExecCmdParameters(cmd, "output", getHFilePath(jobId));
-        appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
-        appendExecCmdParameters(cmd, "jobname", "Kylin_HFile_Generator_" + seg.getRealization().getName() + "_Step");
+        appendExecCmdParameters(cmd, BatchConstants.ARG_II_NAME, seg.getRealization().getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, inputPath);
+        appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, getHFilePath(jobId));
+        appendExecCmdParameters(cmd, BatchConstants.ARG_HTABLE_NAME, seg.getStorageLocationIdentifier());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_HFile_Generator_" + seg.getRealization().getName() + "_Step");
 
         createHFilesStep.setMapReduceParams(cmd.toString());
         createHFilesStep.setMapReduceJobClass(IICreateHFileJob.class);
@@ -249,9 +247,9 @@
         bulkLoadStep.setName(ExecutableConstants.STEP_NAME_BULK_LOAD_HFILE);
 
         StringBuilder cmd = new StringBuilder();
-        appendExecCmdParameters(cmd, "input", getHFilePath(jobId));
-        appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
-        appendExecCmdParameters(cmd, "iiname", seg.getRealization().getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, getHFilePath(jobId));
+        appendExecCmdParameters(cmd, BatchConstants.ARG_HTABLE_NAME, seg.getStorageLocationIdentifier());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_II_NAME, seg.getRealization().getName());
 
         bulkLoadStep.setJobParams(cmd.toString());
         bulkLoadStep.setJobClass(IIBulkLoadJob.class);
@@ -259,5 +257,5 @@
 
         return bulkLoadStep;
     }
-
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
index 4cc4794..fa62a62 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
@@ -76,7 +76,7 @@ public class HBaseStreamingOutput implements IStreamingOutput {
             CuboidStatsUtil.writeCuboidStatistics(conf, outputPath, samplingResult, 100);
             FSDataInputStream inputStream = null;
             try {
-                inputStream = FileSystem.getLocal(conf).open(new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION));
+                inputStream = FileSystem.getLocal(conf).open(new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME));
                 ResourceStore.getStore(kylinConfig).putResource(cubeSegment.getStatisticsResourcePath(), inputStream, System.currentTimeMillis());
             } finally {
                 IOUtils.closeQuietly(inputStream);

http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJob.java
index 2ff7356..8b5daa3 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJob.java
@@ -99,11 +99,11 @@ public class RangeKeyDistributionJob extends AbstractHadoopJob {
             int regionSplitSize = KylinConfig.getInstanceFromEnv().getHBaseRegionCut(cubeCapacity.toString());
             int maxRegionCount = KylinConfig.getInstanceFromEnv().getHBaseRegionCountMax();
             int minRegionCount = KylinConfig.getInstanceFromEnv().getHBaseRegionCountMin();
-            job.getConfiguration().set(BatchConstants.OUTPUT_PATH, output.toString());
-            job.getConfiguration().set(BatchConstants.HFILE_SIZE_GB, String.valueOf(hfileSizeGB));
-            job.getConfiguration().set(BatchConstants.REGION_SPLIT_SIZE, String.valueOf(regionSplitSize));
-            job.getConfiguration().set(BatchConstants.REGION_NUMBER_MAX, String.valueOf(maxRegionCount));
-            job.getConfiguration().set(BatchConstants.REGION_NUMBER_MIN, String.valueOf(minRegionCount));
+            job.getConfiguration().set(BatchConstants.CFG_OUTPUT_PATH, output.toString());
+            job.getConfiguration().set(BatchConstants.CFG_HFILE_SIZE_GB, String.valueOf(hfileSizeGB));
+            job.getConfiguration().set(BatchConstants.CFG_REGION_SPLIT_SIZE, String.valueOf(regionSplitSize));
+            job.getConfiguration().set(BatchConstants.CFG_REGION_NUMBER_MAX, String.valueOf(maxRegionCount));
+            job.getConfiguration().set(BatchConstants.CFG_REGION_NUMBER_MIN, String.valueOf(minRegionCount));
 
             // The partition file for hfile is sequenece file consists of ImmutableBytesWritable and NullWritable
             TableMapReduceUtil.addDependencyJars(job.getConfiguration(), ImmutableBytesWritable.class, NullWritable.class);

http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducer.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducer.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducer.java
index acdab62..4e53ca4 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducer.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducer.java
@@ -57,24 +57,24 @@ public class RangeKeyDistributionReducer extends KylinReducer<Text, LongWritable, Text, LongWritable> {
     protected void setup(Context context) throws IOException {
         super.bindCurrentConfiguration(context.getConfiguration());
 
-        if (context.getConfiguration().get(BatchConstants.OUTPUT_PATH) != null) {
-            output = context.getConfiguration().get(BatchConstants.OUTPUT_PATH);
+        if (context.getConfiguration().get(BatchConstants.CFG_OUTPUT_PATH) != null) {
+            output = context.getConfiguration().get(BatchConstants.CFG_OUTPUT_PATH);
         }
 
-        if (context.getConfiguration().get(BatchConstants.HFILE_SIZE_GB) != null) {
-            hfileSizeGB = Integer.valueOf(context.getConfiguration().get(BatchConstants.HFILE_SIZE_GB));
+        if (context.getConfiguration().get(BatchConstants.CFG_HFILE_SIZE_GB) != null) {
+            hfileSizeGB = Integer.valueOf(context.getConfiguration().get(BatchConstants.CFG_HFILE_SIZE_GB));
         }
 
-        if (context.getConfiguration().get(BatchConstants.REGION_SPLIT_SIZE) != null) {
-            cut = Integer.valueOf(context.getConfiguration().get(BatchConstants.REGION_SPLIT_SIZE));
+        if (context.getConfiguration().get(BatchConstants.CFG_REGION_SPLIT_SIZE) != null) {
+            cut = Integer.valueOf(context.getConfiguration().get(BatchConstants.CFG_REGION_SPLIT_SIZE));
         }
 
-        if (context.getConfiguration().get(BatchConstants.REGION_NUMBER_MIN) != null) {
-            minRegionCount = Integer.valueOf(context.getConfiguration().get(BatchConstants.REGION_NUMBER_MIN));
+        if (context.getConfiguration().get(BatchConstants.CFG_REGION_NUMBER_MIN) != null) {
+            minRegionCount = Integer.valueOf(context.getConfiguration().get(BatchConstants.CFG_REGION_NUMBER_MIN));
        }
 
-        if (context.getConfiguration().get(BatchConstants.REGION_NUMBER_MAX) != null) {
-            maxRegionCount = Integer.valueOf(context.getConfiguration().get(BatchConstants.REGION_NUMBER_MAX));
+        if (context.getConfiguration().get(BatchConstants.CFG_REGION_NUMBER_MAX) != null) {
+            maxRegionCount = Integer.valueOf(context.getConfiguration().get(BatchConstants.CFG_REGION_NUMBER_MAX));
         }
 
         logger.info("Chosen cut for htable is " + cut + ", max region count=" + maxRegionCount
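The renamed BatchConstants fields used throughout this commit are defined in a file that is not part of this excerpt. The sketch below reconstructs the ARG_* values from the string literals they replace in the HBaseMRSteps hunks; the CFG_* keys are only renamed at their call sites here, so their string values are left as comments. This is an inferred sketch, not the actual source: in particular, SparkCubing previously used "cube"/"cubeName" where HBaseMRSteps used "cubename", so the diff alone does not pin down the real value of ARG_CUBE_NAME.

    // Inferred sketch of the renamed constants -- not the real BatchConstants file.
    public interface BatchConstants {
        // Command-line argument names, read off the literals replaced above
        String ARG_INPUT = "input";
        String ARG_OUTPUT = "output";
        String ARG_JOB_NAME = "jobname";
        String ARG_CUBE_NAME = "cubename";      // assumption: SparkCubing hunk replaced "cubeName"
        String ARG_II_NAME = "iiname";
        String ARG_SEGMENT_NAME = "segmentname";
        String ARG_PARTITION = "partitions";
        String ARG_STATS_ENABLED = "statisticsenabled";
        String ARG_HTABLE_NAME = "htablename";

        // Renamed Hadoop Configuration keys and misc constants; the diffs show only
        // the old/new field names (TABLE_NAME -> CFG_TABLE_NAME, OUTPUT_PATH ->
        // CFG_OUTPUT_PATH, HFILE_SIZE_GB -> CFG_HFILE_SIZE_GB, REGION_SPLIT_SIZE ->
        // CFG_REGION_SPLIT_SIZE, REGION_NUMBER_MAX/MIN -> CFG_REGION_NUMBER_MAX/MIN,
        // CFG_STATISTICS_CUBOID_ESTIMATION -> CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME,
        // COUNTER_MAX -> NORMAL_RECORD_LOG_THRESHOLD), not the string values.
    }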