Repository: kylin
Updated Branches:
  refs/heads/master af3e4fece -> 827205f17


KYLIN-2202 code review


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/827205f1
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/827205f1
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/827205f1

Branch: refs/heads/master
Commit: 827205f17b61d1e1ac9d6af580e04583c1079003
Parents: af3e4fe
Author: Li Yang <liy...@apache.org>
Authored: Thu Nov 24 14:52:33 2016 +0800
Committer: Li Yang <liy...@apache.org>
Committed: Thu Nov 24 15:18:19 2016 +0800

----------------------------------------------------------------------
 .../validation/rule/DictionaryRuleTest.java     |   1 -
 .../apache/kylin/dict/DictionaryGenerator.java  |   7 +-
 .../apache/kylin/dict/DictionaryManager.java    |   4 -
 .../dict/NumberDictionaryForestBuilder.java     |   4 -
 .../kylin/dict/TrieDictionaryForestBuilder.java |   3 +-
 .../apache/kylin/engine/mr/DFSFileTable.java    |  35 ---
 .../engine/mr/DFSSingleFileTableReader.java     | 218 -------------------
 .../apache/kylin/engine/mr/SortedColumn.java    | 100 ---------
 .../kylin/engine/mr/SortedColumnDFSFile.java    | 125 +++++++++++
 .../engine/mr/SortedColumnDFSFileReader.java    | 136 ++++++++++++
 .../kylin/engine/mr/SortedColumnReader.java     | 136 ------------
 .../engine/mr/steps/CreateDictionaryJob.java    |   6 +-
 .../mr/steps/FactDistinctColumnsReducer.java    |   3 -
 .../mr/steps/FactDistinctHiveColumnsMapper.java |  12 +-
 .../engine/mr/steps/SelfDefineSortableKey.java  |  25 ++-
 .../apache/kylin/engine/mr/steps/TypeFlag.java  |  28 ---
 .../mr/steps/UpdateCubeInfoAfterBuildStep.java  |   4 +-
 .../kylin/engine/mr/SortedColumnReaderTest.java | 167 +++++++-------
 .../mr/steps/NumberDictionaryForestTest.java    |   1 +
 .../mr/steps/SelfDefineSortableKeyTest.java     |  93 ++++----
 .../kylin/source/hive/HiveTableReader.java      |   3 -
 21 files changed, 412 insertions(+), 699 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/827205f1/core-cube/src/test/java/org/apache/kylin/cube/model/validation/rule/DictionaryRuleTest.java
----------------------------------------------------------------------
diff --git 
a/core-cube/src/test/java/org/apache/kylin/cube/model/validation/rule/DictionaryRuleTest.java
 
b/core-cube/src/test/java/org/apache/kylin/cube/model/validation/rule/DictionaryRuleTest.java
index 8bd4c88..9b37507 100644
--- 
a/core-cube/src/test/java/org/apache/kylin/cube/model/validation/rule/DictionaryRuleTest.java
+++ 
b/core-cube/src/test/java/org/apache/kylin/cube/model/validation/rule/DictionaryRuleTest.java
@@ -34,7 +34,6 @@ import org.apache.kylin.cube.model.validation.ValidateContext;
 import org.apache.kylin.dict.GlobalDictionaryBuilder;
 import org.junit.After;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 
 /**

http://git-wip-us.apache.org/repos/asf/kylin/blob/827205f1/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
----------------------------------------------------------------------
diff --git 
a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java 
b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
index 20a57ba..810a392 100644
--- 
a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
+++ 
b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
@@ -135,8 +135,7 @@ public class DictionaryGenerator {
     private static class StringDictBuilder implements IDictionaryBuilder {
         @Override
         public Dictionary<String> build(DictionaryInfo dictInfo, 
IDictionaryValueEnumerator valueEnumerator, int baseId, int nSamples, 
ArrayList<String> returnSamples) throws IOException {
-            int maxTrieSizeInMB = 
TrieDictionaryForestBuilder.getMaxTrieSizeInMB();
-            TrieDictionaryForestBuilder builder = new 
TrieDictionaryForestBuilder(new StringBytesConverter(), baseId, 
maxTrieSizeInMB);
+            TrieDictionaryForestBuilder builder = new 
TrieDictionaryForestBuilder(new StringBytesConverter(), baseId);
             String value;
             while (valueEnumerator.moveNext()) {
                 value = valueEnumerator.current();
@@ -153,9 +152,7 @@ public class DictionaryGenerator {
     private static class NumberDictBuilder implements IDictionaryBuilder {
         @Override
         public Dictionary<String> build(DictionaryInfo dictInfo, 
IDictionaryValueEnumerator valueEnumerator, int baseId, int nSamples, 
ArrayList<String> returnSamples) throws IOException {
-
-            int maxTrieSizeInMB = 
TrieDictionaryForestBuilder.getMaxTrieSizeInMB();
-            NumberDictionaryForestBuilder builder = new 
NumberDictionaryForestBuilder(baseId, maxTrieSizeInMB);
+            NumberDictionaryForestBuilder builder = new 
NumberDictionaryForestBuilder(baseId);
             String value;
             while (valueEnumerator.moveNext()) {
                 value = valueEnumerator.current();

http://git-wip-us.apache.org/repos/asf/kylin/blob/827205f1/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
----------------------------------------------------------------------
diff --git 
a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java 
b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
index 0608e6f..c33cd28 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
@@ -442,10 +442,6 @@ public class DictionaryManager {
 
         logger.info("DictionaryManager(" + System.identityHashCode(this) + ") 
loading DictionaryInfo(loadDictObj:" + loadDictObj + ") at " + resourcePath);
         DictionaryInfo info = store.getResource(resourcePath, 
DictionaryInfo.class, loadDictObj ? DictionaryInfoSerializer.FULL_SERIALIZER : 
DictionaryInfoSerializer.INFO_SERIALIZER);
-        //info.dictionaryObject.dump(System.out);
-        //        if (loadDictObj)
-        //            logger.debug("Loaded dictionary at " + resourcePath);
-
         return info;
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/827205f1/core-dictionary/src/main/java/org/apache/kylin/dict/NumberDictionaryForestBuilder.java
----------------------------------------------------------------------
diff --git 
a/core-dictionary/src/main/java/org/apache/kylin/dict/NumberDictionaryForestBuilder.java
 
b/core-dictionary/src/main/java/org/apache/kylin/dict/NumberDictionaryForestBuilder.java
index a68b18e..5502a74 100644
--- 
a/core-dictionary/src/main/java/org/apache/kylin/dict/NumberDictionaryForestBuilder.java
+++ 
b/core-dictionary/src/main/java/org/apache/kylin/dict/NumberDictionaryForestBuilder.java
@@ -20,16 +20,12 @@ package org.apache.kylin.dict;
 
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.dict.NumberDictionary.NumberBytesCodec;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Created by xiefan on 16-11-2.
  */
 public class NumberDictionaryForestBuilder extends 
TrieDictionaryForestBuilder<String> {
 
-    private static final Logger logger = 
LoggerFactory.getLogger(NumberDictionaryForestBuilder.class);
-
     public static class Number2BytesConverter implements 
BytesConverter<String> {
 
         static final int MAX_DIGITS_BEFORE_DECIMAL_POINT = 
NumberDictionary.MAX_DIGITS_BEFORE_DECIMAL_POINT;

http://git-wip-us.apache.org/repos/asf/kylin/blob/827205f1/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryForestBuilder.java
----------------------------------------------------------------------
diff --git 
a/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryForestBuilder.java
 
b/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryForestBuilder.java
index 6aeb2ec..4ee30f0 100755
--- 
a/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryForestBuilder.java
+++ 
b/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryForestBuilder.java
@@ -78,7 +78,6 @@ public class TrieDictionaryForestBuilder<T> {
         addValue(valueBytes);
     }
 
-
     private void addValue(byte[] valueBytes) {
         ByteArray valueByteArray = new ByteArray(valueBytes);
         if (previousValue != null && isOrdered) {
@@ -97,7 +96,7 @@ public class TrieDictionaryForestBuilder<T> {
         previousValue = valueByteArray;
         trieBuilder.addValue(valueBytes);
         curTreeSize += valueBytes.length;
-        
+
         if (curTreeSize >= maxTrieTreeSize && isOrdered) {
             TrieDictionary<T> tree = trieBuilder.build(0);
             addTree(tree);

http://git-wip-us.apache.org/repos/asf/kylin/blob/827205f1/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTable.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTable.java 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTable.java
index 074b271..8c1f6bd 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTable.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTable.java
@@ -18,12 +18,10 @@
 
 package org.apache.kylin.engine.mr;
 
-import java.io.EOFException;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -57,7 +55,6 @@ public class DFSFileTable implements ReadableTable {
     }
 
     @Override
-    @Deprecated
     public TableReader getReader() throws IOException {
         return new DFSFileTableReader(path, delim, nColumns);
     }
@@ -72,29 +69,6 @@ public class DFSFileTable implements ReadableTable {
         }
     }
 
-    public Collection<TableReader> getReaders() {
-        ArrayList<TableReader> readers = new ArrayList<>();
-        try {
-            String filePath = HadoopUtil.fixWindowsPath(path);
-            FileSystem fs = HadoopUtil.getFileSystem(filePath);
-            ArrayList<FileStatus> allFiles = new ArrayList<>();
-            FileStatus status = fs.getFileStatus(new Path(filePath));
-            if (status.isFile()) {
-                allFiles.add(status);
-            } else {
-                FileStatus[] listStatus = fs.listStatus(new Path(filePath));
-                allFiles.addAll(Arrays.asList(listStatus));
-            }
-            for (FileStatus f : allFiles) {
-                DFSSingleFileTableReader reader = new 
DFSSingleFileTableReader(f.getPath().toString(), delim, nColumns);
-                readers.add(reader);
-            }
-        }catch (IOException e){
-            e.printStackTrace();
-        }
-        return readers;
-    }
-
     @Override
     public String toString() {
         return path;
@@ -123,13 +97,4 @@ public class DFSFileTable implements ReadableTable {
         return Pair.newPair(size, lastModified);
     }
 
-    private boolean isExceptionSayingNotSeqFile(IOException e) {
-        if (e.getMessage() != null && e.getMessage().contains("not a 
SequenceFile"))
-            return true;
-
-        if (e instanceof EOFException) // in case the file is very very small
-            return true;
-
-        return false;
-    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/827205f1/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSSingleFileTableReader.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSSingleFileTableReader.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSSingleFileTableReader.java
deleted file mode 100644
index 0a45188..0000000
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSSingleFileTableReader.java
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr;
-
-import java.io.BufferedReader;
-import java.io.Closeable;
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStreamReader;
-
-import org.apache.commons.lang.StringEscapeUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile.Reader;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.StringSplitter;
-import org.apache.kylin.source.ReadableTable.TableReader;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Tables are typically CSV or SEQ file.
- * 
- * @author yangli9
- */
-public class DFSSingleFileTableReader implements TableReader {
-
-    private static final Logger logger = 
LoggerFactory.getLogger(DFSSingleFileTableReader.class);
-    private static final char CSV_QUOTE = '"';
-    private static final String[] DETECT_DELIMS = new String[] { "\177", "|", 
"\t", "," };
-
-    private String filePath;
-    private String delim;
-    private RowReader reader;
-
-    private String curLine;
-    private String[] curColumns;
-    private int expectedColumnNumber = -1; // helps delimiter detection
-
-    public DFSSingleFileTableReader(String filePath, int expectedColumnNumber) 
throws IOException {
-        this(filePath, DFSFileTable.DELIM_AUTO, expectedColumnNumber);
-    }
-
-    public DFSSingleFileTableReader(String filePath, String delim, int 
expectedColumnNumber) throws IOException {
-        filePath = HadoopUtil.fixWindowsPath(filePath);
-        this.filePath = filePath;
-        this.delim = delim;
-        this.expectedColumnNumber = expectedColumnNumber;
-
-        FileSystem fs = HadoopUtil.getFileSystem(filePath);
-
-        try {
-            this.reader = new 
SeqRowReader(HadoopUtil.getCurrentConfiguration(), fs, filePath);
-
-        } catch (IOException e) {
-            if (isExceptionSayingNotSeqFile(e) == false)
-                throw e;
-
-            this.reader = new CsvRowReader(fs, filePath);
-        }
-    }
-
-    private boolean isExceptionSayingNotSeqFile(IOException e) {
-        if (e.getMessage() != null && e.getMessage().contains("not a 
SequenceFile"))
-            return true;
-
-        if (e instanceof EOFException) // in case the file is very very small
-            return true;
-
-        return false;
-    }
-
-    @Override
-    public boolean next() throws IOException {
-        curLine = reader.nextLine();
-        curColumns = null;
-        return curLine != null;
-    }
-
-    public String getLine() {
-        return curLine;
-    }
-
-    @Override
-    public String[] getRow() {
-        if (curColumns == null) {
-            if (DFSFileTable.DELIM_AUTO.equals(delim))
-                delim = autoDetectDelim(curLine);
-
-            if (delim == null)
-                curColumns = new String[] { curLine };
-            else
-                curColumns = split(curLine, delim);
-        }
-        return curColumns;
-    }
-
-    private String[] split(String line, String delim) {
-        // FIXME CVS line should be parsed considering escapes
-        String[] str = StringSplitter.split(line, delim);
-
-        // un-escape CSV
-        if (DFSFileTable.DELIM_COMMA.equals(delim)) {
-            for (int i = 0; i < str.length; i++) {
-                str[i] = unescapeCsv(str[i]);
-            }
-        }
-
-        return str;
-    }
-
-    private String unescapeCsv(String str) {
-        if (str == null || str.length() < 2)
-            return str;
-
-        str = StringEscapeUtils.unescapeCsv(str);
-
-        // unescapeCsv may not remove the outer most quotes
-        if (str.charAt(0) == CSV_QUOTE && str.charAt(str.length() - 1) == 
CSV_QUOTE)
-            str = str.substring(1, str.length() - 1);
-
-        return str;
-    }
-
-    @Override
-    public void close() throws IOException {
-        if (reader != null)
-            reader.close();
-    }
-
-    private String autoDetectDelim(String line) {
-        if (expectedColumnNumber > 0) {
-            for (String delim : DETECT_DELIMS) {
-                if (StringSplitter.split(line, delim).length == 
expectedColumnNumber) {
-                    logger.info("Auto detect delim to be '" + delim + "', 
split line to " + expectedColumnNumber + " columns -- " + line);
-                    return delim;
-                }
-            }
-        }
-
-        logger.info("Auto detect delim to be null, will take THE-WHOLE-LINE as 
a single value, for " + filePath);
-        return null;
-    }
-
-    // 
============================================================================
-
-    private interface RowReader extends Closeable {
-        String nextLine() throws IOException; // return null on EOF
-    }
-
-    private class SeqRowReader implements RowReader {
-        Reader reader;
-        Writable key;
-        Text value;
-
-        SeqRowReader(Configuration hconf, FileSystem fs, String path) throws 
IOException {
-            reader = new Reader(hconf, Reader.file(new Path(path)));
-            key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), 
hconf);
-            value = new Text();
-        }
-
-        @Override
-        public String nextLine() throws IOException {
-            boolean hasNext = reader.next(key, value);
-            if (hasNext)
-                return Bytes.toString(value.getBytes(), 0, value.getLength());
-            else
-                return null;
-        }
-
-        @Override
-        public void close() throws IOException {
-            reader.close();
-        }
-    }
-
-    private class CsvRowReader implements RowReader {
-        BufferedReader reader;
-
-        CsvRowReader(FileSystem fs, String path) throws IOException {
-            FSDataInputStream in = fs.open(new Path(path));
-            reader = new BufferedReader(new InputStreamReader(in, "UTF-8"));
-        }
-
-        @Override
-        public String nextLine() throws IOException {
-            return reader.readLine();
-        }
-
-        @Override
-        public void close() throws IOException {
-            reader.close();
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/827205f1/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumn.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumn.java 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumn.java
deleted file mode 100644
index 46b13f4..0000000
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumn.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.engine.mr;
-
-import org.apache.kylin.dict.ByteComparator;
-import org.apache.kylin.dict.StringBytesConverter;
-import org.apache.kylin.metadata.datatype.DataType;
-import org.apache.kylin.source.ReadableTable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Comparator;
-
-/**
- * Created by xiefan on 16-11-22.
- *
- * Read values from multi col files and ensure their order using a K-Way merge 
algorithm
- *
- * You need to ensure that values inside each file is sorted
- */
-public class SortedColumn implements ReadableTable {
-
-    private DFSFileTable dfsFileTable;
-
-    private String path;
-
-    private DataType dataType;
-
-    private static final Logger logger = 
LoggerFactory.getLogger(SortedColumn.class);
-
-    public SortedColumn(String path, DataType dataType) {
-        this.dfsFileTable = new DFSFileTable(path, -1);
-        this.dataType = dataType;
-    }
-
-    @Override
-    public TableReader getReader() throws IOException {
-        final Comparator<String> comparator = getComparatorByType(dataType);
-        return new SortedColumnReader(dfsFileTable.getReaders(), comparator);
-    }
-
-    @Override
-    public TableSignature getSignature() throws IOException {
-        return dfsFileTable.getSignature();
-    }
-
-    private Comparator<String> getComparatorByType(DataType type) {
-        Comparator<String> comparator;
-        if (!type.isNumberFamily()) {
-            comparator = new ByteComparator<>(new StringBytesConverter());
-        } else if (type.isIntegerFamily()) {
-            comparator = new Comparator<String>() {
-                @Override
-                public int compare(String str1, String str2) {
-                    try {
-                        Long num1 = Long.parseLong(str1);
-                        Long num2 = Long.parseLong(str2);
-                        return num1.compareTo(num2);
-                    } catch (NumberFormatException e) {
-                        logger.error("NumberFormatException when parse integer 
family number.str1:" + str1 + " str2:" + str2);
-                        e.printStackTrace();
-                        return 0;
-                    }
-                }
-            };
-        } else {
-            comparator = new Comparator<String>() {
-                @Override
-                public int compare(String str1, String str2) {
-                    try {
-                        Double num1 = Double.parseDouble(str1);
-                        Double num2 = Double.parseDouble(str2);
-                        return num1.compareTo(num2);
-                    } catch (NumberFormatException e) {
-                        logger.error("NumberFormatException when parse doul 
family number.str1:" + str1 + " str2:" + str2);
-                        return 0;
-                    }
-                }
-            };
-        }
-        return comparator;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/827205f1/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFile.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFile.java 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFile.java
new file mode 100644
index 0000000..d0ee3ee
--- /dev/null
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFile.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.engine.mr;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.dict.ByteComparator;
+import org.apache.kylin.dict.StringBytesConverter;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.source.ReadableTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Created by xiefan on 16-11-22.
+ *
+ * A {@link ReadableTable} over the column files stored under one DFS path. The values
+ * of all files are read in order by combining the per-file readers with a K-way merge
+ * (see {@link SortedColumnDFSFileReader}).
+ *
+ * The values inside each file must already be sorted; this class does not sort them.
+ */
+public class SortedColumnDFSFile implements ReadableTable {
+
+    private static final Logger logger = LoggerFactory.getLogger(SortedColumnDFSFile.class);
+
+    /** Path of a single column file, or of a directory holding several of them. */
+    private final String dfsPath;
+
+    /** Only used to compute the table signature of {@link #dfsPath}. */
+    private final DFSFileTable dfsFileTable;
+
+    /** Data type of the column; decides how two values are compared while merging. */
+    private final DataType dataType;
+
+    /**
+     * @param path     DFS path of the column file, or of the directory containing the column files
+     * @param dataType data type of the column values
+     */
+    public SortedColumnDFSFile(String path, DataType dataType) {
+        this.dfsPath = path;
+        this.dfsFileTable = new DFSFileTable(path, -1);
+        this.dataType = dataType;
+    }
+
+    /**
+     * Opens one reader per plain file found at {@link #dfsPath} (sub-directories are
+     * skipped) and wraps them into a single merging reader.
+     *
+     * @return a reader over the values of all files
+     * @throws IOException if the path cannot be inspected or one of the files cannot be opened;
+     *                     readers that were already opened are closed before the exception propagates
+     */
+    @Override
+    public TableReader getReader() throws IOException {
+        final Comparator<String> comparator = getComparatorByType(dataType);
+
+        String filePath = HadoopUtil.fixWindowsPath(dfsPath);
+        FileSystem fs = HadoopUtil.getFileSystem(filePath);
+        Path root = new Path(filePath);
+
+        ArrayList<FileStatus> allFiles = new ArrayList<>();
+        FileStatus status = fs.getFileStatus(root);
+        if (status.isFile()) {
+            allFiles.add(status);
+        } else {
+            for (FileStatus f : fs.listStatus(root)) {
+                if (f.isFile())
+                    allFiles.add(f);
+            }
+        }
+
+        ArrayList<TableReader> readers = new ArrayList<>(allFiles.size());
+        boolean success = false;
+        try {
+            for (FileStatus f : allFiles) {
+                readers.add(new DFSFileTableReader(f.getPath().toString(), -1));
+            }
+            // on success the per-file readers are handed over to the merging reader
+            TableReader merged = new SortedColumnDFSFileReader(readers, comparator);
+            success = true;
+            return merged;
+        } finally {
+            // a failure half way through must not leave the already opened files open
+            if (!success)
+                closeQuietly(readers);
+        }
+    }
+
+    @Override
+    public TableSignature getSignature() throws IOException {
+        return dfsFileTable.getSignature();
+    }
+
+    /**
+     * Closes every given reader, logging (not throwing) close failures so that the
+     * exception which triggered the clean-up is the one the caller sees.
+     */
+    private static void closeQuietly(ArrayList<TableReader> readers) {
+        for (TableReader reader : readers) {
+            try {
+                reader.close();
+            } catch (IOException e) {
+                logger.warn("Failed to close reader " + reader, e);
+            }
+        }
+    }
+
+    /**
+     * Picks the comparator matching the column type: byte order for non-numeric types,
+     * numeric order of the parsed {@code long} for the integer family, and numeric order
+     * of the parsed {@code double} for every other numeric type.
+     *
+     * A value that cannot be parsed as a number is logged and the pair is reported as
+     * equal (0), so one malformed value does not abort the whole read.
+     */
+    private Comparator<String> getComparatorByType(DataType type) {
+        if (!type.isNumberFamily()) {
+            return new ByteComparator<>(new StringBytesConverter());
+        }
+
+        if (type.isIntegerFamily()) {
+            return new Comparator<String>() {
+                @Override
+                public int compare(String str1, String str2) {
+                    try {
+                        return Long.compare(Long.parseLong(str1), Long.parseLong(str2));
+                    } catch (NumberFormatException e) {
+                        logger.error("NumberFormatException when parse integer family number.str1:" + str1 + " str2:" + str2, e);
+                        return 0;
+                    }
+                }
+            };
+        }
+
+        return new Comparator<String>() {
+            @Override
+            public int compare(String str1, String str2) {
+                try {
+                    return Double.compare(Double.parseDouble(str1), Double.parseDouble(str2));
+                } catch (NumberFormatException e) {
+                    logger.error("NumberFormatException when parse double family number.str1:" + str1 + " str2:" + str2, e);
+                    return 0;
+                }
+            }
+        };
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/827205f1/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFileReader.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFileReader.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFileReader.java
new file mode 100644
index 0000000..77719ff
--- /dev/null
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFileReader.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.engine.mr;
+
+import org.apache.kylin.source.ReadableTable;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.PriorityQueue;
+
+/**
+ * Created by xiefan on 16-11-22.
+ */
+public class SortedColumnDFSFileReader implements ReadableTable.TableReader {
+    private Collection<ReadableTable.TableReader> readers;
+
+    @SuppressWarnings("unused")
+    private Comparator<String> comparator;
+
+    private PriorityQueue<ReaderBuffer> pq;
+
+    private String[] row;
+
+    public SortedColumnDFSFileReader(Collection<ReadableTable.TableReader> readers, final Comparator<String> comparator) {
+        this.readers = readers;
+        this.comparator = comparator;
+        pq = new PriorityQueue<ReaderBuffer>(11, new Comparator<ReaderBuffer>() {
+            @Override
+            public int compare(ReaderBuffer i, ReaderBuffer j) {
+                boolean isEmpty1 = i.empty();
+                boolean isEmpty2 = j.empty();
+                if (isEmpty1 && isEmpty2)
+                    return 0;
+                if (isEmpty1 && !isEmpty2)
+                    return 1;
+                if (!isEmpty1 && isEmpty2)
+                    return -1;
+                return comparator.compare(i.peek()[0], j.peek()[0]);
+            }
+        });
+        for (ReadableTable.TableReader reader : readers) {
+            if (reader != null) {
+                try {
+                    pq.add(new ReaderBuffer(reader));
+                } catch (IOException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
+
+    @Override
+    public boolean next() throws IOException {
+        while (pq.size() > 0) {
+            ReaderBuffer buffer = pq.poll();
+            String[] minEntry = buffer.pop();
+            this.row = minEntry;
+            if (buffer.empty()) {
+                pq.remove(buffer);
+            } else {
+                pq.add(buffer); // add it back
+            }
+            if (this.row == null) { //avoid the case of empty file
+                return false;
+            }
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public String[] getRow() {
+        return this.row;
+    }
+
+    @Override
+    public void close() throws IOException {
+        for (ReadableTable.TableReader reader : readers)
+            reader.close();
+    }
+
+    static class ReaderBuffer {
+        private ReadableTable.TableReader reader;
+
+        private String[] row;
+
+        public ReaderBuffer(ReadableTable.TableReader reader) throws IOException {
+            this.reader = reader;
+            reload();
+        }
+
+        public void close() throws IOException {
+            if (this.reader != null)
+                reader.close();
+        }
+
+        public boolean empty() {
+            return (this.row == null);
+        }
+
+        public String[] peek() {
+            return this.row;
+        }
+
+        public String[] pop() throws IOException {
+            String[] result = this.row;
+            reload();
+            return result;
+        }
+
+        private void reload() throws IOException {
+            if (reader.next()) {
+                row = reader.getRow();
+            } else {
+                this.row = null;
+            }
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/827205f1/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumnReader.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumnReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumnReader.java
deleted file mode 100644
index 215198f..0000000
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumnReader.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.engine.mr;
-
-import org.apache.kylin.source.ReadableTable;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.PriorityQueue;
-
-/**
- * Created by xiefan on 16-11-22.
- */
-public class SortedColumnReader implements ReadableTable.TableReader {
-    private Collection<ReadableTable.TableReader> readers;
-
-    private Comparator<String> comparator;
-
-    private PriorityQueue<ReaderBuffer> pq;
-
-    private String[] row;
-
-    public SortedColumnReader(Collection<ReadableTable.TableReader> readers, final Comparator<String> comparator) {
-        this.readers = readers;
-        this.comparator = comparator;
-        pq = new PriorityQueue<ReaderBuffer>(11, new Comparator<ReaderBuffer>() {
-            @Override
-            public int compare(ReaderBuffer i, ReaderBuffer j) {
-                boolean isEmpty1 = i.empty();
-                boolean isEmpty2 = j.empty();
-                if (isEmpty1 && isEmpty2)
-                    return 0;
-                if (isEmpty1 && !isEmpty2)
-                    return 1;
-                if (!isEmpty1 && isEmpty2)
-                    return -1;
-                return comparator.compare(i.peek()[0], j.peek()[0]);
-            }
-        });
-        for (ReadableTable.TableReader reader : readers) {
-            if (reader != null) {
-                try {
-                    pq.add(new ReaderBuffer(reader));
-                } catch (IOException e) {
-                    e.printStackTrace();
-                }
-            }
-        }
-    }
-
-    @Override
-    public boolean next() throws IOException {
-        while (pq.size() > 0) {
-            ReaderBuffer buffer = pq.poll();
-            String[] minEntry = buffer.pop();
-            this.row = minEntry;
-            if (buffer.empty()) {
-                pq.remove(buffer);
-            } else {
-                pq.add(buffer); // add it back
-            }
-            if (this.row == null) { //avoid the case of empty file
-                return false;
-            }
-            return true;
-        }
-        return false;
-    }
-
-    @Override
-    public String[] getRow() {
-        return this.row;
-    }
-
-    @Override
-    public void close() throws IOException {
-        for (ReadableTable.TableReader reader : readers)
-            reader.close();
-    }
-
-    static class ReaderBuffer {
-        public ReaderBuffer(ReadableTable.TableReader reader) throws IOException {
-            this.reader = reader;
-            reload();
-        }
-
-        public void close() throws IOException {
-            if (this.reader != null)
-                reader.close();
-        }
-
-        public boolean empty() {
-            return (this.row == null);
-        }
-
-        public String[] peek() {
-            return this.row;
-        }
-
-        public String[] pop() throws IOException {
-            String[] result = this.row;
-            reload();
-            return result;
-        }
-
-        private void reload() throws IOException {
-            if (reader.next()) {
-                row = reader.getRow();
-            } else {
-                this.row = null;
-            }
-        }
-
-        private ReadableTable.TableReader reader;
-
-        private String[] row;
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/827205f1/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
index 7447133..5d7cb21 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
@@ -23,14 +23,13 @@ import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.cli.DictionaryGeneratorCLI;
 import org.apache.kylin.dict.DistinctColumnValuesProvider;
-import org.apache.kylin.engine.mr.SortedColumn;
+import org.apache.kylin.engine.mr.SortedColumnDFSFile;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.source.ReadableTable;
 
 /**
  * @author ysong1
- * 
  */
 
 public class CreateDictionaryJob extends AbstractHadoopJob {
@@ -54,8 +53,7 @@ public class CreateDictionaryJob extends AbstractHadoopJob {
         DictionaryGeneratorCLI.processSegment(config, cubeName, segmentID, new DistinctColumnValuesProvider() {
             @Override
             public ReadableTable getDistinctValuesFor(TblColRef col) {
-                //return new DFSFileTable(factColumnsInputPath + "/" + col.getName(), -1);
-                return new SortedColumn(factColumnsInputPath + "/" + col.getName(), col.getType());
+                return new SortedColumnDFSFile(factColumnsInputPath + "/" + col.getName(), col.getType());
             }
         });
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/827205f1/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
index 9722b9c..6e24d61 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
@@ -66,7 +66,6 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
     private List<ByteArray> colValues;
     private TblColRef col = null;
     private boolean isStatistics = false;
-    private boolean isPartitionCol = false;
     private KylinConfig cubeConfig;
     private int uhcReducerCount;
     private Map<Integer, Integer> ReducerIdToColumnIndex = new HashMap<>();
@@ -103,13 +102,11 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
         } else if (collectStatistics && (taskId == numberOfTasks - 2)) {
             // partition col
             isStatistics = false;
-            isPartitionCol = true;
             col = cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef();
             colValues = Lists.newLinkedList();
         } else {
             // col
             isStatistics = false;
-            isPartitionCol = false;
             col = columnList.get(ReducerIdToColumnIndex.get(taskId));
             colValues = Lists.newLinkedList();
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/827205f1/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
index 7776172..762047b 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
@@ -30,13 +30,12 @@ import org.apache.kylin.cube.cuboid.CuboidScheduler;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.measure.BufferedMeasureCodec;
 import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
+import org.apache.kylin.metadata.model.TblColRef;
 
 import com.google.common.collect.Lists;
 import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hasher;
 import com.google.common.hash.Hashing;
-import org.apache.kylin.metadata.datatype.DataType;
-import org.apache.kylin.metadata.model.TblColRef;
 
 /**
  */
@@ -156,14 +155,7 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
                 outputKey.set(keyBuffer.array(), offset, keyBuffer.position() - offset);
                 sortableKey.setText(outputKey);
                 //judge type
-                DataType type = factDictCols.get(i).getType();
-                if (!type.isNumberFamily()) {
-                    sortableKey.setTypeId((byte) TypeFlag.NONE_NUMERIC_TYPE.ordinal());
-                } else if (type.isIntegerFamily()) {
-                    sortableKey.setTypeId((byte) TypeFlag.INTEGER_FAMILY_TYPE.ordinal());
-                } else {
-                    sortableKey.setTypeId((byte) TypeFlag.DOUBLE_FAMILY_TYPE.ordinal());
-                }
+                sortableKey.setTypeIdByDatatype(factDictCols.get(i).getType());
                 context.write(sortableKey, EMPTY_TEXT);
             }
         } catch (Exception ex) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/827205f1/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SelfDefineSortableKey.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SelfDefineSortableKey.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SelfDefineSortableKey.java
index 4ca3a90..b804eef 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SelfDefineSortableKey.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SelfDefineSortableKey.java
@@ -17,9 +17,9 @@
 */
 package org.apache.kylin.engine.mr.steps;
 
-
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.kylin.metadata.datatype.DataType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -27,11 +27,15 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
-
 /**
  * Created by xiefan on 16-11-1.
  */
 public class SelfDefineSortableKey implements WritableComparable<SelfDefineSortableKey> {
+    public enum TypeFlag {
+        NONE_NUMERIC_TYPE,
+        INTEGER_FAMILY_TYPE,
+        DOUBLE_FAMILY_TYPE
+    }
 
     private byte typeId; //non-numeric(0000 0000) int(0000 0001) other numberic(0000 0010)
 
@@ -61,7 +65,7 @@ public class SelfDefineSortableKey implements WritableComparable<SelfDefineSorta
                 logger.error("none numeric value!");
                 return 0;
             }
-            if (o.isIntegerFamily()) {  //integer type
+            if (o.isIntegerFamily()) { //integer type
                 try {
                     Long num1 = Long.parseLong(str1);
                     Long num2 = Long.parseLong(str2);
@@ -71,7 +75,7 @@ public class SelfDefineSortableKey implements WritableComparable<SelfDefineSorta
                     e.printStackTrace();
                     return 0;
                 }
-            } else {  //other numeric type
+            } else { //other numeric type
                 try {
                     Double num1 = Double.parseDouble(str1);
                     Double num2 = Double.parseDouble(str2);
@@ -106,7 +110,8 @@ public class SelfDefineSortableKey implements WritableComparable<SelfDefineSorta
     }
 
     public boolean isNumberFamily() {
-        if (typeId == TypeFlag.NONE_NUMERIC_TYPE.ordinal()) return false;
+        if (typeId == TypeFlag.NONE_NUMERIC_TYPE.ordinal())
+            return false;
         return true;
     }
 
@@ -118,6 +123,16 @@ public class SelfDefineSortableKey implements WritableComparable<SelfDefineSorta
         return (typeId == TypeFlag.DOUBLE_FAMILY_TYPE.ordinal());
     }
 
+    public void setTypeIdByDatatype(DataType type) {
+        if (!type.isNumberFamily()) {
+            this.typeId = (byte) TypeFlag.NONE_NUMERIC_TYPE.ordinal();
+        } else if (type.isIntegerFamily()) {
+            this.typeId = (byte) TypeFlag.INTEGER_FAMILY_TYPE.ordinal();
+        } else {
+            this.typeId = (byte) TypeFlag.DOUBLE_FAMILY_TYPE.ordinal();
+        }
+    }
+    
     public void setTypeId(byte typeId) {
         this.typeId = typeId;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/827205f1/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/TypeFlag.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/TypeFlag.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/TypeFlag.java
deleted file mode 100644
index 3279106..0000000
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/TypeFlag.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.steps;
-
-/**
- * Created by xiefan on 16-11-2.
- */
-public enum TypeFlag {
-    NONE_NUMERIC_TYPE,
-    INTEGER_FAMILY_TYPE,
-    DOUBLE_FAMILY_TYPE
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/827205f1/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
index 6e18f9b..eb06f07 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
@@ -29,7 +29,7 @@ import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.CubingJob;
-import org.apache.kylin.engine.mr.SortedColumn;
+import org.apache.kylin.engine.mr.SortedColumnDFSFile;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
@@ -101,7 +101,7 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
 
         final String factDistinctPath = this.getParams().get(BatchConstants.CFG_OUTPUT_PATH);
         //final ReadableTable readableTable = new DFSFileTable(factDistinctPath + "/" + partitionCol.getName(), -1);
-        final ReadableTable readableTable = new SortedColumn(factDistinctPath + "/" + partitionCol.getName(), partitionCol.getType());
+        final ReadableTable readableTable = new SortedColumnDFSFile(factDistinctPath + "/" + partitionCol.getName(), partitionCol.getType());
         final ReadableTable.TableReader tableReader = readableTable.getReader();
         long minValue = Long.MAX_VALUE, maxValue = Long.MIN_VALUE;
         try {

http://git-wip-us.apache.org/repos/asf/kylin/blob/827205f1/engine-mr/src/test/java/org/apache/kylin/engine/mr/SortedColumnReaderTest.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/SortedColumnReaderTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/SortedColumnReaderTest.java
index 2f2170c..3c4195f 100644
--- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/SortedColumnReaderTest.java
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/SortedColumnReaderTest.java
@@ -17,32 +17,28 @@
 */
 package org.apache.kylin.engine.mr;
 
-import org.apache.kylin.dict.ByteComparator;
-import org.apache.kylin.dict.BytesConverter;
-import org.apache.kylin.dict.DictionaryManager;
-import org.apache.kylin.dict.IDictionaryValueEnumerator;
-import org.apache.kylin.dict.StringBytesConverter;
-import org.apache.kylin.dict.TableColumnValueEnumerator;
-import org.apache.kylin.metadata.datatype.DataType;
-import org.apache.kylin.source.ReadableTable;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.omg.Messaging.SYNC_WITH_TRANSPORT;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import java.io.BufferedReader;
 import java.io.BufferedWriter;
 import java.io.File;
-import java.io.FileOutputStream;
 import java.io.FileReader;
 import java.io.FileWriter;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.Random;
 import java.util.UUID;
-import static org.junit.Assert.*;
-import static org.junit.Assert.assertEquals;
+
+import org.apache.kylin.dict.ByteComparator;
+import org.apache.kylin.dict.BytesConverter;
+import org.apache.kylin.dict.IDictionaryValueEnumerator;
+import org.apache.kylin.dict.StringBytesConverter;
+import org.apache.kylin.dict.TableColumnValueEnumerator;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.junit.Ignore;
+import org.junit.Test;
 
 /**
  * Created by xiefan on 16-11-14.
@@ -50,107 +46,106 @@ import static org.junit.Assert.assertEquals;
 public class SortedColumnReaderTest {
 
     @Test
-    public void testReadStringMultiFile() throws Exception{
+    public void testReadStringMultiFile() throws Exception {
         String dirPath = "src/test/resources/multi_file_str";
         StringBytesConverter converter = new StringBytesConverter();
         ArrayList<String> correctAnswer = readAllFiles(dirPath);
         Collections.sort(correctAnswer, new ByteComparator<String>(new StringBytesConverter()));
-        SortedColumn column = new SortedColumn(dirPath + "/",DataType.getType("varchar"));
-        IDictionaryValueEnumerator e = new TableColumnValueEnumerator(column.getReader(),-1);
+        SortedColumnDFSFile column = new SortedColumnDFSFile(dirPath + "/", DataType.getType("varchar"));
+        IDictionaryValueEnumerator e = new TableColumnValueEnumerator(column.getReader(), -1);
         ArrayList<String> output = new ArrayList<>();
-        while(e.moveNext()){
+        while (e.moveNext()) {
             output.add(new String(e.current()));
         }
         System.out.println(correctAnswer.size());
         assertTrue(correctAnswer.size() == output.size());
-        for(int i=0;i<correctAnswer.size();i++){
-            assertEquals(correctAnswer.get(i),output.get(i));
+        for (int i = 0; i < correctAnswer.size(); i++) {
+            assertEquals(correctAnswer.get(i), output.get(i));
         }
     }
 
-
     @Ignore
     @Test
-    public void createStringTestFiles() throws Exception{
+    public void createStringTestFiles() throws Exception {
         String dirPath = "src/test/resources/multi_file_str";
         String prefix = "src/test/resources/multi_file_str/data_";
         ArrayList<String> data = new ArrayList<>();
         int num = 10000;
-        for(int i=0;i<num;i++){
+        for (int i = 0; i < num; i++) {
             UUID uuid = UUID.randomUUID();
             data.add(uuid.toString());
         }
-        Collections.sort(data,new ByteComparator<String>(new StringBytesConverter()));
+        Collections.sort(data, new ByteComparator<String>(new StringBytesConverter()));
         Random rand = new Random(System.currentTimeMillis());
         ArrayList<File> allFiles = new ArrayList<>();
         int fileNum = 5;
-        for(int i=0;i<fileNum;i++){
+        for (int i = 0; i < fileNum; i++) {
             File f = new File(prefix + i);
-            if(!f.exists())
+            if (!f.exists())
                 f.createNewFile();
             allFiles.add(f);
         }
         ArrayList<BufferedWriter> bws = new ArrayList<>();
-        for(File f : allFiles){
+        for (File f : allFiles) {
             bws.add(new BufferedWriter(new FileWriter(f)));
         }
         System.out.println(data.size());
-        for(String str : data){
+        for (String str : data) {
             int fileId = rand.nextInt(fileNum);
             BufferedWriter bw = bws.get(fileId);
             bw.write(str);
             bw.newLine();
         }
-        for(BufferedWriter bw : bws)
-        {
+        for (BufferedWriter bw : bws) {
             bw.flush();
             bw.close();
         }
         File dir = new File(dirPath);
         File[] files = dir.listFiles();
-        for(File file : files){
-            System.out.println("file:"+file.getAbsolutePath()+" size:"+file.length());
+        for (File file : files) {
+            System.out.println("file:" + file.getAbsolutePath() + " size:" + file.length());
         }
     }
 
     @Test
-    public void testReadIntegerMultiFiles() throws Exception{
+    public void testReadIntegerMultiFiles() throws Exception {
         String dirPath = "src/test/resources/multi_file_int";
         ArrayList<String> correctAnswer = readAllFiles(dirPath);
         Collections.sort(correctAnswer, new Comparator<String>() {
             @Override
             public int compare(String o1, String o2) {
-                try{
+                try {
                     Long l1 = Long.parseLong(o1);
                     Long l2 = Long.parseLong(o2);
                     return l1.compareTo(l2);
-                }catch (NumberFormatException e){
+                } catch (NumberFormatException e) {
                     e.printStackTrace();
                     return 0;
                 }
             }
         });
-        SortedColumn column = new SortedColumn(dirPath + "/",DataType.getType("long"));
-        IDictionaryValueEnumerator e = new TableColumnValueEnumerator(column.getReader(),-1);
+        SortedColumnDFSFile column = new SortedColumnDFSFile(dirPath + "/", DataType.getType("long"));
+        IDictionaryValueEnumerator e = new TableColumnValueEnumerator(column.getReader(), -1);
         ArrayList<String> output = new ArrayList<>();
-        while(e.moveNext()){
+        while (e.moveNext()) {
             System.out.println(new String(e.current()));
             output.add(new String(e.current()));
         }
         System.out.println(correctAnswer.size());
         assertTrue(correctAnswer.size() == output.size());
-        for(int i=0;i<correctAnswer.size();i++){
-            assertEquals(correctAnswer.get(i),output.get(i));
+        for (int i = 0; i < correctAnswer.size(); i++) {
+            assertEquals(correctAnswer.get(i), output.get(i));
         }
     }
 
     @Test
-    public void testEmptyDir() throws Exception{
+    public void testEmptyDir() throws Exception {
         String dirPath = "src/test/resources/empty_dir";
-        SortedColumn column = new SortedColumn(dirPath + "/",DataType.getType("varchar"));
-        IDictionaryValueEnumerator e = new TableColumnValueEnumerator(column.getReader(),-1);
+        new File(dirPath).mkdirs();
+        SortedColumnDFSFile column = new SortedColumnDFSFile(dirPath + "/", DataType.getType("varchar"));
+        IDictionaryValueEnumerator e = new TableColumnValueEnumerator(column.getReader(), -1);
         ArrayList<String> output = new ArrayList<>();
-        while(e.moveNext()){
+        while (e.moveNext()) {
             System.out.println(new String(e.current()));
             output.add(new String(e.current()));
         }
@@ -158,151 +153,147 @@ public class SortedColumnReaderTest {
     }
 
     @Test
-    public void testEmptyFile() throws Exception{
+    public void testEmptyFile() throws Exception {
         String dirPath = "src/test/resources/multi_file_empty_file";
         ArrayList<String> correctAnswer = readAllFiles(dirPath);
         final BytesConverter<String> converter = new StringBytesConverter();
         Collections.sort(correctAnswer, new ByteComparator<String>(new StringBytesConverter()));
-        System.out.println("correct answer:"+correctAnswer);
-        SortedColumn column = new SortedColumn(dirPath + "/",DataType.getType("varchar"));
-        IDictionaryValueEnumerator e = new TableColumnValueEnumerator(column.getReader(),-1);
+        System.out.println("correct answer:" + correctAnswer);
+        SortedColumnDFSFile column = new SortedColumnDFSFile(dirPath + "/", DataType.getType("varchar"));
+        IDictionaryValueEnumerator e = new TableColumnValueEnumerator(column.getReader(), -1);
         ArrayList<String> output = new ArrayList<>();
-        while(e.moveNext()){
+        while (e.moveNext()) {
             output.add(new String(e.current()));
         }
         System.out.println(correctAnswer.size());
         assertTrue(correctAnswer.size() == output.size());
-        for(int i=0;i<correctAnswer.size();i++){
-            assertEquals(correctAnswer.get(i),output.get(i));
+        for (int i = 0; i < correctAnswer.size(); i++) {
+            assertEquals(correctAnswer.get(i), output.get(i));
         }
     }
 
-
     @Ignore
     @Test
-    public void createIntegerTestFiles() throws Exception{
+    public void createIntegerTestFiles() throws Exception {
         String dirPath = "src/test/resources/multi_file_int";
         String prefix = "src/test/resources/multi_file_int/data_";
         Random rand = new Random(System.currentTimeMillis());
         ArrayList<String> data = new ArrayList<>();
         int num = 10000;
-        for(int i=0;i<num;i++){
-            data.add(i+"");
+        for (int i = 0; i < num; i++) {
+            data.add(i + "");
         }
         ArrayList<File> allFiles = new ArrayList<>();
         int fileNum = 5;
-        for(int i=0;i<fileNum;i++){
+        for (int i = 0; i < fileNum; i++) {
             File f = new File(prefix + i);
-            if(!f.exists())
+            if (!f.exists())
                 f.createNewFile();
             allFiles.add(f);
         }
         ArrayList<BufferedWriter> bws = new ArrayList<>();
-        for(File f : allFiles){
+        for (File f : allFiles) {
             bws.add(new BufferedWriter(new FileWriter(f)));
         }
         System.out.println(data.size());
-        for(String str : data){
+        for (String str : data) {
             int fileId = rand.nextInt(fileNum);
             BufferedWriter bw = bws.get(fileId);
             bw.write(str);
             bw.newLine();
         }
-        for(BufferedWriter bw : bws)
-        {
+        for (BufferedWriter bw : bws) {
             bw.flush();
             bw.close();
         }
         File dir = new File(dirPath);
         File[] files = dir.listFiles();
-        for(File file : files){
-            System.out.println("file:"+file.getAbsolutePath()+" size:"+file.length());
+        for (File file : files) {
+            System.out.println("file:" + file.getAbsolutePath() + " size:" + file.length());
         }
     }
 
     @Test
-    public void testReadDoubleMultiFiles() throws Exception{
+    public void testReadDoubleMultiFiles() throws Exception {
         String dirPath = "src/test/resources/multi_file_double";
         ArrayList<String> correctAnswer = readAllFiles(dirPath);
         Collections.sort(correctAnswer, new Comparator<String>() {
             @Override
             public int compare(String o1, String o2) {
-                try{
+                try {
                     Double d1 = Double.parseDouble(o1);
                     Double d2 = Double.parseDouble(o2);
                     return d1.compareTo(d2);
-                }catch (NumberFormatException e){
+                } catch (NumberFormatException e) {
                     e.printStackTrace();
                     return 0;
                 }
             }
         });
-        SortedColumn column = new SortedColumn(dirPath + "/",DataType.getType("double"));
-        IDictionaryValueEnumerator e = new TableColumnValueEnumerator(column.getReader(),-1);
+        SortedColumnDFSFile column = new SortedColumnDFSFile(dirPath + "/", DataType.getType("double"));
+        IDictionaryValueEnumerator e = new TableColumnValueEnumerator(column.getReader(), -1);
         ArrayList<String> output = new ArrayList<>();
-        while(e.moveNext()){
+        while (e.moveNext()) {
             System.out.println(new String(e.current()));
             output.add(new String(e.current()));
         }
         System.out.println(correctAnswer.size());
         assertTrue(correctAnswer.size() == output.size());
-        for(int i=0;i<correctAnswer.size();i++){
-            assertEquals(correctAnswer.get(i),output.get(i));
+        for (int i = 0; i < correctAnswer.size(); i++) {
+            assertEquals(correctAnswer.get(i), output.get(i));
         }
     }
 
-
     @Ignore
     @Test
-    public void createDoubleTestFiles() throws Exception{
+    public void createDoubleTestFiles() throws Exception {
         String dirPath = "src/test/resources/multi_file_double";
         String prefix = "src/test/resources/multi_file_double/data_";
         Random rand = new Random(System.currentTimeMillis());
         ArrayList<String> data = new ArrayList<>();
         int num = 10000;
         double k = 0.0;
-        for(int i=0;i<num;i++){
-            data.add(k+"");
-            k+=0.52;
+        for (int i = 0; i < num; i++) {
+            data.add(k + "");
+            k += 0.52;
         }
         ArrayList<File> allFiles = new ArrayList<>();
         int fileNum = 5;
-        for(int i=0;i<fileNum;i++){
+        for (int i = 0; i < fileNum; i++) {
             File f = new File(prefix + i);
-            if(!f.exists())
+            if (!f.exists())
                 f.createNewFile();
             allFiles.add(f);
         }
         ArrayList<BufferedWriter> bws = new ArrayList<>();
-        for(File f : allFiles){
+        for (File f : allFiles) {
             bws.add(new BufferedWriter(new FileWriter(f)));
         }
         System.out.println(data.size());
-        for(String str : data){
+        for (String str : data) {
             int fileId = rand.nextInt(fileNum);
             BufferedWriter bw = bws.get(fileId);
             bw.write(str);
             bw.newLine();
         }
-        for(BufferedWriter bw : bws)
-        {
+        for (BufferedWriter bw : bws) {
             bw.flush();
             bw.close();
         }
         File dir = new File(dirPath);
         File[] files = dir.listFiles();
-        for(File file : files){
-            System.out.println("file:"+file.getAbsolutePath()+" size:"+file.length());
+        for (File file : files) {
+            System.out.println("file:" + file.getAbsolutePath() + " size:" + file.length());
         }
     }
 
-    private ArrayList<String> readAllFiles(String dirPath) throws Exception{
+    private ArrayList<String> readAllFiles(String dirPath) throws Exception {
         ArrayList<String> result = new ArrayList<>();
         File dir = new File(dirPath);
-        for(File f : dir.listFiles()){
+        for (File f : dir.listFiles()) {
             BufferedReader br = new BufferedReader(new FileReader(f));
             String str = br.readLine();
-            while(str != null){
+            while (str != null) {
                 result.add(str);
                 str = br.readLine();
             }

http://git-wip-us.apache.org/repos/asf/kylin/blob/827205f1/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NumberDictionaryForestTest.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NumberDictionaryForestTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NumberDictionaryForestTest.java
index 677e386..12105c9 100644
--- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NumberDictionaryForestTest.java
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NumberDictionaryForestTest.java
@@ -23,6 +23,7 @@ import org.apache.kylin.dict.NumberDictionaryBuilder;
 import org.apache.kylin.dict.NumberDictionaryForestBuilder;
 import org.apache.kylin.dict.StringBytesConverter;
 import org.apache.kylin.dict.TrieDictionaryForest;
+import org.apache.kylin.engine.mr.steps.SelfDefineSortableKey.TypeFlag;
 import org.junit.Ignore;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/827205f1/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/SelfDefineSortableKeyTest.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/SelfDefineSortableKeyTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/SelfDefineSortableKeyTest.java
index b03514c..283a0f5 100644
--- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/SelfDefineSortableKeyTest.java
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/SelfDefineSortableKeyTest.java
@@ -12,6 +12,7 @@ import java.util.UUID;
 
 import org.apache.hadoop.io.Text;
 import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.engine.mr.steps.SelfDefineSortableKey.TypeFlag;
 import org.junit.Test;
 
 /**
@@ -20,11 +21,11 @@ import org.junit.Test;
 public class SelfDefineSortableKeyTest {
 
     @Test
-    public void testSortLong(){
+    public void testSortLong() {
         Random rand = new Random(System.currentTimeMillis());
         ArrayList<Long> longList = new ArrayList<>();
         int count = 10;
-        for(int i=0;i<count;i++){
+        for (int i = 0; i < count; i++) {
             longList.add(rand.nextLong());
         }
         longList.add(0L);
@@ -33,14 +34,14 @@ public class SelfDefineSortableKeyTest {
         longList.add(Long.MAX_VALUE);
         longList.add(Long.MIN_VALUE);
 
-        System.out.println("test numbers:"+longList);
+        System.out.println("test numbers:" + longList);
         ArrayList<String> strNumList = listToStringList(longList);
         //System.out.println("test num strs list:"+strNumList);
-        ArrayList<SelfDefineSortableKey> keyList = createKeyList(strNumList, (byte)TypeFlag.INTEGER_FAMILY_TYPE.ordinal());
+        ArrayList<SelfDefineSortableKey> keyList = createKeyList(strNumList, (byte) TypeFlag.INTEGER_FAMILY_TYPE.ordinal());
         System.out.println(keyList.get(0).isIntegerFamily());
         Collections.sort(keyList);
         ArrayList<String> strListAftereSort = new ArrayList<>();
-        for(SelfDefineSortableKey key : keyList){
+        for (SelfDefineSortableKey key : keyList) {
             String str = printKey(key);
             strListAftereSort.add(str);
         }
@@ -55,11 +56,11 @@ public class SelfDefineSortableKeyTest {
     }
 
     @Test
-    public void testSortDouble(){
+    public void testSortDouble() {
         Random rand = new Random(System.currentTimeMillis());
         ArrayList<Double> doubleList = new ArrayList<>();
         int count = 10;
-        for(int i=0;i<count;i++){
+        for (int i = 0; i < count; i++) {
             doubleList.add(rand.nextDouble());
         }
         doubleList.add(0.0);
@@ -69,14 +70,14 @@ public class SelfDefineSortableKeyTest {
         doubleList.add(-Double.MAX_VALUE);
         //System.out.println(Double.MIN_VALUE);
 
-        System.out.println("test numbers:"+doubleList);
+        System.out.println("test numbers:" + doubleList);
         ArrayList<String> strNumList = listToStringList(doubleList);
         //System.out.println("test num strs list:"+strNumList);
-        ArrayList<SelfDefineSortableKey> keyList = createKeyList(strNumList, (byte)TypeFlag.DOUBLE_FAMILY_TYPE.ordinal());
+        ArrayList<SelfDefineSortableKey> keyList = createKeyList(strNumList, (byte) TypeFlag.DOUBLE_FAMILY_TYPE.ordinal());
         System.out.println(keyList.get(0).isOtherNumericFamily());
         Collections.sort(keyList);
         ArrayList<String> strListAftereSort = new ArrayList<>();
-        for(SelfDefineSortableKey key : keyList){
+        for (SelfDefineSortableKey key : keyList) {
             String str = printKey(key);
             strListAftereSort.add(str);
         }
@@ -91,10 +92,10 @@ public class SelfDefineSortableKeyTest {
     }
 
     @Test
-    public void testSortNormalString(){
+    public void testSortNormalString() {
         int count = 10;
         ArrayList<String> strList = new ArrayList<>();
-        for(int i=0;i<count;i++){
+        for (int i = 0; i < count; i++) {
             UUID uuid = UUID.randomUUID();
             strList.add(uuid.toString());
         }
@@ -102,28 +103,28 @@ public class SelfDefineSortableKeyTest {
         strList.add("hello"); //duplicate
         strList.add("123");
         strList.add("");
-        ArrayList<SelfDefineSortableKey> keyList = createKeyList(strList, (byte)TypeFlag.NONE_NUMERIC_TYPE.ordinal());
+        ArrayList<SelfDefineSortableKey> keyList = createKeyList(strList, (byte) TypeFlag.NONE_NUMERIC_TYPE.ordinal());
         System.out.println(keyList.get(0).isOtherNumericFamily());
         Collections.sort(keyList);
         ArrayList<String> strListAftereSort = new ArrayList<>();
-        for(SelfDefineSortableKey key : keyList){
+        for (SelfDefineSortableKey key : keyList) {
             String str = printKey(key);
             strListAftereSort.add(str);
         }
         assertTrue(isIncreasedOrder(strListAftereSort, new Comparator<String>() {
             @Override
             public int compare(String o1, String o2) {
-               return o1.compareTo(o2);
+                return o1.compareTo(o2);
             }
         }));
     }
 
     @Test
-    public void testIllegalNumber(){
+    public void testIllegalNumber() {
         Random rand = new Random(System.currentTimeMillis());
         ArrayList<Double> doubleList = new ArrayList<>();
         int count = 10;
-        for(int i=0;i<count;i++){
+        for (int i = 0; i < count; i++) {
             doubleList.add(rand.nextDouble());
         }
         doubleList.add(0.0);
@@ -133,84 +134,74 @@ public class SelfDefineSortableKeyTest {
         doubleList.add(-Double.MAX_VALUE);
         //System.out.println(Double.MIN_VALUE);
 
-        System.out.println("test numbers:"+doubleList);
+        System.out.println("test numbers:" + doubleList);
         ArrayList<String> strNumList = listToStringList(doubleList);
         strNumList.add("fjaeif"); //illegal type
         //System.out.println("test num strs list:"+strNumList);
-        ArrayList<SelfDefineSortableKey> keyList = createKeyList(strNumList, (byte)TypeFlag.DOUBLE_FAMILY_TYPE.ordinal());
+        ArrayList<SelfDefineSortableKey> keyList = createKeyList(strNumList, (byte) TypeFlag.DOUBLE_FAMILY_TYPE.ordinal());
         System.out.println(keyList.get(0).isOtherNumericFamily());
         Collections.sort(keyList);
-        for(SelfDefineSortableKey key : keyList){
+        for (SelfDefineSortableKey key : keyList) {
             printKey(key);
         }
 
     }
 
     @Test
-    public void testEnum(){
+    public void testEnum() {
         TypeFlag flag = TypeFlag.DOUBLE_FAMILY_TYPE;
-        System.out.println((byte)flag.ordinal());
-        int t = (byte)flag.ordinal();
+        System.out.println((byte) flag.ordinal());
+        int t = (byte) flag.ordinal();
         System.out.println(t);
     }
 
-
-
-    private<T> ArrayList<String> listToStringList(ArrayList<T> list){
+    private <T> ArrayList<String> listToStringList(ArrayList<T> list) {
         ArrayList<String> strList = new ArrayList<>();
-        for(T t : list){
+        for (T t : list) {
             System.out.println(t.toString());
             strList.add(t.toString());
         }
         return strList;
     }
 
-    private ArrayList<SelfDefineSortableKey> createKeyList(List<String> strNumList,byte typeFlag){
+    private ArrayList<SelfDefineSortableKey> createKeyList(List<String> strNumList, byte typeFlag) {
         int partationId = 0;
         ArrayList<SelfDefineSortableKey> keyList = new ArrayList<>();
-        for(String str : strNumList){
+        for (String str : strNumList) {
             ByteBuffer keyBuffer = ByteBuffer.allocate(4096);
             int offset = keyBuffer.position();
             keyBuffer.put(Bytes.toBytes(partationId)[3]);
             keyBuffer.put(Bytes.toBytes(str));
-            //System.out.println(Arrays.toString(keyBuffer.array()));
-            byte[] valueField = Bytes.copy(keyBuffer.array(),1,keyBuffer.position()-offset-1);
-            //System.out.println("new string:"+new String(valueField));
-            //System.out.println("arrays toString:"+Arrays.toString(valueField));
+            Bytes.copy(keyBuffer.array(), 1, keyBuffer.position() - offset - 1);
             Text outputKey = new Text();
-            outputKey.set(keyBuffer.array(),offset,keyBuffer.position()-offset);
-            SelfDefineSortableKey sortableKey = new SelfDefineSortableKey(typeFlag,outputKey);
+            outputKey.set(keyBuffer.array(), offset, keyBuffer.position() - offset);
+            SelfDefineSortableKey sortableKey = new SelfDefineSortableKey(typeFlag, outputKey);
             keyList.add(sortableKey);
         }
         return keyList;
     }
 
-    private String printKey(SelfDefineSortableKey key){
-        byte[] data = key.getText().getBytes();
-        byte[] fieldValue = Bytes.copy(data,1,data.length-1);
-        System.out.println("type flag:"+key.getTypeId()+" fieldValue:"+new String(fieldValue));
-        return new String(fieldValue);
-    }
-
-    private String getFieldValue(SelfDefineSortableKey key){
+    private String printKey(SelfDefineSortableKey key) {
         byte[] data = key.getText().getBytes();
-        byte[] fieldValue = Bytes.copy(data,1,data.length-1);
+        byte[] fieldValue = Bytes.copy(data, 1, data.length - 1);
+        System.out.println("type flag:" + key.getTypeId() + " fieldValue:" + new String(fieldValue));
         return new String(fieldValue);
     }
 
-    private<T> boolean isIncreasedOrder(List<T> list, Comparator<T> comp){
+    private <T> boolean isIncreasedOrder(List<T> list, Comparator<T> comp) {
         int flag;
         T previous = null;
-        for(T t : list){
-            if(previous == null) previous = t;
-            else{
-                flag = comp.compare(previous,t);
-                if(flag > 0) return false;
+        for (T t : list) {
+            if (previous == null)
+                previous = t;
+            else {
+                flag = comp.compare(previous, t);
+                if (flag > 0)
+                    return false;
                 previous = t;
             }
         }
         return true;
     }
 
-
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/827205f1/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java
index 3f9ce01..8309a8c 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java
@@ -82,9 +82,6 @@ public class HiveTableReader implements TableReader {
         }
 
         this.numberOfSplits = readCntxt.numSplits();
-
-        //        HCatTableInfo tableInfo = HCatTableInfo.
-        //        HCatSchema schema = HCatBaseInputFormat.getTableSchema(context.getConfiguration);
     }
 
     @Override

Reply via email to