KYLIN-2242 write multiple files in FactDistinctColumnsReducer with 
MultipleOutputs


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/7de8aa12
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/7de8aa12
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/7de8aa12

Branch: refs/heads/master-cdh5.7
Commit: 7de8aa1203a72bad105ed692f7100535939b03af
Parents: c2229c9
Author: kangkaisen <kangkai...@live.com>
Authored: Sat Dec 17 14:12:48 2016 +0800
Committer: kangkaisen <kangkai...@163.com>
Committed: Sat Jan 21 23:19:50 2017 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/util/HadoopUtil.java    |  16 ++
 .../kylin/engine/mr/JobBuilderSupport.java      |   2 +-
 .../kylin/engine/mr/common/BatchConstants.java  |   9 +-
 .../engine/mr/steps/CreateDictionaryJob.java    |  43 ++--
 .../engine/mr/steps/FactDistinctColumnsJob.java |  32 ++-
 .../mr/steps/FactDistinctColumnsReducer.java    | 240 +++++++------------
 .../engine/mr/steps/SaveStatisticsStep.java     |  10 +-
 .../mr/steps/UpdateCubeInfoAfterBuildStep.java  |  10 +-
 8 files changed, 175 insertions(+), 187 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/7de8aa12/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
----------------------------------------------------------------------
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java 
b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
index bdc4c3e..b9ffe38 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
@@ -26,8 +26,10 @@ import java.net.URISyntaxException;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.Writable;
 import org.apache.kylin.common.KylinConfig;
 import org.slf4j.Logger;
@@ -140,4 +142,18 @@ public class HadoopUtil {
         }
     }
 
+    public static Path getFilterOnlyPath(FileSystem fs, Path baseDir, final 
String filter) throws IOException {
+        FileStatus[] fileStatus = fs.listStatus(baseDir, new PathFilter() {
+            @Override
+            public boolean accept(Path path) {
+                return path.getName().startsWith(filter);
+            }
+        });
+
+        if (fileStatus.length == 1) {
+            return fileStatus[0].getPath();
+        } else {
+            return null;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/7de8aa12/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index 696b22a..c34a904 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -171,7 +171,7 @@ public class JobBuilderSupport {
     }
 
     public String getStatisticsPath(String jobId) {
-        return getRealizationRootPath(jobId) + "/statistics";
+        return getRealizationRootPath(jobId) + "/fact_distinct_columns/" + 
BatchConstants.CFG_OUTPUT_STATISTICS;
     }
 
     // 
============================================================================

http://git-wip-us.apache.org/repos/asf/kylin/blob/7de8aa12/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
index 0281539..602b4bb 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@ -53,9 +53,16 @@ public interface BatchConstants {
     String CFG_STATISTICS_ENABLED = "statistics.enabled";
     String CFG_STATISTICS_OUTPUT = "statistics.ouput";//spell error, for 
compatibility issue better not change it
     String CFG_STATISTICS_SAMPLING_PERCENT = "statistics.sampling.percent";
-    String CFG_STATISTICS_CUBE_ESTIMATION_FILENAME = "cube_statistics.txt";
     String CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME = "cuboid_statistics.seq";
 
+    String CFG_MAPRED_OUTPUT_COMPRESS = "mapred.output.compress";
+
+    String CFG_OUTPUT_COLUMN = "column";
+    String CFG_OUTPUT_DICT = "dict";
+    String CFG_OUTPUT_STATISTICS = "statistics";
+    String CFG_OUTPUT_PARTITION = "partition";
+
+
     /**
      * command line ARGuments
      */

http://git-wip-us.apache.org/repos/asf/kylin/blob/7de8aa12/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
index 95d8cb1..e5d053b 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
@@ -18,15 +18,20 @@
 
 package org.apache.kylin.engine.mr.steps;
 
+import java.io.DataInputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import org.apache.commons.cli.Options;
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.ByteBufferBackedInputStream;
 import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.HadoopUtil;
@@ -63,21 +68,27 @@ public class CreateDictionaryJob extends AbstractHadoopJob {
 
             @Override
             public Dictionary<String> getDictionary(TblColRef col) throws 
IOException {
-                Path dictFile = new Path(factColumnsInputPath, 
col.getIdentity() + FactDistinctColumnsReducer.DICT_FILE_POSTFIX);
-                FileSystem fs = HadoopUtil.getWorkingFileSystem();
-                if (fs.exists(dictFile) == false)
+                Path colDir = new Path(factColumnsInputPath, col.getName());
+                FileSystem fs = HadoopUtil.getFileSystem(colDir.toString());
+
+                Path dictFile = HadoopUtil.getFilterOnlyPath(fs, colDir, 
col.getName() + FactDistinctColumnsReducer.DICT_FILE_POSTFIX);
+                if (dictFile == null) {
                     return null;
-                
-                FSDataInputStream is = null;
-                try {
-                    is = fs.open(dictFile);
-                    String dictClassName = is.readUTF();
-                    Dictionary<String> dict = (Dictionary<String>) 
ClassUtil.newInstance(dictClassName);
-                    dict.readFields(is);
-                    logger.info("DictionaryProvider read dict from file: " + 
dictFile);
-                    return dict;
-                } finally {
-                    IOUtils.closeQuietly(is);
+                }
+
+                try (SequenceFile.Reader reader = new 
SequenceFile.Reader(HadoopUtil.getCurrentConfiguration(), 
SequenceFile.Reader.file(dictFile))) {
+                    NullWritable key = NullWritable.get();
+                    BytesWritable value = new BytesWritable();
+                    reader.next(key, value);
+
+                    ByteBuffer buffer = new 
ByteArray(value.getBytes()).asBuffer();
+                    try (DataInputStream is = new DataInputStream(new 
ByteBufferBackedInputStream(buffer))) {
+                        String dictClassName = is.readUTF();
+                        Dictionary<String> dict = (Dictionary<String>) 
ClassUtil.newInstance(dictClassName);
+                        dict.readFields(is);
+                        logger.info("DictionaryProvider read dict from file: " 
+ dictFile);
+                        return dict;
+                    }
                 }
             }
         });

http://git-wip-us.apache.org/repos/asf/kylin/blob/7de8aa12/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
index ce01eb6..aded600 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
@@ -23,11 +23,16 @@ import java.util.List;
 
 import org.apache.commons.cli.Options;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
@@ -82,8 +87,8 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob 
{
             int uhcReducerCount = cube.getConfig().getUHCReducerCount();
 
             int[] uhcIndex = cubeMgr.getUHCIndex(cube.getDescriptor());
-            for(int index : uhcIndex) {
-                if(index == 1) {
+            for (int index : uhcIndex) {
+                if (index == 1) {
                     reducerCount += uhcReducerCount - 1;
                 }
             }
@@ -92,7 +97,6 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob 
{
                 throw new IllegalArgumentException("The max reducer number for 
FactDistinctColumnsJob is 255, but now it is " + reducerCount + ", decrease 
'kylin.engine.mr.uhc-reducer-count'");
             }
 
-
             job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
             job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, 
segmentID);
             job.getConfiguration().set(BatchConstants.CFG_STATISTICS_ENABLED, 
statistics_enabled);
@@ -117,6 +121,12 @@ public class FactDistinctColumnsJob extends 
AbstractHadoopJob {
 
             attachCubeMetadata(cube, job.getConfiguration());
 
+            /**
+             * don't compress the reducer output so that {@link 
CreateDictionaryJob} and {@link UpdateCubeInfoAfterBuildStep}
+             * could read the reducer file directly
+             */
+            
job.getConfiguration().set(BatchConstants.CFG_MAPRED_OUTPUT_COMPRESS, "false");
+
             return waitForCompletion(job);
 
         } finally {
@@ -138,18 +148,22 @@ public class FactDistinctColumnsJob extends 
AbstractHadoopJob {
 
     private void setupReducer(Path output, int numberOfReducers) throws 
IOException {
         job.setReducerClass(FactDistinctColumnsReducer.class);
-        job.setOutputFormatClass(SequenceFileOutputFormat.class);
-        job.setOutputKeyClass(NullWritable.class);
-        job.setOutputValueClass(Text.class);
         job.setPartitionerClass(FactDistinctColumnPartitioner.class);
         job.setNumReduceTasks(numberOfReducers);
 
-        // important, reducer writes HDFS directly at the moment
-        job.setReduceSpeculativeExecution(false);
-        
+        //make each reducer output to respective dir
+        MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_COLUMN, 
SequenceFileOutputFormat.class, NullWritable.class, Text.class);
+        MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_DICT, 
SequenceFileOutputFormat.class, NullWritable.class, BytesWritable.class);
+        MultipleOutputs.addNamedOutput(job, 
BatchConstants.CFG_OUTPUT_STATISTICS, SequenceFileOutputFormat.class, 
LongWritable.class, BytesWritable.class);
+        MultipleOutputs.addNamedOutput(job, 
BatchConstants.CFG_OUTPUT_PARTITION, TextOutputFormat.class, 
NullWritable.class, LongWritable.class);
+
+
         FileOutputFormat.setOutputPath(job, output);
         job.getConfiguration().set(BatchConstants.CFG_OUTPUT_PATH, 
output.toString());
 
+        //prevent to create zero-sized default output
+        LazyOutputFormat.setOutputFormatClass(job, 
SequenceFileOutputFormat.class);
+
         deletePath(job.getConfiguration(), output);
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/7de8aa12/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
index 711d991..5d2fb72 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
@@ -18,27 +18,25 @@
 
 package org.apache.kylin.engine.mr.steps;
 
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.common.util.Dictionary;
-import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.model.CubeDesc;
@@ -47,7 +45,7 @@ import org.apache.kylin.dict.IDictionaryBuilder;
 import org.apache.kylin.engine.mr.KylinReducer;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.engine.mr.common.CubeStatsWriter;
+import org.apache.kylin.measure.BufferedMeasureCodec;
 import org.apache.kylin.measure.hllc.HLLCounter;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
@@ -63,14 +61,12 @@ public class FactDistinctColumnsReducer extends 
KylinReducer<SelfDefineSortableK
     private static final Logger logger = 
LoggerFactory.getLogger(FactDistinctColumnsReducer.class);
 
     private List<TblColRef> columnList;
-    private String statisticsOutput = null;
     private List<Long> baseCuboidRowCountInMappers;
     protected Map<Long, HLLCounter> cuboidHLLMap = null;
     protected long baseCuboidId;
     protected CubeDesc cubeDesc;
     private long totalRowsBeforeMerge = 0;
     private int samplingPercentage;
-    private List<ByteArray> colValues;
     private TblColRef col = null;
     private boolean isStatistics = false;
     private KylinConfig cubeConfig;
@@ -88,10 +84,14 @@ public class FactDistinctColumnsReducer extends 
KylinReducer<SelfDefineSortableK
     public static final String DICT_FILE_POSTFIX = ".rldict";
     public static final String PARTITION_COL_INFO_FILE_POSTFIX = ".pci";
 
+    private MultipleOutputs mos;
+
     @Override
     protected void setup(Context context) throws IOException {
         super.bindCurrentConfiguration(context.getConfiguration());
         Configuration conf = context.getConfiguration();
+        mos = new MultipleOutputs(context);
+
         KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
         String cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
         CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
@@ -109,26 +109,20 @@ public class FactDistinctColumnsReducer extends 
KylinReducer<SelfDefineSortableK
         if (collectStatistics && (taskId == numberOfTasks - 1)) {
             // hll
             isStatistics = true;
-            statisticsOutput = conf.get(BatchConstants.CFG_STATISTICS_OUTPUT);
             baseCuboidRowCountInMappers = Lists.newArrayList();
             cuboidHLLMap = Maps.newHashMap();
             samplingPercentage = 
Integer.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT));
             logger.info("Reducer " + taskId + " handling stats");
         } else if (collectStatistics && (taskId == numberOfTasks - 2)) {
             // partition col
-            isStatistics = false;
             isPartitionCol = true;
             col = 
cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef();
             if (col == null) {
                 logger.info("Do not have partition col. This reducer will keep 
empty");
             }
-            colValues = Lists.newLinkedList();
-            logger.info("Reducer " + taskId + " handling partition column " + 
col);
         } else {
             // normal col
-            isStatistics = false;
             col = columnList.get(reducerIdToColumnIndex.get(taskId));
-            colValues = Lists.newLinkedList();
 
             // local build dict
             isReducerLocalBuildDict = config.isReducerLocalBuildDict();
@@ -194,15 +188,13 @@ public class FactDistinctColumnsReducer extends 
KylinReducer<SelfDefineSortableK
                 logAFewRows(value);
                 builder.addValue(value);
             } else {
-                colValues.add(new ByteArray(Bytes.copy(key.getBytes(), 1, 
key.getLength() - 1)));
-                if (colValues.size() == 1000000) { //spill every 1 million
-                    logger.info("spill values to disk...");
-                    outputDistinctValues(col, colValues, context);
-                    colValues.clear();
-                }
+                byte[] keyBytes = Bytes.copy(key.getBytes(), 1, 
key.getLength() - 1);
+                // output written to baseDir/colName/-r-00000 (etc)
+                String fileName = col.getName() + "/";
+                mos.write(BatchConstants.CFG_OUTPUT_COLUMN, 
NullWritable.get(), new Text(keyBytes), fileName);
             }
         }
-        
+
         rowCount++;
     }
 
@@ -212,162 +204,104 @@ public class FactDistinctColumnsReducer extends 
KylinReducer<SelfDefineSortableK
         }
     }
 
-    private void outputDistinctValues(TblColRef col, Collection<ByteArray> 
values, Context context) throws IOException {
-        final Configuration conf = context.getConfiguration();
-        final FileSystem fs = HadoopUtil.getWorkingFileSystem(conf);
-        final String outputPath = conf.get(BatchConstants.CFG_OUTPUT_PATH);
-        final Path colDir = new Path(outputPath, col.getIdentity());
-        final String fileName = col.getIdentity() + "-" + taskId % 
uhcReducerCount;
-        final Path outputFile = new Path(colDir, fileName);
-
-        FSDataOutputStream out = null;
-        try {
-            if (!fs.exists(colDir)) {
-                fs.mkdirs(colDir);
-            }
-
-            if (fs.exists(outputFile)) {
-                out = fs.append(outputFile);
-                logger.info("append file " + outputFile);
-            } else {
-                out = fs.create(outputFile);
-                logger.info("create file " + outputFile);
-            }
-
-            for (ByteArray value : values) {
-                out.write(value.array(), value.offset(), value.length());
-                out.write('\n');
-            }
-        } finally {
-            IOUtils.closeQuietly(out);
-        }
-    }
-
-    private void outputDict(TblColRef col, Dictionary<String> dict, Context 
context) throws IOException {
-        final String fileName = col.getIdentity() + DICT_FILE_POSTFIX;
-        FSDataOutputStream out = getOutputStream(context, fileName);
-        try {
-            String dictClassName = dict.getClass().getName();
-            out.writeUTF(dictClassName);
-            dict.write(out);
-            logger.info("reducer id is:+" + taskId + " colName:" + 
col.getName() + "  writing dict at file : " + fileName + "  dict class:" + 
dictClassName);
-        } finally {
-            IOUtils.closeQuietly(out);
-        }
-    }
-
-    private void outputPartitionInfo(Context context) throws IOException {
-        final String fileName = col.getIdentity() + 
PARTITION_COL_INFO_FILE_POSTFIX;
-        FSDataOutputStream out = getOutputStream(context, fileName);
-        try {
-            out.writeLong(timeMinValue);
-            out.writeLong(timeMaxValue);
-            logger.info("write partition info for col : " + col.getName() + "  
minValue:" + timeMinValue + " maxValue:" + timeMaxValue);
-        } finally {
-            IOUtils.closeQuietly(out);
-        }
-    }
-
-    private FSDataOutputStream getOutputStream(Context context, String 
outputFileName) throws IOException {
-        final Configuration conf = context.getConfiguration();
-        final FileSystem fs = HadoopUtil.getWorkingFileSystem(conf);
-        final Path outputPath = new 
Path(conf.get(BatchConstants.CFG_OUTPUT_PATH));
-        final Path outputFile = new Path(outputPath, outputFileName);
-        if (!fs.exists(outputPath)) {
-            fs.mkdirs(outputPath);
-        }
-        FSDataOutputStream out = fs.create(outputFile);
-        return out;
-    }
-
     @Override
     protected void doCleanup(Context context) throws IOException, 
InterruptedException {
         if (isStatistics) {
-            // output the hll info
-            long grandTotal = 0;
-            for (HLLCounter hll : cuboidHLLMap.values()) {
-                grandTotal += hll.getCountEstimate();
-            }
-            double mapperOverlapRatio = grandTotal == 0 ? 0 : (double) 
totalRowsBeforeMerge / grandTotal;
-
-            int mapperNumber = baseCuboidRowCountInMappers.size();
+            //output the hll info;
+            List<Long> allCuboids = Lists.newArrayList();
+            allCuboids.addAll(cuboidHLLMap.keySet());
+            Collections.sort(allCuboids);
 
-            writeMapperAndCuboidStatistics(context); // for human check
-            CubeStatsWriter.writeCuboidStatistics(context.getConfiguration(), 
new Path(statisticsOutput), //
-                    cuboidHLLMap, samplingPercentage, mapperNumber, 
mapperOverlapRatio);
+            logMapperAndCuboidStatistics(allCuboids); // for human check
+            outputStatistics(allCuboids);
         } else if (isPartitionCol) {
             // partition col
-            if (col != null) {
-                outputPartitionInfo(context);
-            }
+            outputPartitionInfo();
         } else {
             // normal col
             if (isReducerLocalBuildDict) {
                 Dictionary<String> dict = builder.build();
-                outputDict(col, dict, context);
-            } else {
-                if (colValues.size() > 0) {
-                    outputDistinctValues(col, colValues, context);
-                    colValues.clear();
-                }
+                outputDict(col, dict);
             }
         }
+
+        mos.close();
     }
 
-    private void writeMapperAndCuboidStatistics(Context context) throws 
IOException {
-        Configuration conf = context.getConfiguration();
-        FileSystem fs = HadoopUtil.getWorkingFileSystem(conf);
-        Path path = new Path(statisticsOutput, 
BatchConstants.CFG_STATISTICS_CUBE_ESTIMATION_FILENAME);
-        FSDataOutputStream out = fs.create(path);
+    private void outputPartitionInfo() throws IOException, 
InterruptedException {
+        if (col != null) {
+            // output written to baseDir/colName/colName.pci-r-00000 (etc)
+            String partitionFileName = col.getName() + "/" + col.getName() + 
PARTITION_COL_INFO_FILE_POSTFIX;
 
-        try {
-            String msg;
+            mos.write(BatchConstants.CFG_OUTPUT_PARTITION, NullWritable.get(), 
new LongWritable(timeMinValue), partitionFileName);
+            mos.write(BatchConstants.CFG_OUTPUT_PARTITION, NullWritable.get(), 
new LongWritable(timeMaxValue), partitionFileName);
+            logger.info("write partition info for col : " + col.getName() + "  
minValue:" + timeMinValue + " maxValue:" + timeMaxValue);
+        }
+    }
 
-            List<Long> allCuboids = Lists.newArrayList();
-            allCuboids.addAll(cuboidHLLMap.keySet());
-            Collections.sort(allCuboids);
+    private void outputDict(TblColRef col, Dictionary<String> dict) throws 
IOException, InterruptedException {
+        // output written to baseDir/colName/colName.rldict-r-00000 (etc)
+        String dictFileName = col.getName() + "/" + col.getName() + 
DICT_FILE_POSTFIX;
 
-            msg = "Total cuboid number: \t" + allCuboids.size();
-            writeLine(out, msg);
-            msg = "Samping percentage: \t" + samplingPercentage;
-            writeLine(out, msg);
-
-            writeLine(out, "The following statistics are collected based on 
sampling data.");
-            writeLine(out, "Number of Mappers: " + 
baseCuboidRowCountInMappers.size());
-            for (int i = 0; i < baseCuboidRowCountInMappers.size(); i++) {
-                if (baseCuboidRowCountInMappers.get(i) > 0) {
-                    msg = "Base Cuboid in Mapper " + i + " row count: \t " + 
baseCuboidRowCountInMappers.get(i);
-                    writeLine(out, msg);
-                }
-            }
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); 
DataOutputStream outputStream = new DataOutputStream(baos);) {
+            outputStream.writeUTF(dict.getClass().getName());
+            dict.write(outputStream);
 
-            long grantTotal = 0;
-            for (long i : allCuboids) {
-                grantTotal += cuboidHLLMap.get(i).getCountEstimate();
-                msg = "Cuboid " + i + " row count is: \t " + 
cuboidHLLMap.get(i).getCountEstimate();
-                writeLine(out, msg);
-            }
+            mos.write(BatchConstants.CFG_OUTPUT_DICT, NullWritable.get(), new 
BytesWritable(baos.toByteArray()), dictFileName);
+        }
+    }
 
-            msg = "Sum of all the cube segments (before merge) is: \t " + 
totalRowsBeforeMerge;
-            writeLine(out, msg);
+    private void outputStatistics(List<Long> allCuboids) throws IOException, 
InterruptedException {
+        // output written to baseDir/statistics/statistics-r-00000 (etc)
+        String statisticsFileName = BatchConstants.CFG_OUTPUT_STATISTICS + "/" 
+ BatchConstants.CFG_OUTPUT_STATISTICS;
 
-            msg = "After merge, the cube has row count: \t " + grantTotal;
-            writeLine(out, msg);
+        ByteBuffer valueBuf = 
ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
 
-            if (grantTotal > 0) {
-                msg = "The mapper overlap ratio is: \t" + totalRowsBeforeMerge 
/ grantTotal;
-                writeLine(out, msg);
-            }
+        // mapper overlap ratio at key -1
+        long grandTotal = 0;
+        for (HLLCounter hll : cuboidHLLMap.values()) {
+            grandTotal += hll.getCountEstimate();
+        }
+        double mapperOverlapRatio = grandTotal == 0 ? 0 : (double) 
totalRowsBeforeMerge / grandTotal;
+        mos.write(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(-1), 
new BytesWritable(Bytes.toBytes(mapperOverlapRatio)), statisticsFileName);
+
+        // mapper number at key -2
+        mos.write(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(-2), 
new BytesWritable(Bytes.toBytes(baseCuboidRowCountInMappers.size())), 
statisticsFileName);
+
+        // sampling percentage at key 0
+        mos.write(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(0L), 
new BytesWritable(Bytes.toBytes(samplingPercentage)), statisticsFileName);
 
-        } finally {
-            IOUtils.closeQuietly(out);
+        for (long i : allCuboids) {
+            valueBuf.clear();
+            cuboidHLLMap.get(i).writeRegisters(valueBuf);
+            valueBuf.flip();
+            mos.write(BatchConstants.CFG_OUTPUT_STATISTICS, new 
LongWritable(i), new BytesWritable(valueBuf.array(), valueBuf.limit()), 
statisticsFileName);
         }
     }
 
-    private void writeLine(FSDataOutputStream out, String msg) throws 
IOException {
-        out.write(msg.getBytes());
-        out.write('\n');
+    private void logMapperAndCuboidStatistics(List<Long> allCuboids) throws 
IOException {
+        logger.info("Total cuboid number: \t" + allCuboids.size());
+        logger.info("Samping percentage: \t" + samplingPercentage);
+        logger.info("The following statistics are collected based on sampling 
data.");
+        logger.info("Number of Mappers: " + 
baseCuboidRowCountInMappers.size());
 
+        for (int i = 0; i < baseCuboidRowCountInMappers.size(); i++) {
+            if (baseCuboidRowCountInMappers.get(i) > 0) {
+                logger.info("Base Cuboid in Mapper " + i + " row count: \t " + 
baseCuboidRowCountInMappers.get(i));
+            }
+        }
+
+        long grantTotal = 0;
+        for (long i : allCuboids) {
+            grantTotal += cuboidHLLMap.get(i).getCountEstimate();
+            logger.info("Cuboid " + i + " row count is: \t " + 
cuboidHLLMap.get(i).getCountEstimate());
+        }
+
+        logger.info("Sum of all the cube segments (before merge) is: \t " + 
totalRowsBeforeMerge);
+        logger.info("After merge, the cube has row count: \t " + grantTotal);
+        if (grantTotal > 0) {
+            logger.info("The mapper overlap ratio is: \t" + 
totalRowsBeforeMerge / grantTotal);
+        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/7de8aa12/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
index 2671042..28f99fb 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
@@ -60,9 +60,11 @@ public class SaveStatisticsStep extends AbstractExecutable {
         ResourceStore rs = ResourceStore.getStore(kylinConf);
         try {
             FileSystem fs = HadoopUtil.getWorkingFileSystem();
-            Path statisticsFilePath = new 
Path(CubingExecutableUtil.getStatisticsPath(this.getParams()), 
BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
-            if (!fs.exists(statisticsFilePath))
-                throw new IOException("File " + statisticsFilePath + " does 
not exists");
+            Path statisticsDir = new 
Path(CubingExecutableUtil.getStatisticsPath(this.getParams()));
+            Path statisticsFilePath = HadoopUtil.getFilterOnlyPath(fs, 
statisticsDir, BatchConstants.CFG_OUTPUT_STATISTICS);
+            if (statisticsFilePath == null) {
+                throw new IOException("fail to find the statistics file in 
base dir: " + statisticsDir);
+            }
 
             FSDataInputStream is = fs.open(statisticsFilePath);
             try {
@@ -110,7 +112,7 @@ public class SaveStatisticsStep extends AbstractExecutable {
                 double overlapThreshold = 
kylinConf.getCubeAlgorithmAutoThreshold();
                 logger.info("mapperNumber for " + seg + " is " + mapperNumber 
+ " and threshold is " + mapperNumLimit);
                 logger.info("mapperOverlapRatio for " + seg + " is " + 
mapperOverlapRatio + " and threshold is " + overlapThreshold);
- 
+
                 // in-mem cubing is good when
                 // 1) the cluster has enough mapper slots to run in parallel
                 // 2) the mapper overlap ratio is small, meaning the shuffle 
of in-mem MR has advantage

http://git-wip-us.apache.org/repos/asf/kylin/blob/7de8aa12/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
index dc80399..81d5c42 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
@@ -80,8 +80,13 @@ public class UpdateCubeInfoAfterBuildStep extends 
AbstractExecutable {
     private void updateTimeRange(CubeSegment segment) throws IOException {
         final TblColRef partitionCol = 
segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateColumnRef();
         final String factColumnsInputPath = 
this.getParams().get(BatchConstants.CFG_OUTPUT_PATH);
-        Path outputFile = new Path(factColumnsInputPath, 
partitionCol.getIdentity() + 
FactDistinctColumnsReducer.PARTITION_COL_INFO_FILE_POSTFIX);
-        FileSystem fs = HadoopUtil.getFileSystem(outputFile.toString());
+        Path colDir = new Path(factColumnsInputPath, partitionCol.getName());
+        FileSystem fs = HadoopUtil.getFileSystem(colDir.toString());
+        Path outputFile = HadoopUtil.getFilterOnlyPath(fs, colDir, 
partitionCol.getName() + 
FactDistinctColumnsReducer.PARTITION_COL_INFO_FILE_POSTFIX);
+        if (outputFile == null) {
+            throw new IOException("fail to find the partition file in base 
dir: " + colDir);
+        }
+
         FSDataInputStream is = null;
         long minValue = Long.MAX_VALUE, maxValue = Long.MIN_VALUE;
         try {
@@ -97,5 +102,4 @@ public class UpdateCubeInfoAfterBuildStep extends 
AbstractExecutable {
         segment.setDateRangeStart(minValue);
         segment.setDateRangeEnd(maxValue);
     }
-
 }

Reply via email to