http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NewCubeSamplingMethodTest.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NewCubeSamplingMethodTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NewCubeSamplingMethodTest.java
index 869146b..f018f28 100644
--- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NewCubeSamplingMethodTest.java
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NewCubeSamplingMethodTest.java
@@ -16,14 +16,13 @@
  * limitations under the License.
 */
 
-package org.apache.kylin.engine.mr.steps;
 
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Random;
-import java.util.Set;
+package org.apache.kylin.engine.mr.steps;
 
+import com.google.common.collect.Lists;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.Bytes;
@@ -33,10 +32,11 @@ import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
 
-import com.google.common.collect.Lists;
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hasher;
-import com.google.common.hash.Hashing;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
 
 @Ignore
 public class NewCubeSamplingMethodTest {
@@ -64,6 +64,7 @@ public class NewCubeSamplingMethodTest {
         compareAccuracyBasic(dataSet);
     }
 
+
     @Ignore
     @Test
     public void testSmallCardData() throws Exception {
@@ -72,6 +73,7 @@ public class NewCubeSamplingMethodTest {
         compareAccuracyBasic(dataSet);
     }
 
+
     public void comparePerformanceBasic(final List<List<String>> rows) throws Exception {
         //old hash method
         ByteArray[] colHashValues = getNewColHashValues(ROW_LENGTH);
@@ -120,11 +122,11 @@ public class NewCubeSamplingMethodTest {
                     counter.add(hc.hash().asBytes());
                 }
                 long estimate = counter.getCountEstimate();
-                System.out.println("old method finished. Estimate cardinality : " + estimate + ". Error rate : "
-                        + countErrorRate(estimate, realCardinality));
+                System.out.println("old method finished. Estimate cardinality : " + estimate + ". Error rate : " + countErrorRate(estimate, realCardinality));
             }
         });
 
+
         long t2 = runAndGetTime(new TestCase() {
             @Override
             public void run() throws Exception {
@@ -147,8 +149,7 @@ public class NewCubeSamplingMethodTest {
                     counter.addHashDirectly(value);
                 }
                 long estimate = counter.getCountEstimate();
-                System.out.println("new method finished. Estimate cardinality : " + estimate + ". Error rate : "
-                        + countErrorRate(estimate, realCardinality));
+                System.out.println("new method finished. Estimate cardinality : " + estimate + ". Error rate : " + countErrorRate(estimate, realCardinality));
             }
         });
     }
@@ -178,6 +179,7 @@ public class NewCubeSamplingMethodTest {
         return counters;
     }
 
+
     private void addCuboidBitSet(long cuboidId, List<Integer[]> allCuboidsBitSet) {
         Integer[] indice = new Integer[Long.bitCount(cuboidId)];
 
@@ -206,8 +208,7 @@ public class NewCubeSamplingMethodTest {
         void run() throws Exception;
     }
 
-    private void putRowKeyToHLL(List<String> row, ByteArray[] colHashValues, HLLCounter[] cuboidCounters,
-            HashFunction hashFunction) {
+    private void putRowKeyToHLL(List<String> row, ByteArray[] colHashValues, HLLCounter[] cuboidCounters, HashFunction hashFunction) {
         int x = 0;
         for (String field : row) {
             Hasher hc = hashFunction.newHasher();
@@ -224,8 +225,7 @@ public class NewCubeSamplingMethodTest {
         }
     }
 
-    private void putRowKeyToHLLNew(List<String> row, long[] hashValuesLong, HLLCounter[] cuboidCounters,
-            HashFunction hashFunction) {
+    private void putRowKeyToHLLNew(List<String> row, long[] hashValuesLong, HLLCounter[] cuboidCounters, HashFunction hashFunction) {
         int x = 0;
         for (String field : row) {
             Hasher hc = hashFunction.newHasher();
@@ -266,7 +266,7 @@ public class NewCubeSamplingMethodTest {
         return row;
     }
 
-    private String[] smallCardRow = { "abc", "bcd", "jifea", "feaifj" };
+    private String[] smallCardRow = {"abc", "bcd", "jifea", "feaifj"};
 
     private Random rand = new Random(System.currentTimeMillis());
 
@@ -279,6 +279,7 @@ public class NewCubeSamplingMethodTest {
         return row;
     }
 
+
     private int countCardinality(List<List<String>> rows) {
         Set<String> diffCols = new HashSet<String>();
         for (List<String> row : rows) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NumberDictionaryForestTest.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NumberDictionaryForestTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NumberDictionaryForestTest.java
index ab55bcf..414ab95 100644
--- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NumberDictionaryForestTest.java
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NumberDictionaryForestTest.java
@@ -67,7 +67,7 @@ public class NumberDictionaryForestTest {
         //stimulate map-reduce job
         ArrayList<SelfDefineSortableKey> keyList = createKeyList(humanList, (byte) flag.ordinal());
         Collections.sort(keyList);
-
+        
         //build tree
         NumberDictionaryForestBuilder b = new NumberDictionaryForestBuilder(0, 0);
         expectedList = numberSort(expectedList);
@@ -76,7 +76,7 @@ public class NumberDictionaryForestTest {
         }
         TrieDictionaryForest<String> dict = b.build();
         dict.dump(System.out);
-
+        
         ArrayList<Integer> resultIds = new ArrayList<>();
         for (int i = 0; i < keyList.size(); i++) {
             SelfDefineSortableKey key = keyList.get(i);
@@ -84,7 +84,7 @@ public class NumberDictionaryForestTest {
             resultIds.add(dict.getIdFromValue(fieldValue));
             assertEquals(expectedList.get(i), dict.getValueFromId(dict.getIdFromValue(fieldValue)));
         }
-
+        
         assertTrue(isIncreasedOrder(resultIds, new Comparator<Integer>() {
             @Override
             public int compare(Integer o1, Integer o2) {
@@ -101,8 +101,7 @@ public class NumberDictionaryForestTest {
                 double d1 = Double.parseDouble(o1);
                 double d2 = Double.parseDouble(o2);
                 return Double.compare(d1, d2);
-            }
-        });
+            }});
         return result;
     }
 
@@ -291,18 +290,16 @@ public class NumberDictionaryForestTest {
         int flag;
         T previous = null;
         for (T t : list) {
-            if (previous == null)
-                previous = t;
+            if (previous == null) previous = t;
             else {
                 flag = comp.compare(previous, t);
-                if (flag > 0)
-                    return false;
+                if (flag > 0) return false;
                 previous = t;
             }
         }
         return true;
     }
-
+    
     @Test
     public void testNormalizeNumber() {
         assertEquals("0", Number2BytesConverter.normalizeNumber("+0000.000"));
@@ -314,7 +311,7 @@ public class NumberDictionaryForestTest {
         assertEquals("200", Number2BytesConverter.normalizeNumber("200"));
         assertEquals("200", Number2BytesConverter.normalizeNumber("200.00"));
         assertEquals("200.01", Number2BytesConverter.normalizeNumber("200.010"));
-
+        
         for (int i = -100; i < 101; i++) {
             String expected = "" + i;
             int cut = expected.startsWith("-") ? 1 : 0;

http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/SelfDefineSortableKeyTest.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/SelfDefineSortableKeyTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/SelfDefineSortableKeyTest.java
index 231387b..551998f 100644
--- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/SelfDefineSortableKeyTest.java
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/SelfDefineSortableKeyTest.java
@@ -56,8 +56,7 @@ public class SelfDefineSortableKeyTest {
         System.out.println("test numbers:" + longList);
         ArrayList<String> strNumList = listToStringList(longList);
         //System.out.println("test num strs list:"+strNumList);
-        ArrayList<SelfDefineSortableKey> keyList = createKeyList(strNumList,
-                (byte) SelfDefineSortableKey.TypeFlag.INTEGER_FAMILY_TYPE.ordinal());
+        ArrayList<SelfDefineSortableKey> keyList = createKeyList(strNumList, (byte) SelfDefineSortableKey.TypeFlag.INTEGER_FAMILY_TYPE.ordinal());
         System.out.println(keyList.get(0).isIntegerFamily());
         Collections.sort(keyList);
         ArrayList<String> strListAftereSort = new ArrayList<>();
@@ -93,8 +92,7 @@ public class SelfDefineSortableKeyTest {
         System.out.println("test numbers:" + doubleList);
         ArrayList<String> strNumList = listToStringList(doubleList);
         //System.out.println("test num strs list:"+strNumList);
-        ArrayList<SelfDefineSortableKey> keyList = createKeyList(strNumList,
-                (byte) SelfDefineSortableKey.TypeFlag.DOUBLE_FAMILY_TYPE.ordinal());
+        ArrayList<SelfDefineSortableKey> keyList = createKeyList(strNumList, (byte) SelfDefineSortableKey.TypeFlag.DOUBLE_FAMILY_TYPE.ordinal());
         Collections.sort(keyList);
         ArrayList<String> strListAftereSort = new ArrayList<>();
         for (SelfDefineSortableKey key : keyList) {
@@ -123,8 +121,7 @@ public class SelfDefineSortableKeyTest {
         strList.add("hello"); //duplicate
         strList.add("123");
         strList.add("");
-        ArrayList<SelfDefineSortableKey> keyList = createKeyList(strList,
-                (byte) SelfDefineSortableKey.TypeFlag.NONE_NUMERIC_TYPE.ordinal());
+        ArrayList<SelfDefineSortableKey> keyList = createKeyList(strList, (byte) SelfDefineSortableKey.TypeFlag.NONE_NUMERIC_TYPE.ordinal());
         Collections.sort(keyList);
         ArrayList<String> strListAftereSort = new ArrayList<>();
         for (SelfDefineSortableKey key : keyList) {
@@ -154,16 +151,17 @@ public class SelfDefineSortableKeyTest {
         doubleList.add(-Double.MAX_VALUE);
         //System.out.println(Double.MIN_VALUE);
 
+
         ArrayList<String> strNumList = listToStringList(doubleList);
         //System.out.println("test num strs list:"+strNumList);
-        ArrayList<SelfDefineSortableKey> keyList = createKeyList(strNumList,
-                (byte) SelfDefineSortableKey.TypeFlag.DOUBLE_FAMILY_TYPE.ordinal());
+        ArrayList<SelfDefineSortableKey> keyList = createKeyList(strNumList, (byte) SelfDefineSortableKey.TypeFlag.DOUBLE_FAMILY_TYPE.ordinal());
 
         System.out.println("start to test str sort");
         long start = System.currentTimeMillis();
         Collections.sort(strNumList);
         System.out.println("sort time : " + (System.currentTimeMillis() - start));
 
+
         System.out.println("start to test double sort");
         start = System.currentTimeMillis();
         Collections.sort(keyList);
@@ -191,6 +189,7 @@ public class SelfDefineSortableKeyTest {
         System.out.println("sort time : " + (System.currentTimeMillis() - start));
     }
 
+
     @Test
     public void testIllegalNumber() {
         Random rand = new Random(System.currentTimeMillis());
@@ -211,11 +210,10 @@ public class SelfDefineSortableKeyTest {
         strNumList.add("fjaeif"); //illegal type
         //System.out.println("test num strs list:"+strNumList);
         try {
-            ArrayList<SelfDefineSortableKey> keyList = createKeyList(strNumList,
-                    (byte) SelfDefineSortableKey.TypeFlag.DOUBLE_FAMILY_TYPE.ordinal());
+            ArrayList<SelfDefineSortableKey> keyList = createKeyList(strNumList, (byte) SelfDefineSortableKey.TypeFlag.DOUBLE_FAMILY_TYPE.ordinal());
             Collections.sort(keyList);
             fail("Need catch exception");
-        } catch (Exception e) {
+        }catch(Exception e){
             //correct
         }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
index cf6d0a8..66b154d 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
@@ -47,8 +47,7 @@ public class SparkBatchCubingJobBuilder2 extends BatchCubingJobBuilder2 {
         sparkExecutable.setClassName(SparkCubingByLayer.class.getName());
         sparkExecutable.setParam(SparkCubingByLayer.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName());
         sparkExecutable.setParam(SparkCubingByLayer.OPTION_SEGMENT_ID.getOpt(), seg.getUuid());
-        sparkExecutable.setParam(SparkCubingByLayer.OPTION_INPUT_TABLE.getOpt(),
-                seg.getConfig().getHiveDatabaseForIntermediateTable() + "." + flatTableDesc.getTableName());
+        sparkExecutable.setParam(SparkCubingByLayer.OPTION_INPUT_TABLE.getOpt(), seg.getConfig().getHiveDatabaseForIntermediateTable() + "." + flatTableDesc.getTableName());
         sparkExecutable.setParam(SparkCubingByLayer.OPTION_CONF_PATH.getOpt(), KylinConfig.getKylinConfPath());
         sparkExecutable.setParam(SparkCubingByLayer.OPTION_OUTPUT_PATH.getOpt(), cuboidRootPath);
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCountDemo.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCountDemo.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCountDemo.java
index ef39c69..6478c10 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCountDemo.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCountDemo.java
@@ -37,8 +37,7 @@ import scala.Tuple2;
  */
 public class SparkCountDemo extends AbstractApplication {
 
-    private static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true)
-            .withDescription("Input path").create("input");
+    private static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Input path").create("input");
 
     private Options options;
 
@@ -57,29 +56,25 @@ public class SparkCountDemo extends AbstractApplication {
         String logFile = "hdfs://10.249.65.231:8020/tmp/kylin.properties"; // Should be some file on your system
         SparkConf conf = new SparkConf().setAppName("Simple Application");
         JavaSparkContext sc = new JavaSparkContext(conf);
-        final JavaPairRDD<String, Integer> logData = sc.textFile(logFile)
-                .mapToPair(new PairFunction<String, String, Integer>() {
+        final JavaPairRDD<String, Integer> logData = sc.textFile(logFile).mapToPair(new PairFunction<String, String, Integer>() {
 
-                    @Override
-                    public Tuple2<String, Integer> call(String s) throws Exception {
-                        return new Tuple2<String, Integer>(s, s.length());
-                    }
-                }).sortByKey();
+            @Override
+            public Tuple2<String, Integer> call(String s) throws Exception {
+                return new Tuple2<String, Integer>(s, s.length());
+            }
+        }).sortByKey();
         logData.persist(StorageLevel.MEMORY_AND_DISK_SER());
 
         System.out.println("line number:" + logData.count());
 
         logData.mapToPair(new PairFunction<Tuple2<String, Integer>, ImmutableBytesWritable, KeyValue>() {
             @Override
-            public Tuple2<ImmutableBytesWritable, KeyValue> call(Tuple2<String, Integer> stringIntegerTuple2)
-                    throws Exception {
+            public Tuple2<ImmutableBytesWritable, KeyValue> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                 ImmutableBytesWritable key = new ImmutableBytesWritable(stringIntegerTuple2._1().getBytes());
-                KeyValue value = new KeyValue(stringIntegerTuple2._1().getBytes(), "f".getBytes(), "c".getBytes(),
-                        String.valueOf(stringIntegerTuple2._2()).getBytes());
+                KeyValue value = new KeyValue(stringIntegerTuple2._1().getBytes(), "f".getBytes(), "c".getBytes(), String.valueOf(stringIntegerTuple2._2()).getBytes());
                 return new Tuple2(key, value);
             }
-        }).saveAsNewAPIHadoopFile("hdfs://10.249.65.231:8020/tmp/hfile", ImmutableBytesWritable.class, KeyValue.class,
-                HFileOutputFormat.class);
+        }).saveAsNewAPIHadoopFile("hdfs://10.249.65.231:8020/tmp/hfile", ImmutableBytesWritable.class, KeyValue.class, HFileOutputFormat.class);
 
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
index c3326ff..2a0981a 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
@@ -120,16 +120,11 @@ public class SparkCubing extends AbstractApplication {
 
     protected static final Logger logger = LoggerFactory.getLogger(SparkCubing.class);
 
-    private static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true)
-            .withDescription("Hive Intermediate Table").create("hiveTable");
-    private static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg()
-            .isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
-    private static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segment").hasArg().isRequired(true)
-            .withDescription("Cube Segment Id").create("segmentId");
-    private static final Option OPTION_CONF_PATH = OptionBuilder.withArgName("confPath").hasArg().isRequired(true)
-            .withDescription("Configuration Path").create("confPath");
-    private static final Option OPTION_COPROCESSOR = OptionBuilder.withArgName("coprocessor").hasArg().isRequired(true)
-            .withDescription("Coprocessor Jar Path").create("coprocessor");
+    private static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Hive Intermediate Table").create("hiveTable");
+    private static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg().isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
+    private static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segment").hasArg().isRequired(true).withDescription("Cube Segment Id").create("segmentId");
+    private static final Option OPTION_CONF_PATH = OptionBuilder.withArgName("confPath").hasArg().isRequired(true).withDescription("Configuration Path").create("confPath");
+    private static final Option OPTION_COPROCESSOR = OptionBuilder.withArgName("coprocessor").hasArg().isRequired(true).withDescription("Coprocessor Jar Path").create("coprocessor");
 
     private Options options;
 
@@ -192,10 +187,8 @@ public class SparkCubing extends AbstractApplication {
         final CubeSegment seg = cubeInstance.getSegmentById(segmentId);
         final CubeDesc cubeDesc = cubeInstance.getDescriptor();
         final HashMap<Integer, TblColRef> tblColRefMap = Maps.newHashMap();
-        final CubeJoinedFlatTableEnrich flatDesc = new CubeJoinedFlatTableEnrich(
-                EngineFactory.getJoinedFlatTableDesc(seg), cubeDesc);
-        final List<TblColRef> baseCuboidColumn = Cuboid.findById(cubeDesc, Cuboid.getBaseCuboidId(cubeDesc))
-                .getColumns();
+        final CubeJoinedFlatTableEnrich flatDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(seg), cubeDesc);
+        final List<TblColRef> baseCuboidColumn = Cuboid.findById(cubeDesc, Cuboid.getBaseCuboidId(cubeDesc)).getColumns();
         final long start = System.currentTimeMillis();
         final RowKeyDesc rowKey = cubeDesc.getRowkey();
         for (int i = 0; i < baseCuboidColumn.size(); i++) {
@@ -214,36 +207,35 @@ public class SparkCubing extends AbstractApplication {
             final DataFrame frame = intermediateTable.select(column).distinct();
 
             final Row[] rows = frame.collect();
-            dictionaryMap.put(tblColRef, DictionaryGenerator.buildDictionary(tblColRef.getType(),
-                    new IterableDictionaryValueEnumerator(new Iterable<String>() {
-                        @Override
-                        public Iterator<String> iterator() {
-                            return new Iterator<String>() {
-                                int i = 0;
+            dictionaryMap.put(tblColRef, DictionaryGenerator.buildDictionary(tblColRef.getType(), new IterableDictionaryValueEnumerator(new Iterable<String>() {
+                @Override
+                public Iterator<String> iterator() {
+                    return new Iterator<String>() {
+                        int i = 0;
 
-                                @Override
-                                public boolean hasNext() {
-                                    return i < rows.length;
-                                }
+                        @Override
+                        public boolean hasNext() {
+                            return i < rows.length;
+                        }
 
-                                @Override
-                                public String next() {
-                                    if (hasNext()) {
-                                        final Row row = rows[i++];
-                                        final Object o = row.get(0);
-                                        return o != null ? o.toString() : null;
-                                    } else {
-                                        throw new NoSuchElementException();
-                                    }
-                                }
+                        @Override
+                        public String next() {
+                            if (hasNext()) {
+                                final Row row = rows[i++];
+                                final Object o = row.get(0);
+                                return o != null ? o.toString() : null;
+                            } else {
+                                throw new NoSuchElementException();
+                            }
+                        }
 
-                                @Override
-                                public void remove() {
-                                    throw new UnsupportedOperationException();
-                                }
-                            };
+                        @Override
+                        public void remove() {
+                            throw new UnsupportedOperationException();
                         }
-                    })));
+                    };
+                }
+            })));
         }
         final long end = System.currentTimeMillis();
         CubingUtils.writeDictionary(seg, dictionaryMap, start, end);
@@ -256,8 +248,7 @@ public class SparkCubing extends AbstractApplication {
         }
     }
 
-    private Map<Long, HLLCounter> sampling(final JavaRDD<List<String>> rowJavaRDD, final String cubeName,
-            String segmentId) throws Exception {
+    private Map<Long, HLLCounter> sampling(final JavaRDD<List<String>> rowJavaRDD, final String cubeName, String segmentId) throws Exception {
         CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName);
         CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
         CubeDesc cubeDesc = cubeInstance.getDescriptor();
@@ -268,8 +259,7 @@ public class SparkCubing extends AbstractApplication {
             zeroValue.put(id, new HLLCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision()));
         }
 
-        CubeJoinedFlatTableEnrich flatDesc = new CubeJoinedFlatTableEnrich(
-                EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc);
+        CubeJoinedFlatTableEnrich flatDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc);
 
         final int[] rowKeyColumnIndexes = flatDesc.getRowKeyColumnIndexes();
         final int nRowKey = cubeDesc.getRowkey().getRowKeyColumns().length;
@@ -295,56 +285,52 @@ public class SparkCubing extends AbstractApplication {
             row_hashcodes[i] = new ByteArray();
         }
 
-        final HashMap<Long, HLLCounter> samplingResult = rowJavaRDD.aggregate(zeroValue,
-                new Function2<HashMap<Long, HLLCounter>, List<String>, HashMap<Long, HLLCounter>>() {
+        final HashMap<Long, HLLCounter> samplingResult = rowJavaRDD.aggregate(zeroValue, new Function2<HashMap<Long, HLLCounter>, List<String>, HashMap<Long, HLLCounter>>() {
 
-                    final HashFunction hashFunction = Hashing.murmur3_128();
+            final HashFunction hashFunction = Hashing.murmur3_128();
 
-                    @Override
-                    public HashMap<Long, HLLCounter> call(HashMap<Long, HLLCounter> v1, List<String> v2)
-                            throws Exception {
-                        for (int i = 0; i < nRowKey; i++) {
-                            Hasher hc = hashFunction.newHasher();
-                            String colValue = v2.get(rowKeyColumnIndexes[i]);
-                            if (colValue != null) {
-                                row_hashcodes[i].set(hc.putString(colValue).hash().asBytes());
-                            } else {
-                                row_hashcodes[i].set(hc.putInt(0).hash().asBytes());
-                            }
-                        }
-
-                        for (Map.Entry<Long, Integer[]> entry : allCuboidsBitSet.entrySet()) {
-                            Hasher hc = hashFunction.newHasher();
-                            HLLCounter counter = v1.get(entry.getKey());
-                            final Integer[] cuboidBitSet = entry.getValue();
-                            for (int position = 0; position < cuboidBitSet.length; position++) {
-                                hc.putBytes(row_hashcodes[cuboidBitSet[position]].array());
-                            }
-                            counter.add(hc.hash().asBytes());
-                        }
-                        return v1;
+            @Override
+            public HashMap<Long, HLLCounter> call(HashMap<Long, HLLCounter> v1, List<String> v2) throws Exception {
+                for (int i = 0; i < nRowKey; i++) {
+                    Hasher hc = hashFunction.newHasher();
+                    String colValue = v2.get(rowKeyColumnIndexes[i]);
+                    if (colValue != null) {
+                        row_hashcodes[i].set(hc.putString(colValue).hash().asBytes());
+                    } else {
+                        row_hashcodes[i].set(hc.putInt(0).hash().asBytes());
                     }
-                }, new Function2<HashMap<Long, HLLCounter>, HashMap<Long, HLLCounter>, HashMap<Long, HLLCounter>>() {
-                    @Override
-                    public HashMap<Long, HLLCounter> call(HashMap<Long, HLLCounter> v1, HashMap<Long, HLLCounter> v2)
-                            throws Exception {
-                        Preconditions.checkArgument(v1.size() == v2.size());
-                        Preconditions.checkArgument(v1.size() > 0);
-                        for (Map.Entry<Long, HLLCounter> entry : v1.entrySet()) {
-                            final HLLCounter counter1 = entry.getValue();
-                            final HLLCounter counter2 = v2.get(entry.getKey());
-                            counter1.merge(Preconditions.checkNotNull(counter2, "counter cannot be null"));
-                        }
-                        return v1;
+                }
+
+                for (Map.Entry<Long, Integer[]> entry : allCuboidsBitSet.entrySet()) {
+                    Hasher hc = hashFunction.newHasher();
+                    HLLCounter counter = v1.get(entry.getKey());
+                    final Integer[] cuboidBitSet = entry.getValue();
+                    for (int position = 0; position < cuboidBitSet.length; position++) {
+                        hc.putBytes(row_hashcodes[cuboidBitSet[position]].array());
                     }
+                    counter.add(hc.hash().asBytes());
+                }
+                return v1;
+            }
+        }, new Function2<HashMap<Long, HLLCounter>, HashMap<Long, HLLCounter>, HashMap<Long, HLLCounter>>() {
+            @Override
+            public HashMap<Long, HLLCounter> call(HashMap<Long, HLLCounter> v1, HashMap<Long, HLLCounter> v2) throws Exception {
+                Preconditions.checkArgument(v1.size() == v2.size());
+                Preconditions.checkArgument(v1.size() > 0);
+                for (Map.Entry<Long, HLLCounter> entry : v1.entrySet()) {
+                    final HLLCounter counter1 = entry.getValue();
+                    final HLLCounter counter2 = v2.get(entry.getKey());
+                    counter1.merge(Preconditions.checkNotNull(counter2, "counter cannot be null"));
+                }
+                return v1;
+            }
 
-                });
+        });
         return samplingResult;
     }
 
     /** return hfile location */
-    private String build(JavaRDD<List<String>> javaRDD, final String cubeName, final String segmentId,
-            final byte[][] splitKeys) throws Exception {
+    private String build(JavaRDD<List<String>> javaRDD, final String cubeName, final String segmentId, final byte[][] splitKeys) throws Exception {
         CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
         CubeDesc cubeDesc = cubeInstance.getDescriptor();
         final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
@@ -378,41 +364,35 @@ public class SparkCubing extends AbstractApplication {
             }
         }
 
-        final JavaPairRDD<byte[], byte[]> javaPairRDD = javaRDD.glom()
-                .mapPartitionsToPair(new PairFlatMapFunction<Iterator<List<List<String>>>, byte[], byte[]>() {
+        final JavaPairRDD<byte[], byte[]> javaPairRDD = javaRDD.glom().mapPartitionsToPair(new PairFlatMapFunction<Iterator<List<List<String>>>, byte[], byte[]>() {
 
-                    @Override
-                    public Iterable<Tuple2<byte[], byte[]>> call(Iterator<List<List<String>>> listIterator)
-                            throws Exception {
-                        long t = System.currentTimeMillis();
-                        prepare();
-
-                        final CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv())
-                                .getCube(cubeName);
-
-                        LinkedBlockingQueue<List<String>> blockingQueue = new LinkedBlockingQueue();
-                        System.out.println("load properties finished");
-                        IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(cubeSegment);
-                        AbstractInMemCubeBuilder inMemCubeBuilder = new DoggedCubeBuilder(cubeInstance.getDescriptor(),
-                                flatDesc, dictionaryMap);
-                        final SparkCuboidWriter sparkCuboidWriter = new BufferedCuboidWriter(
-                                new DefaultTupleConverter(cubeInstance.getSegmentById(segmentId), columnLengthMap));
-                        Executors.newCachedThreadPool()
-                                .submit(inMemCubeBuilder.buildAsRunnable(blockingQueue, sparkCuboidWriter));
-                        try {
-                            while (listIterator.hasNext()) {
-                                for (List<String> row : listIterator.next()) {
-                                    blockingQueue.put(row);
-                                }
-                            }
-                            blockingQueue.put(Collections.<String> emptyList());
-                        } catch (Exception e) {
-                            throw new RuntimeException(e);
+            @Override
+            public Iterable<Tuple2<byte[], byte[]>> call(Iterator<List<List<String>>> listIterator) throws Exception {
+                long t = System.currentTimeMillis();
+                prepare();
+
+                final CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
+
+                LinkedBlockingQueue<List<String>> blockingQueue = new LinkedBlockingQueue();
+                System.out.println("load properties finished");
+                IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(cubeSegment);
+                AbstractInMemCubeBuilder inMemCubeBuilder = new DoggedCubeBuilder(cubeInstance.getDescriptor(), flatDesc, dictionaryMap);
+                final SparkCuboidWriter sparkCuboidWriter = new BufferedCuboidWriter(new DefaultTupleConverter(cubeInstance.getSegmentById(segmentId), columnLengthMap));
+                Executors.newCachedThreadPool().submit(inMemCubeBuilder.buildAsRunnable(blockingQueue, sparkCuboidWriter));
+                try {
+                    while (listIterator.hasNext()) {
+                        for (List<String> row : listIterator.next()) {
+                            blockingQueue.put(row);
                         }
-                        System.out.println("build partition cost: " + (System.currentTimeMillis() - t) + "ms");
-                        return sparkCuboidWriter.getResult();
                     }
-                });
+                    blockingQueue.put(Collections.<String> emptyList());
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+                System.out.println("build partition cost: " + (System.currentTimeMillis() - t) + "ms");
+                return sparkCuboidWriter.getResult();
+            }
+        });
 
         KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
         Configuration conf = getConfigurationForHFile(cubeSegment.getStorageLocationIdentifier());
@@ -431,9 +411,7 @@ public class SparkCubing extends AbstractApplication {
         return url;
     }
 
-    private void writeToHFile2(final JavaPairRDD<byte[], byte[]> javaPairRDD, final String[] dataTypes,
-            final int measureSize, final MeasureAggregators aggs, final byte[][] splitKeys, final Configuration conf,
-            final String hFileLocation) {
+    private void writeToHFile2(final JavaPairRDD<byte[], byte[]> javaPairRDD, final String[] dataTypes, final int measureSize, final MeasureAggregators aggs, final byte[][] splitKeys, final Configuration conf, final String hFileLocation) {
         javaPairRDD.repartitionAndSortWithinPartitions(new Partitioner() {
             @Override
             public int numPartitions() {
@@ -450,52 +428,46 @@ public class SparkCubing extends AbstractApplication {
                 }
                 return splitKeys.length;
             }
-        }, UnsignedBytes.lexicographicalComparator())
-                .mapPartitions(new FlatMapFunction<Iterator<Tuple2<byte[], byte[]>>, Tuple2<byte[], byte[]>>() {
-                    @Override
-                    public Iterable<Tuple2<byte[], byte[]>> call(final Iterator<Tuple2<byte[], byte[]>> tuple2Iterator)
-                            throws Exception {
-                        return new Iterable<Tuple2<byte[], byte[]>>() {
-                            final BufferedMeasureCodec codec = new BufferedMeasureCodec(dataTypes);
-                            final Object[] input = new Object[measureSize];
-                            final Object[] result = new Object[measureSize];
+        }, UnsignedBytes.lexicographicalComparator()).mapPartitions(new FlatMapFunction<Iterator<Tuple2<byte[], byte[]>>, Tuple2<byte[], byte[]>>() {
+            @Override
+            public Iterable<Tuple2<byte[], byte[]>> call(final Iterator<Tuple2<byte[], byte[]>> tuple2Iterator) throws Exception {
+                return new Iterable<Tuple2<byte[], byte[]>>() {
+                    final BufferedMeasureCodec codec = new BufferedMeasureCodec(dataTypes);
+                    final Object[] input = new Object[measureSize];
+                    final Object[] result = new Object[measureSize];
 
+                    @Override
+                    public Iterator<Tuple2<byte[], byte[]>> iterator() {
+                        return IteratorUtils.merge(tuple2Iterator, UnsignedBytes.lexicographicalComparator(), new Function<Iterable<byte[]>, byte[]>() {
                             @Override
-                            public Iterator<Tuple2<byte[], byte[]>> iterator() {
-                                return IteratorUtils.merge(tuple2Iterator, UnsignedBytes.lexicographicalComparator(),
-                                        new Function<Iterable<byte[]>, byte[]>() {
-                                            @Override
-                                            public byte[] call(Iterable<byte[]> v1) throws Exception {
-                                                final LinkedList<byte[]> list = Lists.newLinkedList(v1);
-                                                if (list.size() == 1) {
-                                                    return list.get(0);
-                                                }
-                                                aggs.reset();
-                                                for (byte[] v : list) {
-                                                    codec.decode(ByteBuffer.wrap(v), input);
-                                                    aggs.aggregate(input);
-                                                }
-                                                aggs.collectStates(result);
-                                                ByteBuffer buffer = codec.encode(result);
-                                                byte[] bytes = new byte[buffer.position()];
-                                                System.arraycopy(buffer.array(), buffer.arrayOffset(), bytes, 0,
-                                                        buffer.position());
-                                                return bytes;
-                                            }
-                                        });
+                            public byte[] call(Iterable<byte[]> v1) throws Exception {
+                                final LinkedList<byte[]> list = Lists.newLinkedList(v1);
+                                if (list.size() == 1) {
+                                    return list.get(0);
+                                }
+                                aggs.reset();
+                                for (byte[] v : list) {
+                                    codec.decode(ByteBuffer.wrap(v), input);
+                                    aggs.aggregate(input);
+                                }
+                                aggs.collectStates(result);
+                                ByteBuffer buffer = codec.encode(result);
+                                byte[] bytes = new byte[buffer.position()];
+                                System.arraycopy(buffer.array(), buffer.arrayOffset(), bytes, 0, buffer.position());
+                                return bytes;
                             }
-                        };
-                    }
-                }, true).mapToPair(new PairFunction<Tuple2<byte[], byte[]>, ImmutableBytesWritable, KeyValue>() {
-                    @Override
-                    public Tuple2<ImmutableBytesWritable, KeyValue> call(Tuple2<byte[], byte[]> tuple2)
-                            throws Exception {
-                        ImmutableBytesWritable key = new ImmutableBytesWritable(tuple2._1());
-                        KeyValue value = new KeyValue(tuple2._1(), "F1".getBytes(), "M".getBytes(), tuple2._2());
-                        return new Tuple2(key, value);
+                        });
                     }
-                }).saveAsNewAPIHadoopFile(hFileLocation, ImmutableBytesWritable.class, KeyValue.class,
-                        HFileOutputFormat.class, conf);
+                };
+            }
+        }, true).mapToPair(new PairFunction<Tuple2<byte[], byte[]>, ImmutableBytesWritable, KeyValue>() {
+            @Override
+            public Tuple2<ImmutableBytesWritable, KeyValue> call(Tuple2<byte[], byte[]> tuple2) throws Exception {
+                ImmutableBytesWritable key = new ImmutableBytesWritable(tuple2._1());
+                KeyValue value = new KeyValue(tuple2._1(), "F1".getBytes(), "M".getBytes(), tuple2._2());
+                return new Tuple2(key, value);
+            }
+        }).saveAsNewAPIHadoopFile(hFileLocation, ImmutableBytesWritable.class, KeyValue.class, HFileOutputFormat.class, conf);
     }
 
     public static void prepare() throws Exception {
@@ -506,16 +478,14 @@ public class SparkCubing extends AbstractApplication {
         ClassUtil.addClasspath(confPath);
     }
 
-    private byte[][] createHTable(String cubeName, String segmentId, Map<Long, HLLCounter> samplingResult)
-            throws Exception {
+    private byte[][] createHTable(String cubeName, String segmentId, Map<Long, HLLCounter> samplingResult) throws Exception {
         final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
         final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
         final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
         final Map<Long, Long> rowCountMap = CubeStatsReader.getCuboidRowCountMapFromSampling(samplingResult, 100);
         final Map<Long, Double> cubeSizeMap = CubeStatsReader.getCuboidSizeMapFromRowCount(cubeSegment, rowCountMap);
         System.out.println("cube size estimation:" + cubeSizeMap);
-        final byte[][] splitKeys = CreateHTableJob.getRegionSplitsFromCuboidStatistics(cubeSizeMap, kylinConfig,
-                cubeSegment, null); //FIXME: passing non-null value for 'hfileSplitsOutputFolder'
+        final byte[][] splitKeys = CreateHTableJob.getRegionSplitsFromCuboidStatistics(cubeSizeMap, kylinConfig, cubeSegment, null); //FIXME: passing non-null value for 'hfileSplitsOutputFolder'
         CubeHTableUtil.createHTable(cubeSegment, splitKeys);
         System.out.println(cubeSegment.getStorageLocationIdentifier() + " table created");
         return splitKeys;
@@ -590,23 +560,22 @@ public class SparkCubing extends AbstractApplication {
         setupClasspath(sc, confPath);
         intermediateTable.cache();
         writeDictionary(intermediateTable, cubeName, segmentId);
-        final JavaRDD<List<String>> rowJavaRDD = intermediateTable.javaRDD()
-                .map(new org.apache.spark.api.java.function.Function<Row, List<String>>() {
-                    @Override
-                    public List<String> call(Row v1) throws Exception {
-                        ArrayList<String> result = Lists.newArrayListWithExpectedSize(v1.size());
-                        for (int i = 0; i < v1.size(); i++) {
-                            final Object o = v1.get(i);
-                            if (o != null) {
-                                result.add(o.toString());
-                            } else {
-                                result.add(null);
-                            }
-                        }
-                        return result;
-
+        final JavaRDD<List<String>> rowJavaRDD = intermediateTable.javaRDD().map(new org.apache.spark.api.java.function.Function<Row, List<String>>() {
+            @Override
+            public List<String> call(Row v1) throws Exception {
+                ArrayList<String> result = Lists.newArrayListWithExpectedSize(v1.size());
+                for (int i = 0; i < v1.size(); i++) {
+                    final Object o = v1.get(i);
+                    if (o != null) {
+                        result.add(o.toString());
+                    } else {
+                        result.add(null);
                     }
-                });
+                }
+                return result;
+
+            }
+        });
 
         final Map<Long, HLLCounter> samplingResult = sampling(rowJavaRDD, cubeName, segmentId);
         final byte[][] splitKeys = createHTable(cubeName, segmentId, samplingResult);

http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index cf2a650..f70fd30 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -17,14 +17,6 @@
 */
 package org.apache.kylin.engine.spark;
 
-import java.io.File;
-import java.io.FileFilter;
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
@@ -73,9 +65,17 @@ import org.apache.spark.sql.hive.HiveContext;
 import org.apache.spark.storage.StorageLevel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import scala.Tuple2;
 
+import java.io.File;
+import java.io.FileFilter;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+
 /**
  * Spark application to build cube with the "by-layer" algorithm. Only support source data from Hive; Metadata in HBase.
  */
@@ -83,16 +83,11 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
 
     protected static final Logger logger = LoggerFactory.getLogger(SparkCubingByLayer.class);
 
-    public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg()
-            .isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
-    public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segment").hasArg().isRequired(true)
-            .withDescription("Cube Segment Id").create("segmentId");
-    public static final Option OPTION_CONF_PATH = OptionBuilder.withArgName("confPath").hasArg().isRequired(true)
-            .withDescription("Configuration Path").create("confPath");
-    public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg()
-            .isRequired(true).withDescription("Cube output path").create(BatchConstants.ARG_OUTPUT);
-    public static final Option OPTION_INPUT_TABLE = OptionBuilder.withArgName("hiveTable").hasArg().isRequired(true)
-            .withDescription("Hive Intermediate Table").create("hiveTable");
+    public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg().isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
+    public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segment").hasArg().isRequired(true).withDescription("Cube Segment Id").create("segmentId");
+    public static final Option OPTION_CONF_PATH = OptionBuilder.withArgName("confPath").hasArg().isRequired(true).withDescription("Configuration Path").create("confPath");
+    public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg().isRequired(true).withDescription("Cube output path").create(BatchConstants.ARG_OUTPUT);
+    public static final Option OPTION_INPUT_TABLE = OptionBuilder.withArgName("hiveTable").hasArg().isRequired(true).withDescription("Hive Intermediate Table").create("hiveTable");
 
     private Options options;
 
@@ -165,14 +160,12 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
         final CubeDesc cubeDesc = cubeInstance.getDescriptor();
         final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
-        final CubeJoinedFlatTableEnrich intermediateTableDesc = new CubeJoinedFlatTableEnrich(
-                EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc);
+        final CubeJoinedFlatTableEnrich intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc);
 
         final KylinConfig kylinConfig = cubeDesc.getConfig();
         final Broadcast<CubeDesc> vCubeDesc = sc.broadcast(cubeDesc);
         final Broadcast<CubeSegment> vCubeSegment = sc.broadcast(cubeSegment);
-        final NDCuboidBuilder ndCuboidBuilder = new NDCuboidBuilder(vCubeSegment.getValue(),
-                new RowKeyEncoderProvider(vCubeSegment.getValue()));
+        final NDCuboidBuilder ndCuboidBuilder = new NDCuboidBuilder(vCubeSegment.getValue(), new RowKeyEncoderProvider(vCubeSegment.getValue()));
 
         final Broadcast<CuboidScheduler> vCuboidScheduler = sc.broadcast(new CuboidScheduler(vCubeDesc.getValue()));
         final int measureNum = cubeDesc.getMeasures().size();
@@ -197,50 +190,45 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         StorageLevel storageLevel = StorageLevel.MEMORY_AND_DISK_SER();
 
         // encode with dimension encoding, transform to <ByteArray, Object[]> RDD
-        final JavaPairRDD<ByteArray, Object[]> encodedBaseRDD = intermediateTable.javaRDD()
-                .mapToPair(new PairFunction<Row, ByteArray, Object[]>() {
-                    volatile transient boolean initialized = false;
-                    BaseCuboidBuilder baseCuboidBuilder = null;
+        final JavaPairRDD<ByteArray, Object[]> encodedBaseRDD = intermediateTable.javaRDD().mapToPair(new PairFunction<Row, ByteArray, Object[]>() {
+            volatile transient boolean initialized = false;
+            BaseCuboidBuilder baseCuboidBuilder = null;
 
-                    @Override
-                    public Tuple2<ByteArray, Object[]> call(Row row) throws Exception {
+            @Override
+            public Tuple2<ByteArray, Object[]> call(Row row) throws Exception {
+                if (initialized == false) {
+                    synchronized (SparkCubingByLayer.class) {
                         if (initialized == false) {
-                            synchronized (SparkCubingByLayer.class) {
-                                if (initialized == false) {
-                                    prepare();
-                                    long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
-                                    Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
-                                    baseCuboidBuilder = new BaseCuboidBuilder(kylinConfig, cubeDesc, cubeSegment,
-                                            intermediateTableDesc,
-                                            AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid),
-                                            MeasureIngester.create(cubeDesc.getMeasures()),
-                                            cubeSegment.buildDictionaryMap());
-                                    initialized = true;
-                                }
-                            }
+                            prepare();
+                            long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+                            Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
+                            baseCuboidBuilder = new BaseCuboidBuilder(kylinConfig, cubeDesc, cubeSegment, intermediateTableDesc, AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid), MeasureIngester.create(cubeDesc.getMeasures()), cubeSegment.buildDictionaryMap());
+                            initialized = true;
                         }
-
-                        String[] rowArray = rowToArray(row);
-                        baseCuboidBuilder.resetAggrs();
-                        byte[] rowKey = baseCuboidBuilder.buildKey(rowArray);
-                        Object[] result = baseCuboidBuilder.buildValueObjects(rowArray);
-                        return new Tuple2<>(new ByteArray(rowKey), result);
                     }
+                }
 
-                    private String[] rowToArray(Row row) {
-                        String[] result = new String[row.size()];
-                        for (int i = 0; i < row.size(); i++) {
-                            final Object o = row.get(i);
-                            if (o != null) {
-                                result[i] = o.toString();
-                            } else {
-                                result[i] = null;
-                            }
-                        }
-                        return result;
+                String[] rowArray = rowToArray(row);
+                baseCuboidBuilder.resetAggrs();
+                byte[] rowKey = baseCuboidBuilder.buildKey(rowArray);
+                Object[] result = baseCuboidBuilder.buildValueObjects(rowArray);
+                return new Tuple2<>(new ByteArray(rowKey), result);
+            }
+
+            private String[] rowToArray(Row row) {
+                String[] result = new String[row.size()];
+                for (int i = 0; i < row.size(); i++) {
+                    final Object o = row.get(i);
+                    if (o != null) {
+                        result[i] = o.toString();
+                    } else {
+                        result[i] = null;
                     }
+                }
+                return result;
+            }
 
-                });
+        });
 
         logger.info("encodedBaseRDD partition number: " + encodedBaseRDD.getNumPartitions());
         Long totalCount = 0L;
@@ -250,12 +238,10 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         }
 
         final MeasureAggregators measureAggregators = new MeasureAggregators(cubeDesc.getMeasures());
-        final BaseCuboidReducerFunction2 baseCuboidReducerFunction = new BaseCuboidReducerFunction2(measureNum,
-                vCubeDesc.getValue(), measureAggregators);
+        final BaseCuboidReducerFunction2 baseCuboidReducerFunction = new BaseCuboidReducerFunction2(measureNum, vCubeDesc.getValue(), measureAggregators);
         BaseCuboidReducerFunction2 reducerFunction2 = baseCuboidReducerFunction;
         if (allNormalMeasure == false) {
-            reducerFunction2 = new CuboidReducerFunction2(measureNum, vCubeDesc.getValue(), measureAggregators,
-                    needAggr);
+            reducerFunction2 = new CuboidReducerFunction2(measureNum, vCubeDesc.getValue(), measureAggregators, needAggr);
         }
 
         final int totalLevels = cubeDesc.getBuildLevel();
@@ -271,14 +257,12 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         saveToHDFS(allRDDs[0], vCubeDesc.getValue(), outputPath, 0, confOverwrite);
 
         // aggregate to ND cuboids
-        PairFlatMapFunction<Tuple2<ByteArray, Object[]>, ByteArray, Object[]> flatMapFunction = new CuboidFlatMap(
-                vCubeSegment.getValue(), vCubeDesc.getValue(), vCuboidScheduler.getValue(), ndCuboidBuilder);
+        PairFlatMapFunction<Tuple2<ByteArray, Object[]>, ByteArray, Object[]> flatMapFunction = new CuboidFlatMap(vCubeSegment.getValue(), vCubeDesc.getValue(), vCuboidScheduler.getValue(), ndCuboidBuilder);
 
         for (level = 1; level <= totalLevels; level++) {
             partition = estimateRDDPartitionNum(level, cubeStatsReader, kylinConfig);
             logger.info("Level " + level + " partition number: " + partition);
-            allRDDs[level] = allRDDs[level - 1].flatMapToPair(flatMapFunction).reduceByKey(reducerFunction2, partition)
-                    .persist(storageLevel);
+            allRDDs[level] = allRDDs[level - 1].flatMapToPair(flatMapFunction).reduceByKey(reducerFunction2, partition).persist(storageLevel);
             if (kylinConfig.isSparkSanityCheckEnabled() == true) {
                 sanityCheck(allRDDs[level], totalCount, level, cubeStatsReader, countMeasureIndex);
             }
@@ -299,24 +283,19 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         return partition;
     }
 
-    private static void saveToHDFS(final JavaPairRDD<ByteArray, Object[]> rdd, final CubeDesc cubeDesc,
-            final String hdfsBaseLocation, int level, Configuration conf) {
+    private static void saveToHDFS(final JavaPairRDD<ByteArray, Object[]> rdd, final CubeDesc cubeDesc, final String hdfsBaseLocation, int level, Configuration conf) {
         final String cuboidOutputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(hdfsBaseLocation, level);
-        rdd.mapToPair(
-                new PairFunction<Tuple2<ByteArray, Object[]>, org.apache.hadoop.io.Text, org.apache.hadoop.io.Text>() {
-                    BufferedMeasureCodec codec = new BufferedMeasureCodec(cubeDesc.getMeasures());
-
-                    @Override
-                    public Tuple2<org.apache.hadoop.io.Text, org.apache.hadoop.io.Text> call(
-                            Tuple2<ByteArray, Object[]> tuple2) throws Exception {
-                        ByteBuffer valueBuf = codec.encode(tuple2._2());
-                        byte[] encodedBytes = new byte[valueBuf.position()];
-                        System.arraycopy(valueBuf.array(), 0, encodedBytes, 0, valueBuf.position());
-                        return new Tuple2<>(new org.apache.hadoop.io.Text(tuple2._1().array()),
-                                new org.apache.hadoop.io.Text(encodedBytes));
-                    }
-                }).saveAsNewAPIHadoopFile(cuboidOutputPath, org.apache.hadoop.io.Text.class,
-                        org.apache.hadoop.io.Text.class, SequenceFileOutputFormat.class, conf);
+        rdd.mapToPair(new PairFunction<Tuple2<ByteArray, Object[]>, org.apache.hadoop.io.Text, org.apache.hadoop.io.Text>() {
+            BufferedMeasureCodec codec = new BufferedMeasureCodec(cubeDesc.getMeasures());
+
+            @Override
+            public Tuple2<org.apache.hadoop.io.Text, org.apache.hadoop.io.Text> call(Tuple2<ByteArray, Object[]> tuple2) throws Exception {
+                ByteBuffer valueBuf = codec.encode(tuple2._2());
+                byte[] encodedBytes = new byte[valueBuf.position()];
+                System.arraycopy(valueBuf.array(), 0, encodedBytes, 0, valueBuf.position());
+                return new Tuple2<>(new org.apache.hadoop.io.Text(tuple2._1().array()), new org.apache.hadoop.io.Text(encodedBytes));
+            }
+        }).saveAsNewAPIHadoopFile(cuboidOutputPath, org.apache.hadoop.io.Text.class, org.apache.hadoop.io.Text.class, SequenceFileOutputFormat.class, conf);
         logger.info("Persisting RDD for level " + level + " into " + cuboidOutputPath);
     }
 
@@ -366,8 +345,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         RowKeySplitter rowKeySplitter;
         transient boolean initialized = false;
 
-        CuboidFlatMap(CubeSegment cubeSegment, CubeDesc cubeDesc, CuboidScheduler cuboidScheduler,
-                NDCuboidBuilder ndCuboidBuilder) {
+        CuboidFlatMap(CubeSegment cubeSegment, CubeDesc cubeDesc, CuboidScheduler cuboidScheduler, NDCuboidBuilder ndCuboidBuilder) {
             this.cubeSegment = cubeSegment;
             this.cubeDesc = cubeDesc;
             this.cuboidScheduler = cuboidScheduler;
@@ -396,8 +374,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
             List<Tuple2<ByteArray, Object[]>> tuples = new ArrayList(myChildren.size());
             for (Long child : myChildren) {
                 Cuboid childCuboid = Cuboid.findById(cubeDesc, child);
-                Pair<Integer, ByteArray> result = ndCuboidBuilder.buildKey(parentCuboid, childCuboid,
-                        rowKeySplitter.getSplitBuffers());
+                Pair<Integer, ByteArray> result = ndCuboidBuilder.buildKey(parentCuboid, childCuboid, rowKeySplitter.getSplitBuffers());
 
                 byte[] newKey = new byte[result.getFirst()];
                 System.arraycopy(result.getSecond().array(), 0, newKey, 0, result.getFirst());
@@ -411,14 +388,11 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
 
     //sanity check
 
-    private void sanityCheck(JavaPairRDD<ByteArray, Object[]> rdd, Long totalCount, int thisLevel,
-            CubeStatsReader cubeStatsReader, final int countMeasureIndex) {
+    private void sanityCheck(JavaPairRDD<ByteArray, Object[]> rdd, Long totalCount, int thisLevel, CubeStatsReader cubeStatsReader, final int countMeasureIndex) {
         int thisCuboidNum = cubeStatsReader.getCuboidsByLayer(thisLevel).size();
         Long count2 = getRDDCountSum(rdd, countMeasureIndex);
         if (count2 != totalCount * thisCuboidNum) {
-            throw new IllegalStateException(
-                    String.format("Sanity check failed, level %s, total count(*) is %s; cuboid number %s", thisLevel,
-                            count2, thisCuboidNum));
+            throw new IllegalStateException(String.format("Sanity check failed, level %s, total count(*) is %s; cuboid number %s", thisLevel, count2, thisCuboidNum));
         } else {
             logger.info("sanity check success for level " + thisLevel + ", count(*) is " + (count2 / thisCuboidNum));
         }
@@ -433,8 +407,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
             }
         }).reduce(new Function2<Tuple2<ByteArray, Long>, Tuple2<ByteArray, Long>, Tuple2<ByteArray, Long>>() {
             @Override
-            public Tuple2<ByteArray, Long> call(Tuple2<ByteArray, Long> longTuple2, Tuple2<ByteArray, Long> longTuple22)
-                    throws Exception {
+            public Tuple2<ByteArray, Long> call(Tuple2<ByteArray, Long> longTuple2, Tuple2<ByteArray, Long> longTuple22) throws Exception {
                 return new Tuple2<>(ONE, longTuple2._2() + longTuple22._2());
             }
         })._2();
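
[Editor's note] As a reading aid for the hunks above: sanityCheck() sums the COUNT(*) measure across a level's RDD (via getRDDCountSum) and compares it with the base row count multiplied by the number of cuboids built on that level, since every cuboid repeats each base row exactly once. A minimal, self-contained sketch of that arithmetic; the class, method and sample numbers below are illustrative, not taken from the patch:

    public class SanityCheckSketch {
        // Mirrors the check in sanityCheck(): summedCount is the COUNT(*) measure summed
        // over the level's RDD, totalCount is the base cuboid row count, and cuboidNum is
        // the number of cuboids on this level.
        static void check(long summedCount, long totalCount, int cuboidNum, int level) {
            if (summedCount != totalCount * cuboidNum) {
                throw new IllegalStateException(String.format(
                        "Sanity check failed, level %s, total count(*) is %s; cuboid number %s",
                        level, summedCount, cuboidNum));
            }
            System.out.println("sanity check success for level " + level
                    + ", count(*) is " + (summedCount / cuboidNum));
        }

        public static void main(String[] args) {
            check(3000L, 1000L, 3, 1); // 1000 base rows, 3 cuboids on level 1 -> passes
        }
    }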

http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
----------------------------------------------------------------------
diff --git 
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java 
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index e05d63e..1ed2235 100644
--- 
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ 
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -84,8 +84,7 @@ public class SparkExecutable extends AbstractExecutable {
         hadoopConf = System.getProperty("kylin.hadoop.conf.dir");
 
         if (StringUtils.isEmpty(hadoopConf)) {
-            throw new RuntimeException(
-                    "kylin_hadoop_conf_dir is empty, check if there's error in the output of 'kylin.sh start'");
+            throw new RuntimeException("kylin_hadoop_conf_dir is empty, check if there's error in the output of 'kylin.sh start'");
         }
 
         File hiveConfFile = new File(hadoopConf, "hive-site.xml");
@@ -109,8 +108,7 @@ public class SparkExecutable extends AbstractExecutable {
         }
 
         StringBuilder stringBuilder = new StringBuilder();
-        stringBuilder.append(
-                "export HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class org.apache.kylin.common.util.SparkEntry ");
+        stringBuilder.append("export HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class org.apache.kylin.common.util.SparkEntry ");
 
         Map<String, String> sparkConfs = config.getSparkConfigOverride();
         for (Map.Entry<String, String> entry : sparkConfs.entrySet()) {
@@ -119,8 +117,7 @@ public class SparkExecutable extends AbstractExecutable {
 
         stringBuilder.append("--files %s --jars %s %s %s");
         try {
-            String cmd = String.format(stringBuilder.toString(), hadoopConf, KylinConfig.getSparkHome(),
-                    hbaseConfFile.getAbsolutePath(), jars, jobJar, formatArgs());
+            String cmd = String.format(stringBuilder.toString(), hadoopConf, KylinConfig.getSparkHome(), hbaseConfFile.getAbsolutePath(), jars, jobJar, formatArgs());
             logger.info("cmd: " + cmd);
             CliCommandExecutor exec = new CliCommandExecutor();
             PatternedLogger patternedLogger = new PatternedLogger(logger);
@@ -133,4 +130,5 @@ public class SparkExecutable extends AbstractExecutable {
         }
     }
 
+
 }
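
[Editor's note] For context on the SparkExecutable hunks above: the submit command is first assembled as a format template (the exported HADOOP_CONF_DIR, the spark-submit invocation, the spark.* overrides, then --files/--jars and the job arguments) and only afterwards filled in by String.format with the Hadoop conf dir, the Spark home, the conf file to ship, the dependency jars, the job jar and the formatted job arguments. A rough, self-contained sketch with placeholder paths; the --conf flag syntax for the overrides is an assumption, since the loop body is not shown in this hunk:

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class SparkSubmitCommandSketch {
        public static void main(String[] args) {
            StringBuilder sb = new StringBuilder();
            sb.append("export HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class org.apache.kylin.common.util.SparkEntry ");

            // Hypothetical overrides; the real values come from config.getSparkConfigOverride().
            Map<String, String> sparkConfs = new LinkedHashMap<>();
            sparkConfs.put("spark.master", "yarn");
            sparkConfs.put("spark.executor.memory", "4G");
            for (Map.Entry<String, String> entry : sparkConfs.entrySet()) {
                sb.append("--conf ").append(entry.getKey()).append("=").append(entry.getValue()).append(" ");
            }

            sb.append("--files %s --jars %s %s %s");
            // Placeholder arguments standing in for hadoopConf, KylinConfig.getSparkHome(),
            // hbaseConfFile.getAbsolutePath(), jars, jobJar and formatArgs().
            String cmd = String.format(sb.toString(),
                    "/etc/hadoop/conf", "/opt/spark", "/tmp/hbase-site.xml",
                    "dependency.jar", "kylin-job.jar", "-className org.apache.kylin.engine.spark.SparkCubingByLayer ...");
            System.out.println(cmd);
        }
    }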

http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-spark/src/main/java/org/apache/kylin/engine/spark/util/IteratorUtils.java
----------------------------------------------------------------------
diff --git 
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/util/IteratorUtils.java
 
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/util/IteratorUtils.java
index 68ac1af..a8a4d28 100644
--- 
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/util/IteratorUtils.java
+++ 
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/util/IteratorUtils.java
@@ -30,8 +30,7 @@ import scala.Tuple2;
  */
 public class IteratorUtils {
 
-    public static <K, V> Iterator<Tuple2<K, V>> merge(final Iterator<Tuple2<K, V>> input,
-            final Comparator<K> comparator, final Function<Iterable<V>, V> converter) {
+    public static <K, V> Iterator<Tuple2<K, V>> merge(final Iterator<Tuple2<K, V>> input, final Comparator<K> comparator, final Function<Iterable<V>, V> converter) {
         return new Iterator<Tuple2<K, V>>() {
 
             Tuple2<K, V> current = input.hasNext() ? input.next() : null;
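
[Editor's note] The hunk above only reflows the declaration of merge(). For readers unfamiliar with the helper, the shape of the API (an iterator of key/value tuples, a key comparator, and a converter that collapses a group of values into one) suggests merging adjacent entries with equal keys. The sketch below illustrates that general pattern with plain JDK types; it is an assumption about intent, not a copy of the Kylin implementation:

    import java.util.AbstractMap.SimpleEntry;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Comparator;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;
    import java.util.function.Function;

    public class AdjacentKeyMergeSketch {
        // Collapses consecutive entries whose keys compare equal, combining their values.
        static <K, V> List<Map.Entry<K, V>> merge(Iterator<Map.Entry<K, V>> input,
                Comparator<K> comparator, Function<List<V>, V> converter) {
            List<Map.Entry<K, V>> out = new ArrayList<>();
            K currentKey = null;
            List<V> group = new ArrayList<>();
            while (input.hasNext()) {
                Map.Entry<K, V> e = input.next();
                if (!group.isEmpty() && comparator.compare(currentKey, e.getKey()) != 0) {
                    out.add(new SimpleEntry<>(currentKey, converter.apply(group)));
                    group = new ArrayList<>();
                }
                currentKey = e.getKey();
                group.add(e.getValue());
            }
            if (!group.isEmpty()) {
                out.add(new SimpleEntry<>(currentKey, converter.apply(group)));
            }
            return out;
        }

        public static void main(String[] args) {
            List<Map.Entry<String, Integer>> rows = Arrays.asList(
                    new SimpleEntry<>("a", 1), new SimpleEntry<>("a", 2), new SimpleEntry<>("b", 5));
            // Prints [a=3, b=5]: the two "a" values are summed by the converter.
            System.out.println(merge(rows.iterator(), Comparator.<String> naturalOrder(),
                    vs -> vs.stream().mapToInt(Integer::intValue).sum()));
        }
    }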

http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-spark/src/test/java/org/apache/kylin/engine/spark/cube/BufferedCuboidWriterTest.java
----------------------------------------------------------------------
diff --git 
a/engine-spark/src/test/java/org/apache/kylin/engine/spark/cube/BufferedCuboidWriterTest.java
 
b/engine-spark/src/test/java/org/apache/kylin/engine/spark/cube/BufferedCuboidWriterTest.java
index 548a496..8afea55 100644
--- 
a/engine-spark/src/test/java/org/apache/kylin/engine/spark/cube/BufferedCuboidWriterTest.java
+++ 
b/engine-spark/src/test/java/org/apache/kylin/engine/spark/cube/BufferedCuboidWriterTest.java
@@ -38,8 +38,7 @@ public class BufferedCuboidWriterTest {
         final BufferedCuboidWriter bufferedCuboidWriter = new BufferedCuboidWriter(new TupleConverter() {
             @Override
             public Tuple2<byte[], byte[]> convert(long cuboidId, GTRecord record) {
-                return new Tuple2<>(Long.valueOf(cuboidId).toString().getBytes(),
-                        Long.valueOf(cuboidId).toString().getBytes());
+                return new Tuple2<>(Long.valueOf(cuboidId).toString().getBytes(), Long.valueOf(cuboidId).toString().getBytes());
             }
         });
         final int testCount = 10000000;

http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-spark/src/test/java/org/apache/kylin/engine/spark/util/KyroMappingGenerator.java
----------------------------------------------------------------------
diff --git 
a/engine-spark/src/test/java/org/apache/kylin/engine/spark/util/KyroMappingGenerator.java
 
b/engine-spark/src/test/java/org/apache/kylin/engine/spark/util/KyroMappingGenerator.java
index 095c041..b181d33 100644
--- 
a/engine-spark/src/test/java/org/apache/kylin/engine/spark/util/KyroMappingGenerator.java
+++ 
b/engine-spark/src/test/java/org/apache/kylin/engine/spark/util/KyroMappingGenerator.java
@@ -18,21 +18,20 @@
 
 package org.apache.kylin.engine.spark.util;
 
-import java.io.Serializable;
-import java.util.Set;
-import java.util.TreeSet;
-
 import org.apache.kylin.common.util.BytesSerializer;
 import org.apache.kylin.measure.MeasureIngester;
 import org.reflections.Reflections;
 
+import java.io.Serializable;
+import java.util.Set;
+import java.util.TreeSet;
+
 /**
  * Generate Kyro Registrator class, the output will be added into KylinKyroRegistrator manually. No runtime dependency with Reflections.
  */
 public class KyroMappingGenerator {
     public static void main(String[] args) {
-        Set<Class<? extends Serializable>> subTypesOfSerializable = new Reflections("org.apache.kylin")
-                .getSubTypesOf(Serializable.class);
+        Set<Class<? extends Serializable>> subTypesOfSerializable = new Reflections("org.apache.kylin").getSubTypesOf(Serializable.class);
         String begin = "kyroClasses.add(";
         String end = ".class);";
         TreeSet<String> sortedSet = new TreeSet();
@@ -40,14 +39,12 @@ public class KyroMappingGenerator {
             if (clazz.getCanonicalName() != null)
                 sortedSet.add(clazz.getCanonicalName());
         }
-        Set<Class<? extends BytesSerializer>> subTypesOfBytes = new Reflections("org.apache.kylin.metadata.datatype")
-                .getSubTypesOf(BytesSerializer.class);
+        Set<Class<? extends BytesSerializer>> subTypesOfBytes = new Reflections("org.apache.kylin.metadata.datatype").getSubTypesOf(BytesSerializer.class);
         for (Class clazz : subTypesOfBytes) {
             if (clazz.getCanonicalName() != null)
                 sortedSet.add(clazz.getCanonicalName());
         }
-        Set<Class<? extends MeasureIngester>> subTypesOfMeasure = new Reflections("org.apache.kylin.measure")
-                .getSubTypesOf(MeasureIngester.class);
+        Set<Class<? extends MeasureIngester>> subTypesOfMeasure = new Reflections("org.apache.kylin.measure").getSubTypesOf(MeasureIngester.class);
         for (Class clazz : subTypesOfMeasure) {
             if (clazz.getCanonicalName() != null)
                 sortedSet.add(clazz.getCanonicalName());
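
[Editor's note] The three hunks above only reflow the Reflections calls, but together they summarize the whole generator: scan packages for subtypes of Serializable, BytesSerializer and MeasureIngester, collect the canonical names into a sorted set, and emit one kyroClasses.add(X.class); line per name. A condensed sketch of that flow (same idea, written against the Reflections API used in the file; not the file itself):

    import java.io.Serializable;
    import java.util.TreeSet;

    import org.reflections.Reflections;

    public class KyroMappingGeneratorSketch {
        public static void main(String[] args) {
            TreeSet<String> sortedSet = new TreeSet<>();
            // Same scan as in the file above; the BytesSerializer and MeasureIngester scans
            // follow the identical pattern with their own base packages.
            for (Class<? extends Serializable> clazz : new Reflections("org.apache.kylin")
                    .getSubTypesOf(Serializable.class)) {
                if (clazz.getCanonicalName() != null) {
                    sortedSet.add(clazz.getCanonicalName());
                }
            }
            for (String name : sortedSet) {
                System.out.println("kyroClasses.add(" + name + ".class);");
            }
        }
    }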

http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/jdbc/src/main/java/org/apache/kylin/jdbc/Driver.java
----------------------------------------------------------------------
diff --git a/jdbc/src/main/java/org/apache/kylin/jdbc/Driver.java 
b/jdbc/src/main/java/org/apache/kylin/jdbc/Driver.java
index cfc4f5c..33d82f80 100644
--- a/jdbc/src/main/java/org/apache/kylin/jdbc/Driver.java
+++ b/jdbc/src/main/java/org/apache/kylin/jdbc/Driver.java
@@ -74,8 +74,7 @@ public class Driver extends UnregisteredDriver {
         try {
             DriverManager.registerDriver(new Driver());
         } catch (SQLException e) {
-            throw new RuntimeException(
-                    "Error occurred while registering JDBC driver " + Driver.class.getName() + ": " + e.toString());
+            throw new RuntimeException("Error occurred while registering JDBC driver " + Driver.class.getName() + ": " + e.toString());
         }
     }
 
@@ -86,8 +85,7 @@ public class Driver extends UnregisteredDriver {
 
     @Override
     protected DriverVersion createDriverVersion() {
-        return DriverVersion.load(Driver.class, "org-apache-kylin-jdbc.properties", "Kylin JDBC Driver",
-                "unknown version", "Kylin", "unknown version");
+        return DriverVersion.load(Driver.class, "org-apache-kylin-jdbc.properties", "Kylin JDBC Driver", "unknown version", "Kylin", "unknown version");
     }
 
     @Override
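
[Editor's note] The static block shown above registers the driver with DriverManager when the class is loaded, so standard JDBC bootstrapping applies. A hedged usage sketch: host, port, project and the ADMIN/KYLIN credentials are placeholders, and the jdbc:kylin:// URL form is the one commonly shown in Kylin's JDBC documentation rather than something taken from this patch:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;
    import java.util.Properties;

    public class KylinJdbcUsageSketch {
        public static void main(String[] args) throws Exception {
            // Loading the class runs the static registration block shown in the diff.
            Class.forName("org.apache.kylin.jdbc.Driver");

            Properties info = new Properties();
            info.put("user", "ADMIN");     // placeholder credentials
            info.put("password", "KYLIN");
            try (Connection conn = DriverManager.getConnection("jdbc:kylin://localhost:7070/learn_kylin", info);
                    Statement stmt = conn.createStatement();
                    ResultSet rs = stmt.executeQuery("select count(*) from kylin_sales")) {
                while (rs.next()) {
                    System.out.println(rs.getLong(1));
                }
            }
        }
    }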

http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/jdbc/src/main/java/org/apache/kylin/jdbc/IRemoteClient.java
----------------------------------------------------------------------
diff --git a/jdbc/src/main/java/org/apache/kylin/jdbc/IRemoteClient.java 
b/jdbc/src/main/java/org/apache/kylin/jdbc/IRemoteClient.java
index 56ce2b4..dfd8d76 100644
--- a/jdbc/src/main/java/org/apache/kylin/jdbc/IRemoteClient.java
+++ b/jdbc/src/main/java/org/apache/kylin/jdbc/IRemoteClient.java
@@ -52,7 +52,6 @@ public interface IRemoteClient extends Closeable {
     /**
      * Execute query remotely and get back result.
      */
-    public QueryResult executeQuery(String sql, List<AvaticaParameter> params, List<Object> paramValues,
-            Map<String, String> queryToggles) throws IOException;
+    public QueryResult executeQuery(String sql, List<AvaticaParameter> params, List<Object> paramValues, Map<String, String> queryToggles) throws IOException;
 
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/jdbc/src/main/java/org/apache/kylin/jdbc/KylinClient.java
----------------------------------------------------------------------
diff --git a/jdbc/src/main/java/org/apache/kylin/jdbc/KylinClient.java 
b/jdbc/src/main/java/org/apache/kylin/jdbc/KylinClient.java
index 0d43f8d..86c3a5b 100644
--- a/jdbc/src/main/java/org/apache/kylin/jdbc/KylinClient.java
+++ b/jdbc/src/main/java/org/apache/kylin/jdbc/KylinClient.java
@@ -85,8 +85,7 @@ public class KylinClient implements IRemoteClient {
         if (isSSL()) {
             try {
                 SSLSocketFactory sslsf = new SSLSocketFactory(new TrustStrategy() {
-                    public boolean isTrusted(final X509Certificate[] chain, String authType)
-                            throws CertificateException {
+                    public boolean isTrusted(final X509Certificate[] chain, String authType) throws CertificateException {
                         // Oh, I am easy...
                         return true;
                     }
@@ -251,9 +250,8 @@ public class KylinClient implements IRemoteClient {
             throw asIOException(get, response);
         }
 
-        List<TableMetaStub> tableMetaStubs = jsonMapper.readValue(response.getEntity().getContent(),
-                new TypeReference<List<TableMetaStub>>() {
-                });
+        List<TableMetaStub> tableMetaStubs = jsonMapper.readValue(response.getEntity().getContent(), new TypeReference<List<TableMetaStub>>() {
+        });
 
         List<KMetaTable> tables = convertMetaTables(tableMetaStubs);
         List<KMetaSchema> schemas = convertMetaSchemas(tables);
@@ -315,21 +313,15 @@ public class KylinClient implements IRemoteClient {
         for (ColumnMetaStub columnStub : tableStub.getColumns()) {
             columns.add(convertMetaColumn(columnStub));
         }
-        return new KMetaTable(tableStub.getTABLE_CAT(), tableStub.getTABLE_SCHEM(), tableStub.getTABLE_NAME(),
-                tableStub.getTABLE_TYPE(), columns);
+        return new KMetaTable(tableStub.getTABLE_CAT(), tableStub.getTABLE_SCHEM(), tableStub.getTABLE_NAME(), tableStub.getTABLE_TYPE(), columns);
     }
 
     private KMetaColumn convertMetaColumn(ColumnMetaStub columnStub) {
-        return new KMetaColumn(columnStub.getTABLE_CAT(), columnStub.getTABLE_SCHEM(), columnStub.getTABLE_NAME(),
-                columnStub.getCOLUMN_NAME(), columnStub.getDATA_TYPE(), columnStub.getTYPE_NAME(),
-                columnStub.getCOLUMN_SIZE(), columnStub.getDECIMAL_DIGITS(), columnStub.getNUM_PREC_RADIX(),
-                columnStub.getNULLABLE(), columnStub.getCHAR_OCTET_LENGTH(), columnStub.getORDINAL_POSITION(),
-                columnStub.getIS_NULLABLE());
+        return new KMetaColumn(columnStub.getTABLE_CAT(), columnStub.getTABLE_SCHEM(), columnStub.getTABLE_NAME(), columnStub.getCOLUMN_NAME(), columnStub.getDATA_TYPE(), columnStub.getTYPE_NAME(), columnStub.getCOLUMN_SIZE(), columnStub.getDECIMAL_DIGITS(), columnStub.getNUM_PREC_RADIX(), columnStub.getNULLABLE(), columnStub.getCHAR_OCTET_LENGTH(), columnStub.getORDINAL_POSITION(), columnStub.getIS_NULLABLE());
     }
 
     @Override
-    public QueryResult executeQuery(String sql, List<AvaticaParameter> params, List<Object> paramValues,
-            Map<String, String> queryToggles) throws IOException {
+    public QueryResult executeQuery(String sql, List<AvaticaParameter> params, List<Object> paramValues, Map<String, String> queryToggles) throws IOException {
 
         SQLResponseStub queryResp = executeKylinQuery(sql, convertParameters(params, paramValues), queryToggles);
         if (queryResp.getIsException())
@@ -354,8 +346,7 @@ public class KylinClient implements IRemoteClient {
         return result;
     }
 
-    private SQLResponseStub executeKylinQuery(String sql, List<StatementParameter> params,
-            Map<String, String> queryToggles) throws IOException {
+    private SQLResponseStub executeKylinQuery(String sql, List<StatementParameter> params, Map<String, String> queryToggles) throws IOException {
         String url = baseUrl() + "/kylin/api/query";
         String project = conn.getProject();
 
@@ -397,11 +388,7 @@ public class KylinClient implements IRemoteClient {
             Class columnClass = convertType(scm.getColumnType());
             ScalarType type = ColumnMetaData.scalar(scm.getColumnType(), scm.getColumnTypeName(), Rep.of(columnClass));
 
-            ColumnMetaData meta = new ColumnMetaData(i, scm.isAutoIncrement(), scm.isCaseSensitive(),
-                    scm.isSearchable(), scm.isCurrency(), scm.getIsNullable(), scm.isSigned(), scm.getDisplaySize(),
-                    scm.getLabel(), scm.getName(), scm.getSchemaName(), scm.getPrecision(), scm.getScale(),
-                    scm.getTableName(), scm.getSchemaName(), type, scm.isReadOnly(), scm.isWritable(), scm.isWritable(),
-                    columnClass.getCanonicalName());
+            ColumnMetaData meta = new ColumnMetaData(i, scm.isAutoIncrement(), scm.isCaseSensitive(), scm.isSearchable(), scm.isCurrency(), scm.getIsNullable(), scm.isSigned(), scm.getDisplaySize(), scm.getLabel(), scm.getName(), scm.getSchemaName(), scm.getPrecision(), scm.getScale(), scm.getTableName(), scm.getSchemaName(), type, scm.isReadOnly(), scm.isWritable(), scm.isWritable(), columnClass.getCanonicalName());
 
             metas.add(meta);
         }
@@ -426,8 +413,7 @@ public class KylinClient implements IRemoteClient {
     }
 
     private IOException asIOException(HttpRequestBase request, HttpResponse response) throws IOException {
-        return new IOException(request.getMethod() + " failed, error code " + response.getStatusLine().getStatusCode()
-                + " and response: " + EntityUtils.toString(response.getEntity()));
+        return new IOException(request.getMethod() + " failed, error code " + response.getStatusLine().getStatusCode() + " and response: " + EntityUtils.toString(response.getEntity()));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/jdbc/src/main/java/org/apache/kylin/jdbc/KylinConnection.java
----------------------------------------------------------------------
diff --git a/jdbc/src/main/java/org/apache/kylin/jdbc/KylinConnection.java 
b/jdbc/src/main/java/org/apache/kylin/jdbc/KylinConnection.java
index 7fd09d6..6852998 100644
--- a/jdbc/src/main/java/org/apache/kylin/jdbc/KylinConnection.java
+++ b/jdbc/src/main/java/org/apache/kylin/jdbc/KylinConnection.java
@@ -47,8 +47,7 @@ public class KylinConnection extends AvaticaConnection {
     private final String project;
     private final IRemoteClient remoteClient;
 
-    protected KylinConnection(UnregisteredDriver driver, KylinJdbcFactory factory, String url, Properties info)
-            throws SQLException {
+    protected KylinConnection(UnregisteredDriver driver, KylinJdbcFactory factory, String url, Properties info) throws SQLException {
         super(driver, factory, url, info);
 
         String odbcUrl = url;
@@ -84,8 +83,7 @@ public class KylinConnection extends AvaticaConnection {
     }
 
     @Override
-    public AvaticaStatement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability)
-            throws SQLException {
+    public AvaticaStatement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
         return super.createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
     }
 
@@ -102,11 +100,9 @@ public class KylinConnection extends AvaticaConnection {
     }
 
     @Override
-    public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency,
-            int resultSetHoldability) throws SQLException {
+    public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
         Meta.Signature sig = mockPreparedSignature(sql);
-        return factory().newPreparedStatement(this, null, sig, resultSetType, resultSetConcurrency,
-                resultSetHoldability);
+        return factory().newPreparedStatement(this, null, sig, resultSetType, resultSetConcurrency, resultSetHoldability);
     }
 
     // TODO add restful API to prepare SQL, get back expected ResultSetMetaData

http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/jdbc/src/main/java/org/apache/kylin/jdbc/KylinJdbcFactory.java
----------------------------------------------------------------------
diff --git a/jdbc/src/main/java/org/apache/kylin/jdbc/KylinJdbcFactory.java 
b/jdbc/src/main/java/org/apache/kylin/jdbc/KylinJdbcFactory.java
index 32bf6ca..6aae983 100644
--- a/jdbc/src/main/java/org/apache/kylin/jdbc/KylinJdbcFactory.java
+++ b/jdbc/src/main/java/org/apache/kylin/jdbc/KylinJdbcFactory.java
@@ -73,8 +73,7 @@ public class KylinJdbcFactory implements AvaticaFactory {
     }
 
     @Override
-    public AvaticaConnection newConnection(UnregisteredDriver driver, AvaticaFactory factory, String url,
-            Properties info) throws SQLException {
+    public AvaticaConnection newConnection(UnregisteredDriver driver, AvaticaFactory factory, String url, Properties info) throws SQLException {
         return new KylinConnection(driver, (KylinJdbcFactory) factory, url, info);
     }
 
@@ -85,23 +84,17 @@ public class KylinJdbcFactory implements AvaticaFactory {
     }
 
     @Override
-    public AvaticaStatement newStatement(AvaticaConnection connection, StatementHandle h, int resultSetType,
-            int resultSetConcurrency, int resultSetHoldability) throws SQLException {
-        return new KylinStatement((KylinConnection) connection, h, resultSetType, resultSetConcurrency,
-                resultSetHoldability);
+    public AvaticaStatement newStatement(AvaticaConnection connection, StatementHandle h, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
+        return new KylinStatement((KylinConnection) connection, h, resultSetType, resultSetConcurrency, resultSetHoldability);
     }
 
     @Override
-    public AvaticaPreparedStatement newPreparedStatement(AvaticaConnection connection, StatementHandle h,
-            Signature signature, int resultSetType, int resultSetConcurrency, int resultSetHoldability)
-            throws SQLException {
-        return new KylinPreparedStatement((KylinConnection) connection, h, signature, resultSetType,
-                resultSetConcurrency, resultSetHoldability);
+    public AvaticaPreparedStatement newPreparedStatement(AvaticaConnection connection, StatementHandle h, Signature signature, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
+        return new KylinPreparedStatement((KylinConnection) connection, h, signature, resultSetType, resultSetConcurrency, resultSetHoldability);
     }
 
     @Override
-    public AvaticaResultSet newResultSet(AvaticaStatement statement, QueryState state, Signature signature,
-            TimeZone timeZone, Frame firstFrame) throws SQLException {
+    public AvaticaResultSet newResultSet(AvaticaStatement statement, QueryState state, Signature signature, TimeZone timeZone, Frame firstFrame) throws SQLException {
         AvaticaResultSetMetaData resultSetMetaData = new AvaticaResultSetMetaData(statement, null, signature);
         return new KylinResultSet(statement, state, signature, resultSetMetaData, timeZone, firstFrame);
     }
