http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopShellExecutable.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopShellExecutable.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopShellExecutable.java index c6b8f56..ce19500 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopShellExecutable.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopShellExecutable.java @@ -52,8 +52,7 @@ public class HadoopShellExecutable extends AbstractExecutable { Preconditions.checkNotNull(mapReduceJobClass); Preconditions.checkNotNull(params); try { - final Constructor<? extends AbstractHadoopJob> constructor = ClassUtil - .forName(mapReduceJobClass, AbstractHadoopJob.class).getConstructor(); + final Constructor<? extends AbstractHadoopJob> constructor = ClassUtil.forName(mapReduceJobClass, AbstractHadoopJob.class).getConstructor(); final AbstractHadoopJob job = constructor.newInstance(); String[] args = params.trim().split("\\s+"); logger.info("parameters of the HadoopShellExecutable: {}", params); @@ -69,8 +68,7 @@ public class HadoopShellExecutable extends AbstractExecutable { result = 2; } log.append("result code:").append(result); - return result == 0 ? new ExecuteResult(ExecuteResult.State.SUCCEED, log.toString()) - : new ExecuteResult(ExecuteResult.State.FAILED, log.toString()); + return result == 0 ? new ExecuteResult(ExecuteResult.State.SUCCEED, log.toString()) : new ExecuteResult(ExecuteResult.State.FAILED, log.toString()); } catch (ReflectiveOperationException e) { logger.error("error getMapReduceJobClass, class name:" + getParam(KEY_MR_JOB), e); return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusChecker.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusChecker.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusChecker.java index 8e2d634..d32928f 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusChecker.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusChecker.java @@ -55,11 +55,9 @@ public class HadoopStatusChecker { } JobStepStatusEnum status = null; try { - final Pair<RMAppState, FinalApplicationStatus> result = new HadoopStatusGetter(yarnUrl, mrJobID) - .get(useKerberosAuth); + final Pair<RMAppState, FinalApplicationStatus> result = new HadoopStatusGetter(yarnUrl, mrJobID).get(useKerberosAuth); logger.debug("State of Hadoop job: " + mrJobID + ":" + result.getLeft() + "-" + result.getRight()); - output.append(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.S").format(new Date()) + " - State of Hadoop job: " - + mrJobID + ":" + result.getLeft() + " - " + result.getRight() + "\n"); + output.append(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.S").format(new Date()) + " - State of Hadoop job: " + mrJobID + ":" + result.getLeft() + " - " + result.getRight() + "\n"); switch (result.getRight()) { case SUCCEEDED: http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java index f20e0a1..189e019 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java @@ -80,9 +80,7 @@ public class JobInfoConverter { } if (task instanceof MapReduceExecutable) { result.setExecCmd(((MapReduceExecutable) task).getMapReduceParams()); - result.setExecWaitTime( - AbstractExecutable.getExtraInfoAsLong(stepOutput, MapReduceExecutable.MAP_REDUCE_WAIT_TIME, 0L) - / 1000); + result.setExecWaitTime(AbstractExecutable.getExtraInfoAsLong(stepOutput, MapReduceExecutable.MAP_REDUCE_WAIT_TIME, 0L) / 1000); } if (task instanceof HadoopShellExecutable) { result.setExecCmd(((HadoopShellExecutable) task).getJobParams()); http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java index 02c8f45..07efb34 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java @@ -108,8 +108,7 @@ public class MapReduceExecutable extends AbstractExecutable { job = new Cluster(conf).getJob(JobID.forName(extra.get(ExecutableConstants.MR_JOB_ID))); logger.info("mr_job_id:" + extra.get(ExecutableConstants.MR_JOB_ID) + " resumed"); } else { - final Constructor<? extends AbstractHadoopJob> constructor = ClassUtil - .forName(mapReduceJobClass, AbstractHadoopJob.class).getConstructor(); + final Constructor<? extends AbstractHadoopJob> constructor = ClassUtil.forName(mapReduceJobClass, AbstractHadoopJob.class).getConstructor(); final AbstractHadoopJob hadoopJob = constructor.newInstance(); hadoopJob.setConf(HadoopUtil.getCurrentConfiguration()); hadoopJob.setAsync(true); // so the ToolRunner.run() returns right away @@ -155,8 +154,7 @@ public class MapReduceExecutable extends AbstractExecutable { mgr.updateJobOutput(getId(), ExecutableState.ERROR, hadoopCmdOutput.getInfo(), "killed by admin"); return new ExecuteResult(ExecuteResult.State.FAILED, "killed by admin"); } - if (status == JobStepStatusEnum.WAITING && (newStatus == JobStepStatusEnum.FINISHED - || newStatus == JobStepStatusEnum.ERROR || newStatus == JobStepStatusEnum.RUNNING)) { + if (status == JobStepStatusEnum.WAITING && (newStatus == JobStepStatusEnum.FINISHED || newStatus == JobStepStatusEnum.ERROR || newStatus == JobStepStatusEnum.RUNNING)) { final long waitTime = System.currentTimeMillis() - getStartTime(); setMapReduceWaitTime(waitTime); } http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/NDCuboidBuilder.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/NDCuboidBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/NDCuboidBuilder.java index dfb0b8b..9ab42ea 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/NDCuboidBuilder.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/NDCuboidBuilder.java @@ -18,8 +18,6 @@ package org.apache.kylin.engine.mr.common; -import java.io.Serializable; - import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.Pair; import org.apache.kylin.common.util.SplittedBytes; @@ -32,6 +30,8 @@ import org.apache.kylin.cube.kv.RowKeyEncoderProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.Serializable; + /** */ public class NDCuboidBuilder implements Serializable { @@ -57,6 +57,7 @@ public class NDCuboidBuilder implements Serializable { this.rowKeySplitter = new RowKeySplitter(cubeSegment, 65, 256); } + public Pair<Integer, ByteArray> buildKey(Cuboid parentCuboid, Cuboid childCuboid, SplittedBytes[] splitBuffers) { RowKeyEncoder rowkeyEncoder = rowKeyEncoderProvider.getRowkeyEncoder(childCuboid); @@ -66,7 +67,7 @@ public class NDCuboidBuilder implements Serializable { long mask = Long.highestOneBit(parentCuboid.getId()); long parentCuboidId = parentCuboid.getId(); long childCuboidId = childCuboid.getId(); - long parentCuboidIdActualLength = (long) Long.SIZE - Long.numberOfLeadingZeros(parentCuboid.getId()); + long parentCuboidIdActualLength = (long)Long.SIZE - Long.numberOfLeadingZeros(parentCuboid.getId()); int index = rowKeySplitter.getBodySplitOffset(); // skip shard and cuboidId for (int i = 0; i < parentCuboidIdActualLength; i++) { if ((mask & parentCuboidId) > 0) {// if the this bit position equals http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java index e6f976a..93e413b 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java @@ -18,10 +18,6 @@ package org.apache.kylin.engine.mr.steps; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Arrays; - import org.apache.hadoop.io.Text; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.Bytes; @@ -38,6 +34,10 @@ import org.apache.kylin.engine.mr.common.BatchConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; + /** */ abstract public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VALUEIN, Text, Text> { @@ -66,12 +66,12 @@ abstract public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<K cube = CubeManager.getInstance(kylinConfig).getCube(cubeName); cubeDesc = cube.getDescriptor(); cubeSegment = cube.getSegmentById(segmentID); - CubeJoinedFlatTableEnrich intermediateTableDesc = new CubeJoinedFlatTableEnrich( - EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc); + CubeJoinedFlatTableEnrich intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc); baseCuboidBuilder = new BaseCuboidBuilder(kylinConfig, cubeDesc, cubeSegment, intermediateTableDesc); } + protected void outputKV(String[] flatRow, Context context) throws IOException, InterruptedException { byte[] rowKey = baseCuboidBuilder.buildKey(flatRow); outputKey.set(rowKey, 0, rowKey.length); http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java index 6b5c8d1..98ebbb4 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java @@ -75,15 +75,13 @@ public class CreateDictionaryJob extends AbstractHadoopJob { Path colDir = new Path(factColumnsInputPath, col.getIdentity()); FileSystem fs = HadoopUtil.getWorkingFileSystem(); - Path dictFile = HadoopUtil.getFilterOnlyPath(fs, colDir, - col.getName() + FactDistinctColumnsReducer.DICT_FILE_POSTFIX); + Path dictFile = HadoopUtil.getFilterOnlyPath(fs, colDir, col.getName() + FactDistinctColumnsReducer.DICT_FILE_POSTFIX); if (dictFile == null) { logger.info("Dict for '" + col.getName() + "' not pre-built."); return null; } - try (SequenceFile.Reader reader = new SequenceFile.Reader(HadoopUtil.getCurrentConfiguration(), - SequenceFile.Reader.file(dictFile))) { + try (SequenceFile.Reader reader = new SequenceFile.Reader(HadoopUtil.getCurrentConfiguration(), SequenceFile.Reader.file(dictFile))) { NullWritable key = NullWritable.get(); BytesWritable value = new BytesWritable(); reader.next(key, value); http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java index acc224e..65c5869 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java @@ -23,17 +23,17 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import javax.annotation.Nullable; - +import com.google.common.base.Function; +import com.google.common.collect.Iterables; import org.apache.commons.lang.StringUtils; + +import com.google.common.collect.Lists; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.job.execution.ExecutableContext; -import com.google.common.base.Function; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; +import javax.annotation.Nullable; public class CubingExecutableUtil { @@ -78,14 +78,13 @@ public class CubingExecutableUtil { final CubeInstance cube = mgr.getCube(cubeName); if (cube == null) { - String cubeList = StringUtils - .join(Iterables.transform(mgr.listAllCubes(), new Function<CubeInstance, String>() { - @Nullable - @Override - public String apply(@Nullable CubeInstance input) { - return input.getName(); - } - }).iterator(), ","); + String cubeList = StringUtils.join(Iterables.transform(mgr.listAllCubes(), new Function<CubeInstance, String>() { + @Nullable + @Override + public String apply(@Nullable CubeInstance input) { + return input.getName(); + } + }).iterator(), ","); throw new IllegalStateException("target cube name: " + cubeName + " cube list: " + cubeList); } @@ -93,14 +92,13 @@ public class CubingExecutableUtil { final CubeSegment newSegment = cube.getSegmentById(segmentId); if (newSegment == null) { - String segmentList = StringUtils - .join(Iterables.transform(cube.getSegments(), new Function<CubeSegment, String>() { - @Nullable - @Override - public String apply(@Nullable CubeSegment input) { - return input.getUuid(); - } - }).iterator(), ","); + String segmentList = StringUtils.join(Iterables.transform(cube.getSegments(), new Function<CubeSegment, String>() { + @Nullable + @Override + public String apply(@Nullable CubeSegment input) { + return input.getUuid(); + } + }).iterator(), ","); throw new IllegalStateException("target segment id: " + segmentId + " segment list: " + segmentList); } http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java index fcea420..6a8ba4c 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java @@ -93,8 +93,7 @@ public class CuboidJob extends AbstractHadoopJob { CubeSegment segment = cube.getSegmentById(segmentID); if (checkSkip(cubingJobId)) { - logger.info( - "Skip job " + getOptionValue(OPTION_JOB_NAME) + " for " + segmentID + "[" + segmentID + "]"); + logger.info("Skip job " + getOptionValue(OPTION_JOB_NAME) + " for " + segmentID + "[" + segmentID + "]"); return 0; } @@ -142,8 +141,7 @@ public class CuboidJob extends AbstractHadoopJob { if ("FLAT_TABLE".equals(input)) { // base cuboid case - IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg) - .getFlatTableInputFormat(); + IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat(); flatTableInputFormat.configureJob(job); } else { // n-dimension cuboid case http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java index f7e8e4b..495be77 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; +import com.google.common.collect.Lists; import org.apache.hadoop.io.Text; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.cube.CubeManager; @@ -35,8 +36,6 @@ import org.apache.kylin.metadata.model.MeasureDesc; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Lists; - /** * @author George Song (ysong1) * http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsCombiner.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsCombiner.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsCombiner.java index 4a5cf07..a367bc6 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsCombiner.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsCombiner.java @@ -26,8 +26,7 @@ import org.apache.kylin.engine.mr.KylinReducer; /** * @author yangli9 */ -public class FactDistinctColumnsCombiner - extends KylinReducer<SelfDefineSortableKey, Text, SelfDefineSortableKey, Text> { +public class FactDistinctColumnsCombiner extends KylinReducer<SelfDefineSortableKey, Text, SelfDefineSortableKey, Text> { @Override protected void setup(Context context) throws IOException { @@ -35,8 +34,7 @@ public class FactDistinctColumnsCombiner } @Override - public void doReduce(SelfDefineSortableKey key, Iterable<Text> values, Context context) - throws IOException, InterruptedException { + public void doReduce(SelfDefineSortableKey key, Iterable<Text> values, Context context) throws IOException, InterruptedException { // for hll, each key only has one output, no need to do local combine; // for normal col, values are empty text http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java index fa6c62f..ee0989a 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java @@ -94,9 +94,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob { } if (reducerCount > 255) { - throw new IllegalArgumentException( - "The max reducer number for FactDistinctColumnsJob is 255, but now it is " + reducerCount - + ", decrease 'kylin.engine.mr.uhc-reducer-count'"); + throw new IllegalArgumentException("The max reducer number for FactDistinctColumnsJob is 255, but now it is " + reducerCount + ", decrease 'kylin.engine.mr.uhc-reducer-count'"); } job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName); @@ -154,14 +152,11 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob { job.setNumReduceTasks(numberOfReducers); //make each reducer output to respective dir - MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_COLUMN, SequenceFileOutputFormat.class, - NullWritable.class, Text.class); - MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_DICT, SequenceFileOutputFormat.class, - NullWritable.class, BytesWritable.class); - MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_STATISTICS, SequenceFileOutputFormat.class, - LongWritable.class, BytesWritable.class); - MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_PARTITION, TextOutputFormat.class, - NullWritable.class, LongWritable.class); + MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_COLUMN, SequenceFileOutputFormat.class, NullWritable.class, Text.class); + MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_DICT, SequenceFileOutputFormat.class, NullWritable.class, BytesWritable.class); + MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_STATISTICS, SequenceFileOutputFormat.class, LongWritable.class, BytesWritable.class); + MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_PARTITION, TextOutputFormat.class, NullWritable.class, LongWritable.class); + FileOutputFormat.setOutputPath(job, output); job.getConfiguration().set(BatchConstants.CFG_OUTPUT_PATH, output.toString()); http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java index 480ef95..713b7f7 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java @@ -43,6 +43,8 @@ import com.google.common.hash.HashFunction; import com.google.common.hash.Hasher; import com.google.common.hash.Hashing; + + /** */ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperBase<KEYIN, Object> { @@ -53,6 +55,7 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB BYTES } + protected boolean collectStatistics = false; protected CuboidScheduler cuboidScheduler = null; protected int nRowKey; @@ -84,8 +87,7 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB tmpbuf = ByteBuffer.allocate(4096); collectStatistics = Boolean.parseBoolean(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_ENABLED)); if (collectStatistics) { - samplingPercentage = Integer - .parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT)); + samplingPercentage = Integer.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT)); cuboidScheduler = new CuboidScheduler(cubeDesc); nRowKey = cubeDesc.getRowkey().getRowKeyColumns().length; @@ -101,6 +103,7 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB allCuboidsHLL[i] = new HLLCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision(), RegisterType.DENSE); } + TblColRef partitionColRef = cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef(); if (partitionColRef != null) { partitionColumnIndex = intermediateTableDesc.getColumnIndex(partitionColRef); @@ -126,9 +129,7 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB isUsePutRowKeyToHllNewAlgorithm = true; rowHashCodesLong = new long[nRowKey]; hf = Hashing.murmur3_128(); - logger.info( - "Found KylinVersion : {}. Use new algorithm for cuboid sampling. About the details of the new algorithm, please refer to KYLIN-2518", - cubeDesc.getVersion()); + logger.info("Found KylinVersion : {}. Use new algorithm for cuboid sampling. About the details of the new algorithm, please refer to KYLIN-2518", cubeDesc.getVersion()); } } @@ -159,7 +160,7 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB public void doMap(KEYIN key, Object record, Context context) throws IOException, InterruptedException { Collection<String[]> rowCollection = flatTableInputFormat.parseMapperInput(record); - for (String[] row : rowCollection) { + for (String[] row: rowCollection) { context.getCounter(RawDataCounter.BYTES).increment(countSizeInBytes(row)); for (int i = 0; i < factDictCols.size(); i++) { String fieldValue = row[dictionaryColumnIndex[i]]; @@ -172,8 +173,7 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB reducerIndex = columnIndexToReducerBeginId.get(i); } else { //for the uhc - reducerIndex = columnIndexToReducerBeginId.get(i) - + (fieldValue.hashCode() & 0x7fffffff) % uhcReducerCount; + reducerIndex = columnIndexToReducerBeginId.get(i) + (fieldValue.hashCode() & 0x7fffffff) % uhcReducerCount; } tmpbuf.clear(); @@ -192,8 +192,7 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB // log a few rows for troubleshooting if (rowCount < 10) { - logger.info("Sample output: " + factDictCols.get(i) + " '" + fieldValue + "' => reducer " - + reducerIndex); + logger.info("Sample output: " + factDictCols.get(i) + " '" + fieldValue + "' => reducer " + reducerIndex); } } @@ -302,6 +301,7 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB } } + private int countNewSize(int oldSize, int dataSize) { int newSize = oldSize * 2; while (newSize < dataSize) { http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java index 2e55a52..458af69 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java @@ -43,8 +43,7 @@ import org.apache.kylin.metadata.model.TblColRef; /** */ -abstract public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> - extends KylinMapper<KEYIN, VALUEIN, SelfDefineSortableKey, Text> { +abstract public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VALUEIN, SelfDefineSortableKey, Text> { protected String cubeName; protected CubeInstance cube; http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java index f5f03e2..7f01c3a 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java @@ -26,6 +26,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import com.google.common.base.Preconditions; import org.apache.commons.io.output.ByteArrayOutputStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.BytesWritable; @@ -51,7 +52,6 @@ import org.apache.kylin.metadata.model.TblColRef; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -112,8 +112,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK isStatistics = true; baseCuboidRowCountInMappers = Lists.newArrayList(); cuboidHLLMap = Maps.newHashMap(); - samplingPercentage = Integer - .parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT)); + samplingPercentage = Integer.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT)); logger.info("Reducer " + taskId + " handling stats"); } else if (collectStatistics && (taskId == numberOfTasks - 2)) { // partition col @@ -134,7 +133,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK if (cubeDesc.getDictionaryBuilderClass(col) != null) { // only works with default dictionary builder buildDictInReducer = false; } - if (config.getUHCReducerCount() > 1) { + if(config.getUHCReducerCount() > 1) { int[] uhcIndex = CubeManager.getInstance(config).getUHCIndex(cubeDesc); int colIndex = reducerIdToColumnIndex.get(taskId); if (uhcIndex[colIndex] == 1) @@ -163,8 +162,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK } @Override - public void doReduce(SelfDefineSortableKey skey, Iterable<Text> values, Context context) - throws IOException, InterruptedException { + public void doReduce(SelfDefineSortableKey skey, Iterable<Text> values, Context context) throws IOException, InterruptedException { Text key = skey.getText(); if (isStatistics) { // for hll @@ -245,12 +243,9 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK // output written to baseDir/colName/colName.pci-r-00000 (etc) String partitionFileName = col.getIdentity() + "/" + col.getName() + PARTITION_COL_INFO_FILE_POSTFIX; - mos.write(BatchConstants.CFG_OUTPUT_PARTITION, NullWritable.get(), new LongWritable(timeMinValue), - partitionFileName); - mos.write(BatchConstants.CFG_OUTPUT_PARTITION, NullWritable.get(), new LongWritable(timeMaxValue), - partitionFileName); - logger.info("write partition info for col : " + col.getName() + " minValue:" + timeMinValue + " maxValue:" - + timeMaxValue); + mos.write(BatchConstants.CFG_OUTPUT_PARTITION, NullWritable.get(), new LongWritable(timeMinValue), partitionFileName); + mos.write(BatchConstants.CFG_OUTPUT_PARTITION, NullWritable.get(), new LongWritable(timeMaxValue), partitionFileName); + logger.info("write partition info for col : " + col.getName() + " minValue:" + timeMinValue + " maxValue:" + timeMaxValue); } } @@ -258,13 +253,11 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK // output written to baseDir/colName/colName.rldict-r-00000 (etc) String dictFileName = col.getIdentity() + "/" + col.getName() + DICT_FILE_POSTFIX; - try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); - DataOutputStream outputStream = new DataOutputStream(baos);) { + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream outputStream = new DataOutputStream(baos);) { outputStream.writeUTF(dict.getClass().getName()); dict.write(outputStream); - mos.write(BatchConstants.CFG_OUTPUT_DICT, NullWritable.get(), new BytesWritable(baos.toByteArray()), - dictFileName); + mos.write(BatchConstants.CFG_OUTPUT_DICT, NullWritable.get(), new BytesWritable(baos.toByteArray()), dictFileName); } } @@ -280,23 +273,19 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK grandTotal += hll.getCountEstimate(); } double mapperOverlapRatio = grandTotal == 0 ? 0 : (double) totalRowsBeforeMerge / grandTotal; - mos.write(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(-1), - new BytesWritable(Bytes.toBytes(mapperOverlapRatio)), statisticsFileName); + mos.write(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(-1), new BytesWritable(Bytes.toBytes(mapperOverlapRatio)), statisticsFileName); // mapper number at key -2 - mos.write(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(-2), - new BytesWritable(Bytes.toBytes(baseCuboidRowCountInMappers.size())), statisticsFileName); + mos.write(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(-2), new BytesWritable(Bytes.toBytes(baseCuboidRowCountInMappers.size())), statisticsFileName); // sampling percentage at key 0 - mos.write(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(0L), - new BytesWritable(Bytes.toBytes(samplingPercentage)), statisticsFileName); + mos.write(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(0L), new BytesWritable(Bytes.toBytes(samplingPercentage)), statisticsFileName); for (long i : allCuboids) { valueBuf.clear(); cuboidHLLMap.get(i).writeRegisters(valueBuf); valueBuf.flip(); - mos.write(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(i), - new BytesWritable(valueBuf.array(), valueBuf.limit()), statisticsFileName); + mos.write(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(i), new BytesWritable(valueBuf.array(), valueBuf.limit()), statisticsFileName); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java index d55b775..a04fb43 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java @@ -40,7 +40,7 @@ public class HiveToBaseCuboidMapper<KEYIN> extends BaseCuboidMapperBase<KEYIN, O @Override public void doMap(KEYIN key, Object value, Context context) throws IOException, InterruptedException { Collection<String[]> rowCollection = flatTableInputFormat.parseMapperInput(value); - for (String[] row : rowCollection) { + for (String[] row: rowCollection) { try { outputKV(row, context); http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java index 7d8320a..73a2eb9 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java @@ -111,8 +111,7 @@ public class InMemCuboidJob extends AbstractHadoopJob { job.setOutputValueClass(Text.class); // set input - IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(segment) - .getFlatTableInputFormat(); + IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(segment).getFlatTableInputFormat(); flatTableInputFormat.configureJob(job); // set output http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java index 65d1525..eee189c 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java @@ -101,8 +101,7 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Object, ByteArr cubeBuilder.setConcurrentThreads(taskCount); ExecutorService executorService = Executors.newSingleThreadExecutor(); - future = executorService.submit( - cubeBuilder.buildAsRunnable(queue, new MapContextGTRecordWriter(context, cubeDesc, cubeSegment))); + future = executorService.submit(cubeBuilder.buildAsRunnable(queue, new MapContextGTRecordWriter(context, cubeDesc, cubeSegment))); } @@ -120,7 +119,7 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Object, ByteArr // put each row to the queue Collection<String[]> rowCollection = flatTableInputFormat.parseMapperInput(record); - for (String[] row : rowCollection) { + for(String[] row: rowCollection) { List<String> rowAsList = Arrays.asList(row); while (!future.isDone()) { if (queue.offer(rowAsList, 1, TimeUnit.SECONDS)) { @@ -143,8 +142,7 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Object, ByteArr try { future.get(); } catch (Exception e) { - throw new IOException("Failed to build cube in mapper " + context.getTaskAttemptID().getTaskID().getId(), - e); + throw new IOException("Failed to build cube in mapper " + context.getTaskAttemptID().getTaskID().getId(), e); } queue.clear(); } http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java index 65ba841..244889f 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java @@ -74,8 +74,7 @@ public class InMemCuboidReducer extends KylinReducer<ByteArrayWritable, ByteArra } @Override - public void doReduce(ByteArrayWritable key, Iterable<ByteArrayWritable> values, Context context) - throws IOException, InterruptedException { + public void doReduce(ByteArrayWritable key, Iterable<ByteArrayWritable> values, Context context) throws IOException, InterruptedException { aggs.reset(); http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java index c1a55da..d183f90 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java @@ -31,8 +31,7 @@ public class MapContextGTRecordWriter extends KVGTRecordWriter { protected MapContext<?, ?, ByteArrayWritable, ByteArrayWritable> mapContext; - public MapContextGTRecordWriter(MapContext<?, ?, ByteArrayWritable, ByteArrayWritable> mapContext, - CubeDesc cubeDesc, CubeSegment cubeSegment) { + public MapContextGTRecordWriter(MapContext<?, ?, ByteArrayWritable, ByteArrayWritable> mapContext, CubeDesc cubeDesc, CubeSegment cubeSegment) { super(cubeDesc, cubeSegment); this.mapContext = mapContext; } http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java index 7c50f23..a603fc8 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java @@ -110,8 +110,7 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> { // decide which source segment FileSplit fileSplit = (FileSplit) context.getInputSplit(); - IMROutput2.IMRMergeOutputFormat outputFormat = MRUtil.getBatchMergeOutputSide2(mergedCubeSegment) - .getOuputFormat(); + IMROutput2.IMRMergeOutputFormat outputFormat = MRUtil.getBatchMergeOutputSide2(mergedCubeSegment).getOuputFormat(); sourceCubeSegment = outputFormat.findSourceSegment(fileSplit, cube); rowKeySplitter = new RowKeySplitter(sourceCubeSegment, 65, 255); @@ -185,8 +184,7 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> { System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, oldBuf.length); } - int idInSourceDict = BytesUtil.readUnsigned(splittedByteses[useSplit].value, 0, - splittedByteses[useSplit].length); + int idInSourceDict = BytesUtil.readUnsigned(splittedByteses[useSplit].value, 0, splittedByteses[useSplit].length); int idInMergedDict; //int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBodyBuf, bufOffset); @@ -207,8 +205,7 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> { System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, oldBuf.length); } - System.arraycopy(splittedByteses[useSplit].value, 0, newKeyBodyBuf, bufOffset, - splittedByteses[useSplit].length); + System.arraycopy(splittedByteses[useSplit].value, 0, newKeyBodyBuf, bufOffset, splittedByteses[useSplit].length); bufOffset += splittedByteses[useSplit].length; } } @@ -242,14 +239,13 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> { Boolean ret = dimensionsNeedDict.get(col); if (ret != null) return ret; - + ret = cubeDesc.getRowkey().isUseDictionary(col); if (ret) { - TableRef srcTable = DictionaryManager.getInstance(config).decideSourceData(cubeDesc.getModel(), col) - .getTableRef(); + TableRef srcTable = DictionaryManager.getInstance(config).decideSourceData(cubeDesc.getModel(), col).getTableRef(); ret = cubeDesc.getModel().isFactTable(srcTable); } - + dimensionsNeedDict.put(col, ret); return ret; } http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java index f6658c8..4ca132c 100755 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java @@ -103,8 +103,7 @@ public class MergeDictionaryStep extends AbstractExecutable { * @param newSeg * @throws IOException */ - private void makeDictForNewSegment(KylinConfig conf, CubeInstance cube, CubeSegment newSeg, - List<CubeSegment> mergingSegments) throws IOException { + private void makeDictForNewSegment(KylinConfig conf, CubeInstance cube, CubeSegment newSeg, List<CubeSegment> mergingSegments) throws IOException { HashSet<TblColRef> colsNeedMeringDict = new HashSet<TblColRef>(); HashSet<TblColRef> colsNeedCopyDict = new HashSet<TblColRef>(); DictionaryManager dictMgr = DictionaryManager.getInstance(conf); @@ -143,8 +142,7 @@ public class MergeDictionaryStep extends AbstractExecutable { } } - private DictionaryInfo mergeDictionaries(DictionaryManager dictMgr, CubeSegment cubeSeg, List<DictionaryInfo> dicts, - TblColRef col) throws IOException { + private DictionaryInfo mergeDictionaries(DictionaryManager dictMgr, CubeSegment cubeSeg, List<DictionaryInfo> dicts, TblColRef col) throws IOException { DictionaryInfo dictInfo = dictMgr.mergeDictionary(dicts); if (dictInfo != null) cubeSeg.putDictResPath(col, dictInfo.getResourcePath()); http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java index 17c4d03..04d8231 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java @@ -76,8 +76,7 @@ public class MergeStatisticsStep extends AbstractExecutable { int averageSamplingPercentage = 0; for (String segmentId : CubingExecutableUtil.getMergingSegmentIds(this.getParams())) { - String fileKey = CubeSegment - .getStatisticsResourcePath(CubingExecutableUtil.getCubeName(this.getParams()), segmentId); + String fileKey = CubeSegment.getStatisticsResourcePath(CubingExecutableUtil.getCubeName(this.getParams()), segmentId); InputStream is = rs.getResource(fileKey).inputStream; File tempFile = null; FileOutputStream tempFileStream = null; @@ -121,13 +120,9 @@ public class MergeStatisticsStep extends AbstractExecutable { tempFile.delete(); } } - averageSamplingPercentage = averageSamplingPercentage - / CubingExecutableUtil.getMergingSegmentIds(this.getParams()).size(); - CubeStatsWriter.writeCuboidStatistics(conf, - new Path(CubingExecutableUtil.getMergedStatisticsPath(this.getParams())), cuboidHLLMap, - averageSamplingPercentage); - Path statisticsFilePath = new Path(CubingExecutableUtil.getMergedStatisticsPath(this.getParams()), - BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME); + averageSamplingPercentage = averageSamplingPercentage / CubingExecutableUtil.getMergingSegmentIds(this.getParams()).size(); + CubeStatsWriter.writeCuboidStatistics(conf, new Path(CubingExecutableUtil.getMergedStatisticsPath(this.getParams())), cuboidHLLMap, averageSamplingPercentage); + Path statisticsFilePath = new Path(CubingExecutableUtil.getMergedStatisticsPath(this.getParams()), BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME); FileSystem fs = HadoopUtil.getFileSystem(statisticsFilePath, conf); FSDataInputStream is = fs.open(statisticsFilePath); try { http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MetadataCleanupJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MetadataCleanupJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MetadataCleanupJob.java index 9b41a8e..eee2c00 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MetadataCleanupJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MetadataCleanupJob.java @@ -47,8 +47,7 @@ import com.google.common.collect.Sets; public class MetadataCleanupJob extends AbstractHadoopJob { @SuppressWarnings("static-access") - private static final Option OPTION_DELETE = OptionBuilder.withArgName("delete").hasArg().isRequired(false) - .withDescription("Delete the unused metadata").create("delete"); + private static final Option OPTION_DELETE = OptionBuilder.withArgName("delete").hasArg().isRequired(false).withDescription("Delete the unused metadata").create("delete"); protected static final Logger logger = LoggerFactory.getLogger(MetadataCleanupJob.class); @@ -101,8 +100,7 @@ public class MetadataCleanupJob extends AbstractHadoopJob { List<String> toDeleteResource = Lists.newArrayList(); // two level resources, snapshot tables and cube statistics - for (String resourceRoot : new String[] { ResourceStore.SNAPSHOT_RESOURCE_ROOT, - ResourceStore.CUBE_STATISTICS_ROOT }) { + for (String resourceRoot : new String[] { ResourceStore.SNAPSHOT_RESOURCE_ROOT, ResourceStore.CUBE_STATISTICS_ROOT }) { NavigableSet<String> snapshotTables = getStore().listResources(resourceRoot); if (snapshotTables != null) { @@ -151,9 +149,7 @@ public class MetadataCleanupJob extends AbstractHadoopJob { for (ExecutablePO executable : allExecutable) { long lastModified = executable.getLastModified(); ExecutableOutputPO output = executableDao.getJobOutput(executable.getUuid()); - if (System.currentTimeMillis() - lastModified > TIME_THREADSHOLD_FOR_JOB - && (ExecutableState.SUCCEED.toString().equals(output.getStatus()) - || ExecutableState.DISCARDED.toString().equals(output.getStatus()))) { + if (System.currentTimeMillis() - lastModified > TIME_THREADSHOLD_FOR_JOB && (ExecutableState.SUCCEED.toString().equals(output.getStatus()) || ExecutableState.DISCARDED.toString().equals(output.getStatus()))) { toDeleteResource.add(ResourceStore.EXECUTE_RESOURCE_ROOT + "/" + executable.getUuid()); toDeleteResource.add(ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT + "/" + executable.getUuid()); @@ -164,8 +160,7 @@ public class MetadataCleanupJob extends AbstractHadoopJob { } if (toDeleteResource.size() > 0) { - logger.info( - "The following resources have no reference or is too old, will be cleaned from metadata store: \n"); + logger.info("The following resources have no reference or is too old, will be cleaned from metadata store: \n"); for (String s : toDeleteResource) { logger.info(s); @@ -180,8 +175,7 @@ public class MetadataCleanupJob extends AbstractHadoopJob { } public static void main(String[] args) throws Exception { - logger.warn( - "org.apache.kylin.engine.mr.steps.MetadataCleanupJob is deprecated, use org.apache.kylin.tool.MetadataCleanupJob instead"); + logger.warn("org.apache.kylin.engine.mr.steps.MetadataCleanupJob is deprecated, use org.apache.kylin.tool.MetadataCleanupJob instead"); int exitCode = ToolRunner.run(new MetadataCleanupJob(), args); System.exit(exitCode); http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java index 8bf6d4b..b924edc 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java @@ -18,9 +18,6 @@ package org.apache.kylin.engine.mr.steps; -import java.io.IOException; -import java.util.Collection; - import org.apache.hadoop.io.Text; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.ByteArray; @@ -39,6 +36,9 @@ import org.apache.kylin.engine.mr.common.NDCuboidBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.Collection; + /** * @author George Song (ysong1) * @@ -79,6 +79,8 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> { rowKeySplitter = new RowKeySplitter(cubeSegment, 65, 256); } + + @Override public void doMap(Text key, Text value, Context context) throws IOException, InterruptedException { long cuboidId = rowKeySplitter.split(key.getBytes()); @@ -103,8 +105,7 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> { for (Long child : myChildren) { Cuboid childCuboid = Cuboid.findById(cubeDesc, child); - Pair<Integer, ByteArray> result = ndCuboidBuilder.buildKey(parentCuboid, childCuboid, - rowKeySplitter.getSplitBuffers()); + Pair<Integer, ByteArray> result = ndCuboidBuilder.buildKey(parentCuboid, childCuboid, rowKeySplitter.getSplitBuffers()); outputKey.set(result.getSecond().array(), 0, result.getFirst()); context.write(outputKey, value); } http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ReducerNumSizing.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ReducerNumSizing.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ReducerNumSizing.java index 8acd499..5c0555a 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ReducerNumSizing.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ReducerNumSizing.java @@ -34,15 +34,13 @@ public class ReducerNumSizing { private static final Logger logger = LoggerFactory.getLogger(ReducerNumSizing.class); - public static int getLayeredCubingReduceTaskNum(CubeSegment cubeSegment, double totalMapInputMB, int level) - throws ClassNotFoundException, IOException, InterruptedException, JobException { + public static int getLayeredCubingReduceTaskNum(CubeSegment cubeSegment, double totalMapInputMB, int level) throws ClassNotFoundException, IOException, InterruptedException, JobException { CubeDesc cubeDesc = cubeSegment.getCubeDesc(); KylinConfig kylinConfig = cubeDesc.getConfig(); double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB(); double reduceCountRatio = kylinConfig.getDefaultHadoopJobReducerCountRatio(); - logger.info("Having per reduce MB " + perReduceInputMB + ", reduce count ratio " + reduceCountRatio + ", level " - + level); + logger.info("Having per reduce MB " + perReduceInputMB + ", reduce count ratio " + reduceCountRatio + ", level " + level); CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, kylinConfig); @@ -52,8 +50,7 @@ public class ReducerNumSizing { //merge case double estimatedSize = cubeStatsReader.estimateCubeSize(); adjustedCurrentLayerSizeEst = estimatedSize > totalMapInputMB ? totalMapInputMB : estimatedSize; - logger.debug("estimated size {}, input size {}, adjustedCurrentLayerSizeEst: {}", estimatedSize, - totalMapInputMB, adjustedCurrentLayerSizeEst); + logger.debug("estimated size {}, input size {}, adjustedCurrentLayerSizeEst: {}", estimatedSize, totalMapInputMB, adjustedCurrentLayerSizeEst); } else if (level == 0) { //base cuboid case TODO: the estimation could be very WRONG because it has no correction adjustedCurrentLayerSizeEst = cubeStatsReader.estimateLayerSize(0); @@ -62,9 +59,7 @@ public class ReducerNumSizing { parentLayerSizeEst = cubeStatsReader.estimateLayerSize(level - 1); currentLayerSizeEst = cubeStatsReader.estimateLayerSize(level); adjustedCurrentLayerSizeEst = totalMapInputMB / parentLayerSizeEst * currentLayerSizeEst; - logger.debug( - "totalMapInputMB: {}, parentLayerSizeEst: {}, currentLayerSizeEst: {}, adjustedCurrentLayerSizeEst: {}", - totalMapInputMB, parentLayerSizeEst, currentLayerSizeEst, adjustedCurrentLayerSizeEst); + logger.debug("totalMapInputMB: {}, parentLayerSizeEst: {}, currentLayerSizeEst: {}, adjustedCurrentLayerSizeEst: {}", totalMapInputMB, parentLayerSizeEst, currentLayerSizeEst, adjustedCurrentLayerSizeEst); } // number of reduce tasks http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerJob.java index 89534fe..3419949 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerJob.java @@ -39,8 +39,7 @@ import org.apache.kylin.engine.mr.common.AbstractHadoopJob; public class RowKeyDistributionCheckerJob extends AbstractHadoopJob { @SuppressWarnings("static-access") - protected static final Option ROW_KEY_STATS_FILE_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true) - .withDescription("rowKeyStatsFilePath").create("rowKeyStatsFilePath"); + protected static final Option ROW_KEY_STATS_FILE_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("rowKeyStatsFilePath").create("rowKeyStatsFilePath"); @Override public int run(String[] args) throws Exception { http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java index 0af1b85..eab57d1 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java @@ -68,7 +68,7 @@ public class RowKeyDistributionCheckerMapper extends KylinMapper<Text, Text, Tex for (Text t : keyList) { if (key.compareTo(t) < 0) { Long v = resultMap.get(t); - long length = (long) key.getLength() + value.getLength(); + long length = (long)key.getLength() + value.getLength(); v += length; resultMap.put(t, v); break; http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerReducer.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerReducer.java index 1aa406f..d203e8c 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerReducer.java @@ -38,8 +38,7 @@ public class RowKeyDistributionCheckerReducer extends KylinReducer<Text, LongWri } @Override - public void doReduce(Text key, Iterable<LongWritable> values, Context context) - throws IOException, InterruptedException { + public void doReduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { long length = 0; for (LongWritable v : values) { http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java index 859cd2e..28f99fb 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java @@ -54,17 +54,14 @@ public class SaveStatisticsStep extends AbstractExecutable { @Override protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { - CubeSegment newSegment = CubingExecutableUtil.findSegment(context, - CubingExecutableUtil.getCubeName(this.getParams()), - CubingExecutableUtil.getSegmentId(this.getParams())); + CubeSegment newSegment = CubingExecutableUtil.findSegment(context, CubingExecutableUtil.getCubeName(this.getParams()), CubingExecutableUtil.getSegmentId(this.getParams())); KylinConfig kylinConf = newSegment.getConfig(); ResourceStore rs = ResourceStore.getStore(kylinConf); try { FileSystem fs = HadoopUtil.getWorkingFileSystem(); Path statisticsDir = new Path(CubingExecutableUtil.getStatisticsPath(this.getParams())); - Path statisticsFilePath = HadoopUtil.getFilterOnlyPath(fs, statisticsDir, - BatchConstants.CFG_OUTPUT_STATISTICS); + Path statisticsFilePath = HadoopUtil.getFilterOnlyPath(fs, statisticsDir, BatchConstants.CFG_OUTPUT_STATISTICS); if (statisticsFilePath == null) { throw new IOException("fail to find the statistics file in base dir: " + statisticsDir); } @@ -114,8 +111,7 @@ public class SaveStatisticsStep extends AbstractExecutable { double mapperOverlapRatio = cubeStats.getMapperOverlapRatioOfFirstBuild(); double overlapThreshold = kylinConf.getCubeAlgorithmAutoThreshold(); logger.info("mapperNumber for " + seg + " is " + mapperNumber + " and threshold is " + mapperNumLimit); - logger.info("mapperOverlapRatio for " + seg + " is " + mapperOverlapRatio + " and threshold is " - + overlapThreshold); + logger.info("mapperOverlapRatio for " + seg + " is " + mapperOverlapRatio + " and threshold is " + overlapThreshold); // in-mem cubing is good when // 1) the cluster has enough mapper slots to run in parallel http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SelfDefineSortableKey.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SelfDefineSortableKey.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SelfDefineSortableKey.java index bb4152e..c75abea 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SelfDefineSortableKey.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SelfDefineSortableKey.java @@ -28,7 +28,9 @@ import org.apache.kylin.metadata.datatype.DataType; public class SelfDefineSortableKey implements WritableComparable<SelfDefineSortableKey> { public enum TypeFlag { - NONE_NUMERIC_TYPE, INTEGER_FAMILY_TYPE, DOUBLE_FAMILY_TYPE + NONE_NUMERIC_TYPE, + INTEGER_FAMILY_TYPE, + DOUBLE_FAMILY_TYPE } private byte typeId; //non-numeric(0000 0000) int(0000 0001) other numberic(0000 0010) @@ -59,6 +61,7 @@ public class SelfDefineSortableKey implements WritableComparable<SelfDefineSorta } } + public void init(Text key, DataType type) { init(key, getTypeIdByDatatype(type)); } @@ -110,6 +113,7 @@ public class SelfDefineSortableKey implements WritableComparable<SelfDefineSorta return (typeId == TypeFlag.INTEGER_FAMILY_TYPE.ordinal()); } + public byte getTypeIdByDatatype(DataType type) { if (!type.isNumberFamily()) { return (byte) TypeFlag.NONE_NUMERIC_TYPE.ordinal(); @@ -125,3 +129,5 @@ public class SelfDefineSortableKey implements WritableComparable<SelfDefineSorta } } + + http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java index 32eaebd..2efd718 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java @@ -88,8 +88,7 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable { final String factColumnsInputPath = this.getParams().get(BatchConstants.CFG_OUTPUT_PATH); Path colDir = new Path(factColumnsInputPath, partitionCol.getIdentity()); FileSystem fs = HadoopUtil.getWorkingFileSystem(); - Path outputFile = HadoopUtil.getFilterOnlyPath(fs, colDir, - partitionCol.getName() + FactDistinctColumnsReducer.PARTITION_COL_INFO_FILE_POSTFIX); + Path outputFile = HadoopUtil.getFilterOnlyPath(fs, colDir, partitionCol.getName() + FactDistinctColumnsReducer.PARTITION_COL_INFO_FILE_POSTFIX); if (outputFile == null) { throw new IOException("fail to find the partition file in base dir: " + colDir); } http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java index 8447e44..add5c42 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java @@ -51,8 +51,7 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable { CubeSegment mergedSegment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams())); if (mergedSegment == null) { - return new ExecuteResult(ExecuteResult.State.FAILED, - "there is no segment with id:" + CubingExecutableUtil.getSegmentId(this.getParams())); + return new ExecuteResult(ExecuteResult.State.FAILED, "there is no segment with id:" + CubingExecutableUtil.getSegmentId(this.getParams())); } CubingJob cubingJob = (CubingJob) getManager().getJob(CubingExecutableUtil.getCubingJobId(this.getParams())); http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/test/java/org/apache/kylin/engine/mr/SortedColumnReaderTest.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/SortedColumnReaderTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/SortedColumnReaderTest.java index 1ad6687..39c5bac 100644 --- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/SortedColumnReaderTest.java +++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/SortedColumnReaderTest.java @@ -297,7 +297,7 @@ public class SortedColumnReaderTest { } return result; } - + private String qualify(String path) { String absolutePath = new File(path).getAbsolutePath(); if (absolutePath.startsWith("/")) http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/test/java/org/apache/kylin/engine/mr/TableReaderTest.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/TableReaderTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/TableReaderTest.java index e1adbb3..4c43dbc 100644 --- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/TableReaderTest.java +++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/TableReaderTest.java @@ -35,11 +35,9 @@ public class TableReaderTest { @Test public void testBasicReader() throws IOException { File f = new File("src/test/resources/dict/DW_SITES"); - DFSFileTableReader reader = new DFSFileTableReader("file://" + f.getAbsolutePath(), DFSFileTable.DELIM_AUTO, - 10); + DFSFileTableReader reader = new DFSFileTableReader("file://" + f.getAbsolutePath(), DFSFileTable.DELIM_AUTO, 10); while (reader.next()) { - assertEquals("[-1, Korea Auction.co.kr, S, 48, 0, 111, 2009-02-11, , DW_OFFPLAT, ]", - Arrays.toString(reader.getRow())); + assertEquals("[-1, Korea Auction.co.kr, S, 48, 0, 111, 2009-02-11, , DW_OFFPLAT, ]", Arrays.toString(reader.getRow())); break; } reader.close(); http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java index 32e80fc..7616df2 100644 --- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java +++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java @@ -77,8 +77,7 @@ public class CubeReducerTest extends LocalFileMetadataTestCase { reduceDriver.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, "test_kylin_cube_with_slr_ready"); - CubeDesc cubeDesc = CubeManager.getInstance(getTestConfig()).getCube("test_kylin_cube_with_slr_ready") - .getDescriptor(); + CubeDesc cubeDesc = CubeManager.getInstance(getTestConfig()).getCube("test_kylin_cube_with_slr_ready").getDescriptor(); BufferedMeasureCodec codec = new BufferedMeasureCodec(cubeDesc.getMeasures()); Text key1 = new Text("72010ustech"); @@ -102,12 +101,9 @@ public class CubeReducerTest extends LocalFileMetadataTestCase { List<Pair<Text, Text>> result = reduceDriver.run(); - Pair<Text, Text> p1 = new Pair<Text, Text>(new Text("72010ustech"), - newValueText(codec, "45.43", "10", "20.34", 3, 600)); - Pair<Text, Text> p2 = new Pair<Text, Text>(new Text("1tech"), - newValueText(codec, "35.43", "15.09", "20.34", 2, 1500)); - Pair<Text, Text> p3 = new Pair<Text, Text>(new Text("0"), - newValueText(codec, "146.52", "146.52", "146.52", 0, 0)); + Pair<Text, Text> p1 = new Pair<Text, Text>(new Text("72010ustech"), newValueText(codec, "45.43", "10", "20.34", 3, 600)); + Pair<Text, Text> p2 = new Pair<Text, Text>(new Text("1tech"), newValueText(codec, "35.43", "15.09", "20.34", 2, 1500)); + Pair<Text, Text> p3 = new Pair<Text, Text>(new Text("0"), newValueText(codec, "146.52", "146.52", "146.52", 0, 0)); assertEquals(3, result.size()); @@ -121,8 +117,7 @@ public class CubeReducerTest extends LocalFileMetadataTestCase { reduceDriver.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, "test_kylin_cube_with_slr_ready"); reduceDriver.getConfiguration().setInt(BatchConstants.CFG_CUBE_CUBOID_LEVEL, 1); - CubeDesc cubeDesc = CubeManager.getInstance(getTestConfig()).getCube("test_kylin_cube_with_slr_ready") - .getDescriptor(); + CubeDesc cubeDesc = CubeManager.getInstance(getTestConfig()).getCube("test_kylin_cube_with_slr_ready").getDescriptor(); MeasureDesc measureDesc = cubeDesc.getMeasures().get(0); FunctionDesc functionDesc = measureDesc.getFunction(); Field field = FunctionDesc.class.getDeclaredField("measureType"); @@ -153,10 +148,8 @@ public class CubeReducerTest extends LocalFileMetadataTestCase { List<Pair<Text, Text>> result = reduceDriver.run(); - Pair<Text, Text> p1 = new Pair<Text, Text>(new Text("72010ustech"), - newValueText(codec, "0", "10", "20.34", 3, 600)); - Pair<Text, Text> p2 = new Pair<Text, Text>(new Text("1tech"), - newValueText(codec, "0", "15.09", "20.34", 2, 1500)); + Pair<Text, Text> p1 = new Pair<Text, Text>(new Text("72010ustech"), newValueText(codec, "0", "10", "20.34", 3, 600)); + Pair<Text, Text> p2 = new Pair<Text, Text>(new Text("1tech"), newValueText(codec, "0", "15.09", "20.34", 2, 1500)); Pair<Text, Text> p3 = new Pair<Text, Text>(new Text("0"), newValueText(codec, "0", "146.52", "146.52", 0, 0)); assertEquals(3, result.size()); @@ -166,10 +159,8 @@ public class CubeReducerTest extends LocalFileMetadataTestCase { assertTrue(result.contains(p3)); } - private Text newValueText(BufferedMeasureCodec codec, String sum, String min, String max, int count, - int item_count) { - Object[] values = new Object[] { new BigDecimal(sum), new BigDecimal(min), new BigDecimal(max), new Long(count), - new Long(item_count) }; + private Text newValueText(BufferedMeasureCodec codec, String sum, String min, String max, int count, int item_count) { + Object[] values = new Object[] { new BigDecimal(sum), new BigDecimal(min), new BigDecimal(max), new Long(count), new Long(item_count) }; ByteBuffer buf = codec.encode(values); http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidJobTest.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidJobTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidJobTest.java index 63e09ac..2e2ebf9 100644 --- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidJobTest.java +++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidJobTest.java @@ -80,8 +80,7 @@ public class MergeCuboidJobTest extends LocalFileMetadataTestCase { // CubeManager cubeManager = // CubeManager.getInstanceFromEnv(getTestConfig()); - String[] args = { "-input", baseFolder.getAbsolutePath() + "," + eightFoler.getAbsolutePath(), "-cubename", - cubeName, "-segmentname", "20130331080000_20131212080000", "-output", output, "-jobname", jobname }; + String[] args = { "-input", baseFolder.getAbsolutePath() + "," + eightFoler.getAbsolutePath(), "-cubename", cubeName, "-segmentname", "20130331080000_20131212080000", "-output", output, "-jobname", jobname }; assertEquals("Job failed", 0, ToolRunner.run(conf, new MergeCuboidJob(), args)); } http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java index f73c645..04af4fe 100644 --- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java +++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java @@ -52,7 +52,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings("rawtypes") public class MergeCuboidMapperTest extends LocalFileMetadataTestCase { - + private static final Logger logger = LoggerFactory.getLogger(MergeCuboidMapperTest.class); MapDriver<Text, Text, Text, Text> mapDriver; @@ -75,8 +75,7 @@ public class MergeCuboidMapperTest extends LocalFileMetadataTestCase { List<String> values = new ArrayList<>(); values.add("eee"); values.add("fff"); - Dictionary<String> dict = DictionaryGenerator.buildDictionary(DataType.getType(newDictInfo.getDataType()), - new IterableDictionaryValueEnumerator(values)); + Dictionary<String> dict = DictionaryGenerator.buildDictionary(DataType.getType(newDictInfo.getDataType()), new IterableDictionaryValueEnumerator(values)); dictionaryManager.trySaveNewDict(dict, newDictInfo); dict.dump(System.out); @@ -128,8 +127,7 @@ public class MergeCuboidMapperTest extends LocalFileMetadataTestCase { values.add("ccc"); else values.add("bbb"); - Dictionary<String> dict = DictionaryGenerator.buildDictionary(DataType.getType(newDictInfo.getDataType()), - new IterableDictionaryValueEnumerator(values)); + Dictionary<String> dict = DictionaryGenerator.buildDictionary(DataType.getType(newDictInfo.getDataType()), new IterableDictionaryValueEnumerator(values)); dictionaryManager.trySaveNewDict(dict, newDictInfo); dict.dump(System.out); http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidJobTest.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidJobTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidJobTest.java index e15d463..989ed72 100644 --- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidJobTest.java +++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidJobTest.java @@ -66,8 +66,7 @@ public class NDCuboidJobTest extends LocalFileMetadataTestCase { FileUtil.fullyDelete(new File(output)); - String[] args = { "-input", input, "-cubename", cubeName, "-segmentname", segmentName, "-output", output, - "-jobname", jobname, "-level", level }; + String[] args = { "-input", input, "-cubename", cubeName, "-segmentname", segmentName, "-output", output, "-jobname", jobname, "-level", level }; assertEquals("Job failed", 0, ToolRunner.run(conf, new NDCuboidJob(), args)); } @@ -82,8 +81,7 @@ public class NDCuboidJobTest extends LocalFileMetadataTestCase { FileUtil.fullyDelete(new File(output)); - String[] args = { "-input", input, "-cubename", cubeName, "-segmentname", segmentName, "-output", output, - "-jobname", jobname, "-level", level }; + String[] args = { "-input", input, "-cubename", cubeName, "-segmentname", segmentName, "-output", output, "-jobname", jobname, "-level", level }; assertEquals("Job failed", 0, ToolRunner.run(conf, new NDCuboidJob(), args)); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidMapperTest.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidMapperTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidMapperTest.java index 3ee49f2..c0ce1a4 100644 --- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidMapperTest.java +++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidMapperTest.java @@ -76,10 +76,8 @@ public class NDCuboidMapperTest extends LocalFileMetadataTestCase { mapReduceDriver.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName); mapReduceDriver.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID); - byte[] key = { 0, 0, 0, 0, 0, 0, 0, 0, 1, -1, 0, -104, -106, -128, 11, 54, -105, 55, 9, 9, 9, 9, 9, 9, 9, 9, 9, - 9, 9, 9, 9, 9, 9, 9, 9, 13, 71, 114, 65, 66, 73, 78, 9, 9, 9, 9, 9, 9, 9, 9, 0, 10, 0 }; - byte[] value = { 14, 7, 23, -16, 56, 92, 114, -80, 118, 14, 7, 23, -16, 56, 92, 114, -80, 118, 14, 7, 23, -16, - 56, 92, 114, -80, 118, 1, 1 }; + byte[] key = { 0, 0, 0, 0, 0, 0, 0, 0, 1, -1, 0, -104, -106, -128, 11, 54, -105, 55, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 13, 71, 114, 65, 66, 73, 78, 9, 9, 9, 9, 9, 9, 9, 9, 0, 10, 0 }; + byte[] value = { 14, 7, 23, -16, 56, 92, 114, -80, 118, 14, 7, 23, -16, 56, 92, 114, -80, 118, 14, 7, 23, -16, 56, 92, 114, -80, 118, 1, 1 }; Pair<Text, Text> input1 = new Pair<Text, Text>(new Text(key), new Text(value)); mapReduceDriver.addInput(input1); @@ -88,10 +86,8 @@ public class NDCuboidMapperTest extends LocalFileMetadataTestCase { assertEquals(4, result.size()); - byte[] resultKey = { 0, 0, 0, 0, 0, 0, 0, 0, 1, 127, 0, -104, -106, -128, 55, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, - 9, 9, 9, 9, 9, 9, 13, 71, 114, 65, 66, 73, 78, 9, 9, 9, 9, 9, 9, 9, 9, 0, 10, 0 }; - byte[] resultValue = { 14, 7, 23, -16, 56, 92, 114, -80, 118, 14, 7, 23, -16, 56, 92, 114, -80, 118, 14, 7, 23, - -16, 56, 92, 114, -80, 118, 1, 1 }; + byte[] resultKey = { 0, 0, 0, 0, 0, 0, 0, 0, 1, 127, 0, -104, -106, -128, 55, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 13, 71, 114, 65, 66, 73, 78, 9, 9, 9, 9, 9, 9, 9, 9, 0, 10, 0 }; + byte[] resultValue = { 14, 7, 23, -16, 56, 92, 114, -80, 118, 14, 7, 23, -16, 56, 92, 114, -80, 118, 14, 7, 23, -16, 56, 92, 114, -80, 118, 1, 1 }; Pair<Text, Text> output1 = new Pair<Text, Text>(new Text(resultKey), new Text(resultValue)); //As we will truncate decimal(KYLIN-766), value will no longer equals to resultValue @@ -110,8 +106,7 @@ public class NDCuboidMapperTest extends LocalFileMetadataTestCase { System.out.println(Bytes.toLong(new byte[] { 0, 0, 0, 0, 0, 0, 1, -1 })); for (int i = 0; i < result.size(); i++) { byte[] bytes = new byte[result.get(i).getFirst().getLength()]; - System.arraycopy(result.get(i).getFirst().getBytes(), RowConstants.ROWKEY_SHARDID_LEN, bytes, 0, - result.get(i).getFirst().getLength() - RowConstants.ROWKEY_SHARDID_LEN); + System.arraycopy(result.get(i).getFirst().getBytes(), RowConstants.ROWKEY_SHARDID_LEN, bytes, 0, result.get(i).getFirst().getLength() - RowConstants.ROWKEY_SHARDID_LEN); System.out.println(Bytes.toLong(bytes)); keySet[i] = Bytes.toLong(bytes); }