http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NewCubeSamplingMethodTest.java
----------------------------------------------------------------------

diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NewCubeSamplingMethodTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NewCubeSamplingMethodTest.java
index 869146b..f018f28 100644
--- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NewCubeSamplingMethodTest.java
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NewCubeSamplingMethodTest.java
@@ -16,14 +16,13 @@
  * limitations under the License.
  */
 
-package org.apache.kylin.engine.mr.steps;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Random;
-import java.util.Set;
+package org.apache.kylin.engine.mr.steps;
+import com.google.common.collect.Lists;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.Bytes;
@@ -33,10 +32,11 @@ import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
 
-import com.google.common.collect.Lists;
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hasher;
-import com.google.common.hash.Hashing;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
 
 @Ignore
 public class NewCubeSamplingMethodTest {
@@ -64,6 +64,7 @@ public class NewCubeSamplingMethodTest {
         compareAccuracyBasic(dataSet);
     }
 
+    @Ignore
     @Test
     public void testSmallCardData() throws Exception {
@@ -72,6 +73,7 @@
         compareAccuracyBasic(dataSet);
     }
 
+
     public void comparePerformanceBasic(final List<List<String>> rows) throws Exception {
         //old hash method
         ByteArray[] colHashValues = getNewColHashValues(ROW_LENGTH);
@@ -120,11 +122,11 @@
                     counter.add(hc.hash().asBytes());
                 }
                 long estimate = counter.getCountEstimate();
-                System.out.println("old method finished. Estimate cardinality : " + estimate + ". Error rate : "
-                        + countErrorRate(estimate, realCardinality));
+                System.out.println("old method finished. Estimate cardinality : " + estimate + ". Error rate : " + countErrorRate(estimate, realCardinality));
             }
         });
 
+
         long t2 = runAndGetTime(new TestCase() {
             @Override
             public void run() throws Exception {
@@ -147,8 +149,7 @@
                     counter.addHashDirectly(value);
                 }
                 long estimate = counter.getCountEstimate();
-                System.out.println("new method finished. Estimate cardinality : " + estimate + ". Error rate : "
-                        + countErrorRate(estimate, realCardinality));
+                System.out.println("new method finished. Estimate cardinality : " + estimate + ". Error rate : " + countErrorRate(estimate, realCardinality));
             }
         });
     }
@@ -178,6 +179,7 @@
         return counters;
     }
 
+
     private void addCuboidBitSet(long cuboidId, List<Integer[]> allCuboidsBitSet) {
         Integer[] indice = new Integer[Long.bitCount(cuboidId)];
@@ -206,8 +208,7 @@
         void run() throws Exception;
     }
 
-    private void putRowKeyToHLL(List<String> row, ByteArray[] colHashValues, HLLCounter[] cuboidCounters,
-            HashFunction hashFunction) {
+    private void putRowKeyToHLL(List<String> row, ByteArray[] colHashValues, HLLCounter[] cuboidCounters, HashFunction hashFunction) {
         int x = 0;
         for (String field : row) {
             Hasher hc = hashFunction.newHasher();
@@ -224,8 +225,7 @@
         }
     }
 
-    private void putRowKeyToHLLNew(List<String> row, long[] hashValuesLong, HLLCounter[] cuboidCounters,
-            HashFunction hashFunction) {
+    private void putRowKeyToHLLNew(List<String> row, long[] hashValuesLong, HLLCounter[] cuboidCounters, HashFunction hashFunction) {
         int x = 0;
         for (String field : row) {
             Hasher hc = hashFunction.newHasher();
@@ -266,7 +266,7 @@
         return row;
     }
 
-    private String[] smallCardRow = { "abc", "bcd", "jifea", "feaifj" };
+    private String[] smallCardRow = {"abc", "bcd", "jifea", "feaifj"};
 
     private Random rand = new Random(System.currentTimeMillis());
 
@@ -279,6 +279,7 @@
         return row;
     }
 
+
     private int countCardinality(List<List<String>> rows) {
         Set<String> diffCols = new HashSet<String>();
         for (List<String> row : rows) {
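For readers skimming the hunks above: the test compares two sampling strategies. The "old" method re-hashes row columns for every cuboid; the "new" method (putRowKeyToHLLNew) hashes each column once per row and then combines the per-column hash values for each cuboid before feeding a single value to the HLL counter. Below is a minimal, self-contained sketch of that idea. It is not Kylin code: the class name is invented, a HashSet stands in for Kylin's HLLCounter, and the multiply-and-add combine step is illustrative only.

import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class CuboidSamplingSketch {
    // Hash each column value once per row, then combine the per-column
    // hashes for every cuboid instead of re-hashing the row per cuboid.
    public static void main(String[] args) {
        HashFunction hf = Hashing.murmur3_128();
        List<List<String>> rows = Arrays.asList(
                Arrays.asList("a", "x", "1"),
                Arrays.asList("b", "y", "2"),
                Arrays.asList("a", "x", "2"));
        // Cuboids expressed as column index sets, e.g. {0,1}, {0,2}, {0,1,2}.
        List<int[]> cuboids = Arrays.asList(new int[] { 0, 1 }, new int[] { 0, 2 }, new int[] { 0, 1, 2 });
        // Exact sets stand in for Kylin's HLLCounter here.
        List<Set<Long>> counters = new ArrayList<>();
        for (int i = 0; i < cuboids.size(); i++)
            counters.add(new HashSet<Long>());

        long[] colHashes = new long[3];
        for (List<String> row : rows) {
            for (int c = 0; c < row.size(); c++)
                colHashes[c] = hf.hashUnencodedChars(row.get(c)).asLong(); // one hash per column
            for (int i = 0; i < cuboids.size(); i++) {
                long combined = 1;
                for (int c : cuboids.get(i))
                    combined = combined * 31 + colHashes[c]; // cheap combine, in the spirit of the "new" method
                counters.get(i).add(combined);
            }
        }
        for (int i = 0; i < cuboids.size(); i++)
            System.out.println("cuboid " + Arrays.toString(cuboids.get(i)) + " cardinality ~ " + counters.get(i).size());
    }
}

The payoff is that an N-column row costs N hash computations regardless of how many cuboids are being sampled.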
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NumberDictionaryForestTest.java
----------------------------------------------------------------------

diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NumberDictionaryForestTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NumberDictionaryForestTest.java
index ab55bcf..414ab95 100644
--- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NumberDictionaryForestTest.java
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NumberDictionaryForestTest.java
@@ -67,7 +67,7 @@ public class NumberDictionaryForestTest {
         //stimulate map-reduce job
         ArrayList<SelfDefineSortableKey> keyList = createKeyList(humanList, (byte) flag.ordinal());
         Collections.sort(keyList);
-
+
         //build tree
         NumberDictionaryForestBuilder b = new NumberDictionaryForestBuilder(0, 0);
         expectedList = numberSort(expectedList);
@@ -76,7 +76,7 @@
         }
         TrieDictionaryForest<String> dict = b.build();
         dict.dump(System.out);
-
+
         ArrayList<Integer> resultIds = new ArrayList<>();
         for (int i = 0; i < keyList.size(); i++) {
             SelfDefineSortableKey key = keyList.get(i);
@@ -84,7 +84,7 @@
             resultIds.add(dict.getIdFromValue(fieldValue));
             assertEquals(expectedList.get(i), dict.getValueFromId(dict.getIdFromValue(fieldValue)));
         }
-
+
         assertTrue(isIncreasedOrder(resultIds, new Comparator<Integer>() {
             @Override
             public int compare(Integer o1, Integer o2) {
@@ -101,8 +101,7 @@
                 double d1 = Double.parseDouble(o1);
                 double d2 = Double.parseDouble(o2);
                 return Double.compare(d1, d2);
-            }
-        });
+            }});
         return result;
     }
@@ -291,18 +290,16 @@
         int flag;
         T previous = null;
         for (T t : list) {
-            if (previous == null)
-                previous = t;
+            if (previous == null) previous = t;
             else {
                 flag = comp.compare(previous, t);
-                if (flag > 0)
-                    return false;
+                if (flag > 0) return false;
                 previous = t;
             }
         }
         return true;
     }
-
+
     @Test
     public void testNormalizeNumber() {
         assertEquals("0", Number2BytesConverter.normalizeNumber("+0000.000"));
@@ -314,7 +311,7 @@
         assertEquals("200", Number2BytesConverter.normalizeNumber("200"));
         assertEquals("200", Number2BytesConverter.normalizeNumber("200.00"));
         assertEquals("200.01", Number2BytesConverter.normalizeNumber("200.010"));
-
+
         for (int i = -100; i < 101; i++) {
             String expected = "" + i;
             int cut = expected.startsWith("-") ? 1 : 0;
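The testNormalizeNumber hunk above pins down what Number2BytesConverter.normalizeNumber is expected to return: "+0000.000" becomes "0", "200.00" becomes "200", "200.010" becomes "200.01". A BigDecimal-based sketch reproduces those expectations (an illustration only, assuming Java 8+; it is not Kylin's actual implementation, which encodes numbers to sortable bytes):

import java.math.BigDecimal;

public class NormalizeNumberSketch {
    // Normalize a numeric string the way the test expects: drop the '+'
    // sign, leading zeros, and trailing fractional zeros.
    static String normalizeNumber(String s) {
        return new BigDecimal(s).stripTrailingZeros().toPlainString();
    }

    public static void main(String[] args) {
        System.out.println(normalizeNumber("+0000.000")); // 0
        System.out.println(normalizeNumber("200.00"));    // 200
        System.out.println(normalizeNumber("200.010"));   // 200.01
    }
}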
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/SelfDefineSortableKeyTest.java
----------------------------------------------------------------------

diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/SelfDefineSortableKeyTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/SelfDefineSortableKeyTest.java
index 231387b..551998f 100644
--- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/SelfDefineSortableKeyTest.java
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/SelfDefineSortableKeyTest.java
@@ -56,8 +56,7 @@ public class SelfDefineSortableKeyTest {
         System.out.println("test numbers:" + longList);
         ArrayList<String> strNumList = listToStringList(longList);
         //System.out.println("test num strs list:"+strNumList);
-        ArrayList<SelfDefineSortableKey> keyList = createKeyList(strNumList,
-                (byte) SelfDefineSortableKey.TypeFlag.INTEGER_FAMILY_TYPE.ordinal());
+        ArrayList<SelfDefineSortableKey> keyList = createKeyList(strNumList, (byte) SelfDefineSortableKey.TypeFlag.INTEGER_FAMILY_TYPE.ordinal());
         System.out.println(keyList.get(0).isIntegerFamily());
         Collections.sort(keyList);
         ArrayList<String> strListAftereSort = new ArrayList<>();
@@ -93,8 +92,7 @@
         System.out.println("test numbers:" + doubleList);
         ArrayList<String> strNumList = listToStringList(doubleList);
         //System.out.println("test num strs list:"+strNumList);
-        ArrayList<SelfDefineSortableKey> keyList = createKeyList(strNumList,
-                (byte) SelfDefineSortableKey.TypeFlag.DOUBLE_FAMILY_TYPE.ordinal());
+        ArrayList<SelfDefineSortableKey> keyList = createKeyList(strNumList, (byte) SelfDefineSortableKey.TypeFlag.DOUBLE_FAMILY_TYPE.ordinal());
         Collections.sort(keyList);
         ArrayList<String> strListAftereSort = new ArrayList<>();
         for (SelfDefineSortableKey key : keyList) {
@@ -123,8 +121,7 @@
         strList.add("hello"); //duplicate
         strList.add("123");
         strList.add("");
-        ArrayList<SelfDefineSortableKey> keyList = createKeyList(strList,
-                (byte) SelfDefineSortableKey.TypeFlag.NONE_NUMERIC_TYPE.ordinal());
+        ArrayList<SelfDefineSortableKey> keyList = createKeyList(strList, (byte) SelfDefineSortableKey.TypeFlag.NONE_NUMERIC_TYPE.ordinal());
         Collections.sort(keyList);
         ArrayList<String> strListAftereSort = new ArrayList<>();
         for (SelfDefineSortableKey key : keyList) {
@@ -154,16 +151,17 @@
         doubleList.add(-Double.MAX_VALUE);
         //System.out.println(Double.MIN_VALUE);
 
+
         ArrayList<String> strNumList = listToStringList(doubleList);
         //System.out.println("test num strs list:"+strNumList);
-        ArrayList<SelfDefineSortableKey> keyList = createKeyList(strNumList,
-                (byte) SelfDefineSortableKey.TypeFlag.DOUBLE_FAMILY_TYPE.ordinal());
+        ArrayList<SelfDefineSortableKey> keyList = createKeyList(strNumList, (byte) SelfDefineSortableKey.TypeFlag.DOUBLE_FAMILY_TYPE.ordinal());
 
         System.out.println("start to test str sort");
         long start = System.currentTimeMillis();
         Collections.sort(strNumList);
         System.out.println("sort time : " + (System.currentTimeMillis() - start));
 
+
         System.out.println("start to test double sort");
         start = System.currentTimeMillis();
         Collections.sort(keyList);
@@ -191,6 +189,7 @@
         System.out.println("sort time : " + (System.currentTimeMillis() - start));
     }
 
+
     @Test
     public void testIllegalNumber() {
         Random rand = new Random(System.currentTimeMillis());
@@ -211,10 +210,10 @@ public class SelfDefineSortableKeyTest {
         strNumList.add("fjaeif"); //illegal type
         //System.out.println("test num strs list:"+strNumList);
         try {
-            ArrayList<SelfDefineSortableKey> keyList = createKeyList(strNumList,
-                    (byte) SelfDefineSortableKey.TypeFlag.DOUBLE_FAMILY_TYPE.ordinal());
+            ArrayList<SelfDefineSortableKey> keyList = createKeyList(strNumList, (byte) SelfDefineSortableKey.TypeFlag.DOUBLE_FAMILY_TYPE.ordinal());
             Collections.sort(keyList);
             fail("Need catch exception");
-        } catch (Exception e) {
+        }catch(Exception e){
             //correct
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
----------------------------------------------------------------------

diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
index cf6d0a8..66b154d 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
@@ -47,8 +47,7 @@ public class SparkBatchCubingJobBuilder2 extends BatchCubingJobBuilder2 {
         sparkExecutable.setClassName(SparkCubingByLayer.class.getName());
         sparkExecutable.setParam(SparkCubingByLayer.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName());
         sparkExecutable.setParam(SparkCubingByLayer.OPTION_SEGMENT_ID.getOpt(), seg.getUuid());
-        sparkExecutable.setParam(SparkCubingByLayer.OPTION_INPUT_TABLE.getOpt(),
-                seg.getConfig().getHiveDatabaseForIntermediateTable() + "." + flatTableDesc.getTableName());
+        sparkExecutable.setParam(SparkCubingByLayer.OPTION_INPUT_TABLE.getOpt(), seg.getConfig().getHiveDatabaseForIntermediateTable() + "." + flatTableDesc.getTableName());
         sparkExecutable.setParam(SparkCubingByLayer.OPTION_CONF_PATH.getOpt(), KylinConfig.getKylinConfPath());
         sparkExecutable.setParam(SparkCubingByLayer.OPTION_OUTPUT_PATH.getOpt(), cuboidRootPath);

http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCountDemo.java
----------------------------------------------------------------------

diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCountDemo.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCountDemo.java
index ef39c69..6478c10 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCountDemo.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCountDemo.java
@@ -37,8 +37,7 @@ import scala.Tuple2;
  */
 public class SparkCountDemo extends AbstractApplication {
 
-    private static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true)
-            .withDescription("Input path").create("input");
+    private static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Input path").create("input");
 
     private Options options;
 
@@ -57,29 +56,25 @@ public class SparkCountDemo extends AbstractApplication {
         String logFile = "hdfs://10.249.65.231:8020/tmp/kylin.properties"; // Should be some file on your system
         SparkConf conf = new SparkConf().setAppName("Simple Application");
         JavaSparkContext sc = new JavaSparkContext(conf);
-        final JavaPairRDD<String, Integer> logData = sc.textFile(logFile)
-                .mapToPair(new PairFunction<String, String, Integer>() {
+        final JavaPairRDD<String, Integer> logData = sc.textFile(logFile).mapToPair(new PairFunction<String, String, Integer>() {
 
-                    @Override
-                    public Tuple2<String, Integer> call(String s) throws Exception {
-                        return new Tuple2<String, Integer>(s, s.length());
-                    }
-                }).sortByKey();
+            @Override
+            public Tuple2<String, Integer> call(String s) throws Exception {
+                return new Tuple2<String, Integer>(s, s.length());
+            }
+        }).sortByKey();
         logData.persist(StorageLevel.MEMORY_AND_DISK_SER());
 
         System.out.println("line number:" + logData.count());
 
         logData.mapToPair(new PairFunction<Tuple2<String, Integer>, ImmutableBytesWritable, KeyValue>() {
             @Override
-            public Tuple2<ImmutableBytesWritable, KeyValue> call(Tuple2<String, Integer> stringIntegerTuple2)
-                    throws Exception {
+            public Tuple2<ImmutableBytesWritable, KeyValue> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                 ImmutableBytesWritable key = new ImmutableBytesWritable(stringIntegerTuple2._1().getBytes());
-                KeyValue value = new KeyValue(stringIntegerTuple2._1().getBytes(), "f".getBytes(), "c".getBytes(),
-                        String.valueOf(stringIntegerTuple2._2()).getBytes());
+                KeyValue value = new KeyValue(stringIntegerTuple2._1().getBytes(), "f".getBytes(), "c".getBytes(), String.valueOf(stringIntegerTuple2._2()).getBytes());
                 return new Tuple2(key, value);
             }
-        }).saveAsNewAPIHadoopFile("hdfs://10.249.65.231:8020/tmp/hfile", ImmutableBytesWritable.class, KeyValue.class,
-                HFileOutputFormat.class);
+        }).saveAsNewAPIHadoopFile("hdfs://10.249.65.231:8020/tmp/hfile", ImmutableBytesWritable.class, KeyValue.class, HFileOutputFormat.class);
     }
 }
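SelfDefineSortableKeyTest, diffed above, checks that numeric strings sort by numeric value rather than lexicographically once wrapped in a typed key ("10" after "2", not before it). The toy key below shows why the type flag matters; the real class compares encoded bytes and distinguishes INTEGER_FAMILY_TYPE, DOUBLE_FAMILY_TYPE and NONE_NUMERIC_TYPE, so treat this class name and logic as a sketch only. Like the testIllegalNumber case, it throws if a non-numeric string is flagged as numeric.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// A toy stand-in for SelfDefineSortableKey: sort numeric strings by value,
// falling back to plain string order for non-numeric content.
public class SortableKeySketch implements Comparable<SortableKeySketch> {
    final String raw;
    final boolean numeric;

    SortableKeySketch(String raw, boolean numeric) {
        this.raw = raw;
        this.numeric = numeric;
    }

    @Override
    public int compareTo(SortableKeySketch o) {
        return numeric ? Double.compare(Double.parseDouble(raw), Double.parseDouble(o.raw))
                : raw.compareTo(o.raw);
    }

    public static void main(String[] args) {
        List<SortableKeySketch> keys = new ArrayList<>();
        for (String s : new String[] { "10", "2", "-0.5" })
            keys.add(new SortableKeySketch(s, true)); // analogue of DOUBLE_FAMILY_TYPE
        Collections.sort(keys);
        for (SortableKeySketch k : keys)
            System.out.println(k.raw); // -0.5, 2, 10: numeric order, not lexicographic
    }
}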
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
----------------------------------------------------------------------

diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
index c3326ff..2a0981a 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
@@ -120,16 +120,11 @@ public class SparkCubing extends AbstractApplication {
 
     protected static final Logger logger = LoggerFactory.getLogger(SparkCubing.class);
 
-    private static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true)
-            .withDescription("Hive Intermediate Table").create("hiveTable");
-    private static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg()
-            .isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
-    private static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segment").hasArg().isRequired(true)
-            .withDescription("Cube Segment Id").create("segmentId");
-    private static final Option OPTION_CONF_PATH = OptionBuilder.withArgName("confPath").hasArg().isRequired(true)
-            .withDescription("Configuration Path").create("confPath");
-    private static final Option OPTION_COPROCESSOR = OptionBuilder.withArgName("coprocessor").hasArg().isRequired(true)
-            .withDescription("Coprocessor Jar Path").create("coprocessor");
+    private static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Hive Intermediate Table").create("hiveTable");
+    private static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg().isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
+    private static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segment").hasArg().isRequired(true).withDescription("Cube Segment Id").create("segmentId");
+    private static final Option OPTION_CONF_PATH = OptionBuilder.withArgName("confPath").hasArg().isRequired(true).withDescription("Configuration Path").create("confPath");
+    private static final Option OPTION_COPROCESSOR = OptionBuilder.withArgName("coprocessor").hasArg().isRequired(true).withDescription("Coprocessor Jar Path").create("coprocessor");
 
     private Options options;
 
@@ -192,10 +187,8 @@ public class SparkCubing extends AbstractApplication {
         final CubeSegment seg = cubeInstance.getSegmentById(segmentId);
         final CubeDesc cubeDesc = cubeInstance.getDescriptor();
         final HashMap<Integer, TblColRef> tblColRefMap = Maps.newHashMap();
-        final CubeJoinedFlatTableEnrich flatDesc = new CubeJoinedFlatTableEnrich(
-                EngineFactory.getJoinedFlatTableDesc(seg), cubeDesc);
-        final List<TblColRef> baseCuboidColumn = Cuboid.findById(cubeDesc, Cuboid.getBaseCuboidId(cubeDesc))
-                .getColumns();
+        final CubeJoinedFlatTableEnrich flatDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(seg), cubeDesc);
+        final List<TblColRef> baseCuboidColumn = Cuboid.findById(cubeDesc, Cuboid.getBaseCuboidId(cubeDesc)).getColumns();
         final long start = System.currentTimeMillis();
         final RowKeyDesc rowKey = cubeDesc.getRowkey();
         for (int i = 0; i < baseCuboidColumn.size(); i++) {
@@ -214,36 +207,35 @@
             final DataFrame frame = intermediateTable.select(column).distinct();
 
             final Row[] rows = frame.collect();
-            dictionaryMap.put(tblColRef, DictionaryGenerator.buildDictionary(tblColRef.getType(),
-                    new IterableDictionaryValueEnumerator(new Iterable<String>() {
-                        @Override
-                        public Iterator<String> iterator() {
-                            return new Iterator<String>() {
-                                int i = 0;
+            dictionaryMap.put(tblColRef, DictionaryGenerator.buildDictionary(tblColRef.getType(), new IterableDictionaryValueEnumerator(new Iterable<String>() {
+                @Override
+                public Iterator<String> iterator() {
+                    return new Iterator<String>() {
+                        int i = 0;
 
-                                @Override
-                                public boolean hasNext() {
-                                    return i < rows.length;
-                                }
+                        @Override
+                        public boolean hasNext() {
+                            return i < rows.length;
+                        }
 
-                                @Override
-                                public String next() {
-                                    if (hasNext()) {
-                                        final Row row = rows[i++];
-                                        final Object o = row.get(0);
-                                        return o != null ? o.toString() : null;
-                                    } else {
-                                        throw new NoSuchElementException();
-                                    }
-                                }
+                        @Override
+                        public String next() {
+                            if (hasNext()) {
+                                final Row row = rows[i++];
+                                final Object o = row.get(0);
+                                return o != null ? o.toString() : null;
+                            } else {
+                                throw new NoSuchElementException();
+                            }
+                        }
 
-                                @Override
-                                public void remove() {
-                                    throw new UnsupportedOperationException();
-                                }
-                            };
+                        @Override
+                        public void remove() {
+                            throw new UnsupportedOperationException();
                         }
-                    })));
+                    };
+                }
+            })));
         }
         final long end = System.currentTimeMillis();
         CubingUtils.writeDictionary(seg, dictionaryMap, start, end);
@@ -256,8 +248,7 @@
         }
     }
 
-    private Map<Long, HLLCounter> sampling(final JavaRDD<List<String>> rowJavaRDD, final String cubeName,
-            String segmentId) throws Exception {
+    private Map<Long, HLLCounter> sampling(final JavaRDD<List<String>> rowJavaRDD, final String cubeName, String segmentId) throws Exception {
         CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName);
         CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
         CubeDesc cubeDesc = cubeInstance.getDescriptor();
@@ -268,8 +259,7 @@
             zeroValue.put(id, new HLLCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision()));
         }
 
-        CubeJoinedFlatTableEnrich flatDesc = new CubeJoinedFlatTableEnrich(
-                EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc);
+        CubeJoinedFlatTableEnrich flatDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc);
 
         final int[] rowKeyColumnIndexes = flatDesc.getRowKeyColumnIndexes();
         final int nRowKey = cubeDesc.getRowkey().getRowKeyColumns().length;
@@ -295,56 +285,52 @@
             row_hashcodes[i] = new ByteArray();
         }
 
-        final HashMap<Long, HLLCounter> samplingResult = rowJavaRDD.aggregate(zeroValue,
-                new Function2<HashMap<Long, HLLCounter>, List<String>, HashMap<Long, HLLCounter>>() {
+        final HashMap<Long, HLLCounter> samplingResult = rowJavaRDD.aggregate(zeroValue, new Function2<HashMap<Long, HLLCounter>, List<String>, HashMap<Long, HLLCounter>>() {
 
-                    final HashFunction hashFunction = Hashing.murmur3_128();
+            final HashFunction hashFunction = Hashing.murmur3_128();
 
-                    @Override
-                    public HashMap<Long, HLLCounter> call(HashMap<Long, HLLCounter> v1, List<String> v2)
-                            throws Exception {
-                        for (int i = 0; i < nRowKey; i++) {
-                            Hasher hc = hashFunction.newHasher();
-                            String colValue = v2.get(rowKeyColumnIndexes[i]);
-                            if (colValue != null) {
-                                row_hashcodes[i].set(hc.putString(colValue).hash().asBytes());
-                            } else {
-                                row_hashcodes[i].set(hc.putInt(0).hash().asBytes());
-                            }
-                        }
-
-                        for (Map.Entry<Long, Integer[]> entry : allCuboidsBitSet.entrySet()) {
-                            Hasher hc = hashFunction.newHasher();
-                            HLLCounter counter = v1.get(entry.getKey());
-                            final Integer[] cuboidBitSet = entry.getValue();
-                            for (int position = 0; position < cuboidBitSet.length; position++) {
-                                hc.putBytes(row_hashcodes[cuboidBitSet[position]].array());
-                            }
-                            counter.add(hc.hash().asBytes());
-                        }
-                        return v1;
+            @Override
+            public HashMap<Long, HLLCounter> call(HashMap<Long, HLLCounter> v1, List<String> v2) throws Exception {
+                for (int i = 0; i < nRowKey; i++) {
+                    Hasher hc = hashFunction.newHasher();
+                    String colValue = v2.get(rowKeyColumnIndexes[i]);
+                    if (colValue != null) {
+                        row_hashcodes[i].set(hc.putString(colValue).hash().asBytes());
+                    } else {
+                        row_hashcodes[i].set(hc.putInt(0).hash().asBytes());
                     }
-                }, new Function2<HashMap<Long, HLLCounter>, HashMap<Long, HLLCounter>, HashMap<Long, HLLCounter>>() {
-                    @Override
-                    public HashMap<Long, HLLCounter> call(HashMap<Long, HLLCounter> v1, HashMap<Long, HLLCounter> v2)
-                            throws Exception {
-                        Preconditions.checkArgument(v1.size() == v2.size());
-                        Preconditions.checkArgument(v1.size() > 0);
-                        for (Map.Entry<Long, HLLCounter> entry : v1.entrySet()) {
-                            final HLLCounter counter1 = entry.getValue();
-                            final HLLCounter counter2 = v2.get(entry.getKey());
-                            counter1.merge(Preconditions.checkNotNull(counter2, "counter cannot be null"));
-                        }
-                        return v1;
+                }
+
+                for (Map.Entry<Long, Integer[]> entry : allCuboidsBitSet.entrySet()) {
+                    Hasher hc = hashFunction.newHasher();
+                    HLLCounter counter = v1.get(entry.getKey());
+                    final Integer[] cuboidBitSet = entry.getValue();
+                    for (int position = 0; position < cuboidBitSet.length; position++) {
+                        hc.putBytes(row_hashcodes[cuboidBitSet[position]].array());
                     }
+                    counter.add(hc.hash().asBytes());
+                }
+                return v1;
+            }
+        }, new Function2<HashMap<Long, HLLCounter>, HashMap<Long, HLLCounter>, HashMap<Long, HLLCounter>>() {
+            @Override
+            public HashMap<Long, HLLCounter> call(HashMap<Long, HLLCounter> v1, HashMap<Long, HLLCounter> v2) throws Exception {
+                Preconditions.checkArgument(v1.size() == v2.size());
+                Preconditions.checkArgument(v1.size() > 0);
+                for (Map.Entry<Long, HLLCounter> entry : v1.entrySet()) {
+                    final HLLCounter counter1 = entry.getValue();
+                    final HLLCounter counter2 = v2.get(entry.getKey());
+                    counter1.merge(Preconditions.checkNotNull(counter2, "counter cannot be null"));
+                }
+                return v1;
+            }
 
-                });
+        });
         return samplingResult;
     }
 
     /** return hfile location */
-    private String build(JavaRDD<List<String>> javaRDD, final String cubeName, final String segmentId,
-            final byte[][] splitKeys) throws Exception {
+    private String build(JavaRDD<List<String>> javaRDD, final String cubeName, final String segmentId, final byte[][] splitKeys) throws Exception {
         CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
         CubeDesc cubeDesc = cubeInstance.getDescriptor();
         final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
@@ -378,41 +364,35 @@
             }
         }
 
-        final JavaPairRDD<byte[], byte[]> javaPairRDD = javaRDD.glom()
-                .mapPartitionsToPair(new PairFlatMapFunction<Iterator<List<List<String>>>, byte[], byte[]>() {
+        final JavaPairRDD<byte[], byte[]> javaPairRDD = javaRDD.glom().mapPartitionsToPair(new PairFlatMapFunction<Iterator<List<List<String>>>, byte[], byte[]>() {
 
-                    @Override
-                    public Iterable<Tuple2<byte[], byte[]>> call(Iterator<List<List<String>>> listIterator)
-                            throws Exception {
-                        long t = System.currentTimeMillis();
-                        prepare();
-
-                        final CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv())
-                                .getCube(cubeName);
-
-                        LinkedBlockingQueue<List<String>> blockingQueue = new LinkedBlockingQueue();
-                        System.out.println("load properties finished");
-                        IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(cubeSegment);
-                        AbstractInMemCubeBuilder inMemCubeBuilder = new DoggedCubeBuilder(cubeInstance.getDescriptor(),
-                                flatDesc, dictionaryMap);
-                        final SparkCuboidWriter sparkCuboidWriter = new BufferedCuboidWriter(
-                                new DefaultTupleConverter(cubeInstance.getSegmentById(segmentId), columnLengthMap));
-                        Executors.newCachedThreadPool()
-                                .submit(inMemCubeBuilder.buildAsRunnable(blockingQueue, sparkCuboidWriter));
-                        try {
-                            while (listIterator.hasNext()) {
-                                for (List<String> row : listIterator.next()) {
-                                    blockingQueue.put(row);
-                                }
-                            }
-                            blockingQueue.put(Collections.<String> emptyList());
-                        } catch (Exception e) {
-                            throw new RuntimeException(e);
+            @Override
+            public Iterable<Tuple2<byte[], byte[]>> call(Iterator<List<List<String>>> listIterator) throws Exception {
+                long t = System.currentTimeMillis();
+                prepare();
+
+                final CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
+
+                LinkedBlockingQueue<List<String>> blockingQueue = new LinkedBlockingQueue();
+                System.out.println("load properties finished");
+                IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(cubeSegment);
+                AbstractInMemCubeBuilder inMemCubeBuilder = new DoggedCubeBuilder(cubeInstance.getDescriptor(), flatDesc, dictionaryMap);
+                final SparkCuboidWriter sparkCuboidWriter = new BufferedCuboidWriter(new DefaultTupleConverter(cubeInstance.getSegmentById(segmentId), columnLengthMap));
+                Executors.newCachedThreadPool().submit(inMemCubeBuilder.buildAsRunnable(blockingQueue, sparkCuboidWriter));
+                try {
+                    while (listIterator.hasNext()) {
+                        for (List<String> row : listIterator.next()) {
+                            blockingQueue.put(row);
                         }
-                        System.out.println("build partition cost: " + (System.currentTimeMillis() - t) + "ms");
-                        return sparkCuboidWriter.getResult();
                     }
-                });
+                    blockingQueue.put(Collections.<String> emptyList());
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+                System.out.println("build partition cost: " + (System.currentTimeMillis() - t) + "ms");
+                return sparkCuboidWriter.getResult();
+            }
+        });
 
         KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
         Configuration conf = getConfigurationForHFile(cubeSegment.getStorageLocationIdentifier());
@@ -431,9 +411,7 @@
         return url;
     }
 
-    private void writeToHFile2(final JavaPairRDD<byte[], byte[]> javaPairRDD, final String[] dataTypes,
-            final int measureSize, final MeasureAggregators aggs, final byte[][] splitKeys, final Configuration conf,
-            final String hFileLocation) {
+    private void writeToHFile2(final JavaPairRDD<byte[], byte[]> javaPairRDD, final String[] dataTypes, final int measureSize, final MeasureAggregators aggs, final byte[][] splitKeys, final Configuration conf, final String hFileLocation) {
         javaPairRDD.repartitionAndSortWithinPartitions(new Partitioner() {
             @Override
             public int numPartitions() {
@@ -450,52 +428,46 @@
             }
                 return splitKeys.length;
             }
-        }, UnsignedBytes.lexicographicalComparator())
-                .mapPartitions(new FlatMapFunction<Iterator<Tuple2<byte[], byte[]>>, Tuple2<byte[], byte[]>>() {
-                    @Override
-                    public Iterable<Tuple2<byte[], byte[]>> call(final Iterator<Tuple2<byte[], byte[]>> tuple2Iterator)
-                            throws Exception {
-                        return new Iterable<Tuple2<byte[], byte[]>>() {
-                            final BufferedMeasureCodec codec = new BufferedMeasureCodec(dataTypes);
-                            final Object[] input = new Object[measureSize];
-                            final Object[] result = new Object[measureSize];
+        }, UnsignedBytes.lexicographicalComparator()).mapPartitions(new FlatMapFunction<Iterator<Tuple2<byte[], byte[]>>, Tuple2<byte[], byte[]>>() {
+            @Override
+            public Iterable<Tuple2<byte[], byte[]>> call(final Iterator<Tuple2<byte[], byte[]>> tuple2Iterator) throws Exception {
+                return new Iterable<Tuple2<byte[], byte[]>>() {
+                    final BufferedMeasureCodec codec = new BufferedMeasureCodec(dataTypes);
+                    final Object[] input = new Object[measureSize];
+                    final Object[] result = new Object[measureSize];
+                    @Override
+                    public Iterator<Tuple2<byte[], byte[]>> iterator() {
+                        return IteratorUtils.merge(tuple2Iterator, UnsignedBytes.lexicographicalComparator(), new Function<Iterable<byte[]>, byte[]>() {
                             @Override
-                            public Iterator<Tuple2<byte[], byte[]>> iterator() {
-                                return IteratorUtils.merge(tuple2Iterator, UnsignedBytes.lexicographicalComparator(),
-                                        new Function<Iterable<byte[]>, byte[]>() {
-                                            @Override
-                                            public byte[] call(Iterable<byte[]> v1) throws Exception {
-                                                final LinkedList<byte[]> list = Lists.newLinkedList(v1);
-                                                if (list.size() == 1) {
-                                                    return list.get(0);
-                                                }
-                                                aggs.reset();
-                                                for (byte[] v : list) {
-                                                    codec.decode(ByteBuffer.wrap(v), input);
-                                                    aggs.aggregate(input);
-                                                }
-                                                aggs.collectStates(result);
-                                                ByteBuffer buffer = codec.encode(result);
-                                                byte[] bytes = new byte[buffer.position()];
-                                                System.arraycopy(buffer.array(), buffer.arrayOffset(), bytes, 0,
-                                                        buffer.position());
-                                                return bytes;
-                                            }
-                                        });
+                            public byte[] call(Iterable<byte[]> v1) throws Exception {
+                                final LinkedList<byte[]> list = Lists.newLinkedList(v1);
+                                if (list.size() == 1) {
+                                    return list.get(0);
+                                }
+                                aggs.reset();
+                                for (byte[] v : list) {
+                                    codec.decode(ByteBuffer.wrap(v), input);
+                                    aggs.aggregate(input);
+                                }
+                                aggs.collectStates(result);
+                                ByteBuffer buffer = codec.encode(result);
+                                byte[] bytes = new byte[buffer.position()];
+                                System.arraycopy(buffer.array(), buffer.arrayOffset(), bytes, 0, buffer.position());
+                                return bytes;
                             }
-                        };
-                    }
-                }, true).mapToPair(new PairFunction<Tuple2<byte[], byte[]>, ImmutableBytesWritable, KeyValue>() {
-                    @Override
-                    public Tuple2<ImmutableBytesWritable, KeyValue> call(Tuple2<byte[], byte[]> tuple2)
-                            throws Exception {
-                        ImmutableBytesWritable key = new ImmutableBytesWritable(tuple2._1());
-                        KeyValue value = new KeyValue(tuple2._1(), "F1".getBytes(), "M".getBytes(), tuple2._2());
-                        return new Tuple2(key, value);
+                        });
                     }
-                }).saveAsNewAPIHadoopFile(hFileLocation, ImmutableBytesWritable.class, KeyValue.class,
-                        HFileOutputFormat.class, conf);
+                };
+            }
+        }, true).mapToPair(new PairFunction<Tuple2<byte[], byte[]>, ImmutableBytesWritable, KeyValue>() {
+            @Override
+            public Tuple2<ImmutableBytesWritable, KeyValue> call(Tuple2<byte[], byte[]> tuple2) throws Exception {
+                ImmutableBytesWritable key = new ImmutableBytesWritable(tuple2._1());
+                KeyValue value = new KeyValue(tuple2._1(), "F1".getBytes(), "M".getBytes(), tuple2._2());
+                return new Tuple2(key, value);
+            }
+        }).saveAsNewAPIHadoopFile(hFileLocation, ImmutableBytesWritable.class, KeyValue.class, HFileOutputFormat.class, conf);
     }
 
     public static void prepare() throws Exception {
@@ -506,16 +478,14 @@
         ClassUtil.addClasspath(confPath);
     }
 
-    private byte[][] createHTable(String cubeName, String segmentId, Map<Long, HLLCounter> samplingResult)
-            throws Exception {
+    private byte[][] createHTable(String cubeName, String segmentId, Map<Long, HLLCounter> samplingResult) throws Exception {
         final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
         final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
         final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
         final Map<Long, Long> rowCountMap = CubeStatsReader.getCuboidRowCountMapFromSampling(samplingResult, 100);
         final Map<Long, Double> cubeSizeMap = CubeStatsReader.getCuboidSizeMapFromRowCount(cubeSegment, rowCountMap);
         System.out.println("cube size estimation:" + cubeSizeMap);
-        final byte[][] splitKeys = CreateHTableJob.getRegionSplitsFromCuboidStatistics(cubeSizeMap, kylinConfig,
-                cubeSegment, null); //FIXME: passing non-null value for 'hfileSplitsOutputFolder'
+        final byte[][] splitKeys = CreateHTableJob.getRegionSplitsFromCuboidStatistics(cubeSizeMap, kylinConfig, cubeSegment, null); //FIXME: passing non-null value for 'hfileSplitsOutputFolder'
         CubeHTableUtil.createHTable(cubeSegment, splitKeys);
         System.out.println(cubeSegment.getStorageLocationIdentifier() + " table created");
         return splitKeys;
@@ -590,23 +560,22 @@
         setupClasspath(sc, confPath);
         intermediateTable.cache();
         writeDictionary(intermediateTable, cubeName, segmentId);
-        final JavaRDD<List<String>> rowJavaRDD = intermediateTable.javaRDD()
-                .map(new org.apache.spark.api.java.function.Function<Row, List<String>>() {
-                    @Override
-                    public List<String> call(Row v1) throws Exception {
-                        ArrayList<String> result = Lists.newArrayListWithExpectedSize(v1.size());
-                        for (int i = 0; i < v1.size(); i++) {
-                            final Object o = v1.get(i);
-                            if (o != null) {
-                                result.add(o.toString());
-                            } else {
-                                result.add(null);
-                            }
-                        }
-                        return result;
-
-                    }
-                });
+        final JavaRDD<List<String>> rowJavaRDD = intermediateTable.javaRDD().map(new org.apache.spark.api.java.function.Function<Row, List<String>>() {
+            @Override
+            public List<String> call(Row v1) throws Exception {
+                ArrayList<String> result = Lists.newArrayListWithExpectedSize(v1.size());
+                for (int i = 0; i < v1.size(); i++) {
+                    final Object o = v1.get(i);
+                    if (o != null) {
+                        result.add(o.toString());
+                    } else {
+                        result.add(null);
+                    }
+                }
+                return result;
+
+            }
+        });
 
         final Map<Long, HLLCounter> samplingResult = sampling(rowJavaRDD, cubeName, segmentId);
         final byte[][] splitKeys = createHTable(cubeName, segmentId, samplingResult);
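SparkCubing.sampling(), diffed above, is a JavaRDD.aggregate call: the zero value is a map from cuboid id to an empty HLLCounter, the sequence function folds one row's column hashes into every cuboid's counter, and the combine function merges per-partition maps. The runnable local-mode sketch below keeps that shape but substitutes HashSet for HLLCounter, two hard-coded cuboid keys for the real cuboid ids, and lambdas for the anonymous Function2 classes in the patch; treat names and keys as illustrative.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;

public class SamplingAggregateSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("sampling-sketch").setMaster("local[2]");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            JavaRDD<String> rows = sc.parallelize(Arrays.asList("a,x", "b,y", "a,x"));
            // Zero value: one distinct-counter per "cuboid" (HashSet stands in for HLLCounter).
            HashMap<Long, HashSet<String>> zero = new HashMap<>();
            zero.put(1L, new HashSet<>()); // cuboid over column 0
            zero.put(3L, new HashSet<>()); // cuboid over columns 0 and 1

            HashMap<Long, HashSet<String>> result = rows.aggregate(zero,
                    (acc, row) -> { // seqOp: fold one row into the counters
                        String[] cols = row.split(",");
                        acc.get(1L).add(cols[0]);
                        acc.get(3L).add(cols[0] + "|" + cols[1]);
                        return acc;
                    },
                    (m1, m2) -> { // combOp: merge per-partition counter maps, like HLLCounter.merge
                        m2.forEach((k, v) -> m1.get(k).addAll(v));
                        return m1;
                    });
            result.forEach((k, v) -> System.out.println("cuboid " + k + " distinct ~ " + v.size()));
        }
    }
}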
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
----------------------------------------------------------------------

diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index cf2a650..f70fd30 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -17,14 +17,6 @@
  */
 package org.apache.kylin.engine.spark;
 
-import java.io.File;
-import java.io.FileFilter;
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
@@ -73,9 +65,17 @@ import org.apache.spark.sql.hive.HiveContext;
 import org.apache.spark.storage.StorageLevel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import scala.Tuple2;
 
+import java.io.File;
+import java.io.FileFilter;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+
 /**
  * Spark application to build cube with the "by-layer" algorithm. Only support source data from Hive; Metadata in HBase.
  */
@@ -83,16 +83,11 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
 
     protected static final Logger logger = LoggerFactory.getLogger(SparkCubingByLayer.class);
 
-    public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg()
-            .isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
-    public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segment").hasArg().isRequired(true)
-            .withDescription("Cube Segment Id").create("segmentId");
-    public static final Option OPTION_CONF_PATH = OptionBuilder.withArgName("confPath").hasArg().isRequired(true)
-            .withDescription("Configuration Path").create("confPath");
-    public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg()
-            .isRequired(true).withDescription("Cube output path").create(BatchConstants.ARG_OUTPUT);
-    public static final Option OPTION_INPUT_TABLE = OptionBuilder.withArgName("hiveTable").hasArg().isRequired(true)
-            .withDescription("Hive Intermediate Table").create("hiveTable");
+    public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg().isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
+    public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segment").hasArg().isRequired(true).withDescription("Cube Segment Id").create("segmentId");
+    public static final Option OPTION_CONF_PATH = OptionBuilder.withArgName("confPath").hasArg().isRequired(true).withDescription("Configuration Path").create("confPath");
+    public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg().isRequired(true).withDescription("Cube output path").create(BatchConstants.ARG_OUTPUT);
+    public static final Option OPTION_INPUT_TABLE = OptionBuilder.withArgName("hiveTable").hasArg().isRequired(true).withDescription("Hive Intermediate Table").create("hiveTable");
 
     private Options options;
 
@@ -165,14 +160,12 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
         final CubeDesc cubeDesc = cubeInstance.getDescriptor();
         final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
-        final CubeJoinedFlatTableEnrich intermediateTableDesc = new CubeJoinedFlatTableEnrich(
-                EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc);
+        final CubeJoinedFlatTableEnrich intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc);
 
         final KylinConfig kylinConfig = cubeDesc.getConfig();
         final Broadcast<CubeDesc> vCubeDesc = sc.broadcast(cubeDesc);
         final Broadcast<CubeSegment> vCubeSegment = sc.broadcast(cubeSegment);
-        final NDCuboidBuilder ndCuboidBuilder = new NDCuboidBuilder(vCubeSegment.getValue(),
-                new RowKeyEncoderProvider(vCubeSegment.getValue()));
+        final NDCuboidBuilder ndCuboidBuilder = new NDCuboidBuilder(vCubeSegment.getValue(), new RowKeyEncoderProvider(vCubeSegment.getValue()));
 
         final Broadcast<CuboidScheduler> vCuboidScheduler = sc.broadcast(new CuboidScheduler(vCubeDesc.getValue()));
 
         final int measureNum = cubeDesc.getMeasures().size();
@@ -197,50 +190,45 @@
         StorageLevel storageLevel = StorageLevel.MEMORY_AND_DISK_SER();
 
         // encode with dimension encoding, transform to <ByteArray, Object[]> RDD
-        final JavaPairRDD<ByteArray, Object[]> encodedBaseRDD = intermediateTable.javaRDD()
-                .mapToPair(new PairFunction<Row, ByteArray, Object[]>() {
-                    volatile transient boolean initialized = false;
-                    BaseCuboidBuilder baseCuboidBuilder = null;
+        final JavaPairRDD<ByteArray, Object[]> encodedBaseRDD = intermediateTable.javaRDD().mapToPair(new PairFunction<Row, ByteArray, Object[]>() {
+            volatile transient boolean initialized = false;
+            BaseCuboidBuilder baseCuboidBuilder = null;
 
-                    @Override
-                    public Tuple2<ByteArray, Object[]> call(Row row) throws Exception {
+            @Override
+            public Tuple2<ByteArray, Object[]> call(Row row) throws Exception {
+                if (initialized == false) {
+                    synchronized (SparkCubingByLayer.class) {
                         if (initialized == false) {
-                            synchronized (SparkCubingByLayer.class) {
-                                if (initialized == false) {
-                                    prepare();
-                                    long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
-                                    Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
-                                    baseCuboidBuilder = new BaseCuboidBuilder(kylinConfig, cubeDesc, cubeSegment,
-                                            intermediateTableDesc,
-                                            AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid),
-                                            MeasureIngester.create(cubeDesc.getMeasures()),
-                                            cubeSegment.buildDictionaryMap());
-                                    initialized = true;
-                                }
-                            }
+                            prepare();
+                            long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+                            Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
+                            baseCuboidBuilder = new BaseCuboidBuilder(kylinConfig, cubeDesc, cubeSegment, intermediateTableDesc, AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid), MeasureIngester.create(cubeDesc.getMeasures()), cubeSegment.buildDictionaryMap());
+                            initialized = true;
                         }
-
-                        String[] rowArray = rowToArray(row);
-                        baseCuboidBuilder.resetAggrs();
-                        byte[] rowKey = baseCuboidBuilder.buildKey(rowArray);
-                        Object[] result = baseCuboidBuilder.buildValueObjects(rowArray);
-                        return new Tuple2<>(new ByteArray(rowKey), result);
                     }
+                }
 
-                    private String[] rowToArray(Row row) {
-                        String[] result = new String[row.size()];
-                        for (int i = 0; i < row.size(); i++) {
-                            final Object o = row.get(i);
-                            if (o != null) {
-                                result[i] = o.toString();
-                            } else {
-                                result[i] = null;
-                            }
-                        }
-                        return result;
+                String[] rowArray = rowToArray(row);
+                baseCuboidBuilder.resetAggrs();
+                byte[] rowKey = baseCuboidBuilder.buildKey(rowArray);
+                Object[] result = baseCuboidBuilder.buildValueObjects(rowArray);
+                return new Tuple2<>(new ByteArray(rowKey), result);
+            }
+
+            private String[] rowToArray(Row row) {
+                String[] result = new String[row.size()];
+                for (int i = 0; i < row.size(); i++) {
+                    final Object o = row.get(i);
+                    if (o != null) {
+                        result[i] = o.toString();
+                    } else {
+                        result[i] = null;
                     }
+                }
+                return result;
+            }
 
-                });
+        });
 
         logger.info("encodedBaseRDD partition number: " + encodedBaseRDD.getNumPartitions());
         Long totalCount = 0L;
@@ -250,12 +238,10 @@
         }
 
         final MeasureAggregators measureAggregators = new MeasureAggregators(cubeDesc.getMeasures());
-        final BaseCuboidReducerFunction2 baseCuboidReducerFunction = new BaseCuboidReducerFunction2(measureNum,
-                vCubeDesc.getValue(), measureAggregators);
+        final BaseCuboidReducerFunction2 baseCuboidReducerFunction = new BaseCuboidReducerFunction2(measureNum, vCubeDesc.getValue(), measureAggregators);
         BaseCuboidReducerFunction2 reducerFunction2 = baseCuboidReducerFunction;
         if (allNormalMeasure == false) {
-            reducerFunction2 = new CuboidReducerFunction2(measureNum, vCubeDesc.getValue(), measureAggregators,
-                    needAggr);
+            reducerFunction2 = new CuboidReducerFunction2(measureNum, vCubeDesc.getValue(), measureAggregators, needAggr);
         }
 
         final int totalLevels = cubeDesc.getBuildLevel();
@@ -271,14 +257,12 @@
         saveToHDFS(allRDDs[0], vCubeDesc.getValue(), outputPath, 0, confOverwrite);
 
         // aggregate to ND cuboids
-        PairFlatMapFunction<Tuple2<ByteArray, Object[]>, ByteArray, Object[]> flatMapFunction = new CuboidFlatMap(
-                vCubeSegment.getValue(), vCubeDesc.getValue(), vCuboidScheduler.getValue(), ndCuboidBuilder);
+        PairFlatMapFunction<Tuple2<ByteArray, Object[]>, ByteArray, Object[]> flatMapFunction = new CuboidFlatMap(vCubeSegment.getValue(), vCubeDesc.getValue(), vCuboidScheduler.getValue(), ndCuboidBuilder);
 
         for (level = 1; level <= totalLevels; level++) {
             partition = estimateRDDPartitionNum(level, cubeStatsReader, kylinConfig);
             logger.info("Level " + level + " partition number: " + partition);
-            allRDDs[level] = allRDDs[level - 1].flatMapToPair(flatMapFunction).reduceByKey(reducerFunction2, partition)
-                    .persist(storageLevel);
+            allRDDs[level] = allRDDs[level - 1].flatMapToPair(flatMapFunction).reduceByKey(reducerFunction2, partition).persist(storageLevel);
             if (kylinConfig.isSparkSanityCheckEnabled() == true) {
                 sanityCheck(allRDDs[level], totalCount, level, cubeStatsReader, countMeasureIndex);
             }
@@ -299,24 +283,19 @@
         return partition;
     }
 
-    private static void saveToHDFS(final JavaPairRDD<ByteArray, Object[]> rdd, final CubeDesc cubeDesc,
-            final String hdfsBaseLocation, int level, Configuration conf) {
+    private static void saveToHDFS(final JavaPairRDD<ByteArray, Object[]> rdd, final CubeDesc cubeDesc, final String hdfsBaseLocation, int level, Configuration conf) {
         final String cuboidOutputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(hdfsBaseLocation, level);
-        rdd.mapToPair(
-                new PairFunction<Tuple2<ByteArray, Object[]>, org.apache.hadoop.io.Text, org.apache.hadoop.io.Text>() {
-                    BufferedMeasureCodec codec = new BufferedMeasureCodec(cubeDesc.getMeasures());
-
-                    @Override
-                    public Tuple2<org.apache.hadoop.io.Text, org.apache.hadoop.io.Text> call(
-                            Tuple2<ByteArray, Object[]> tuple2) throws Exception {
-                        ByteBuffer valueBuf = codec.encode(tuple2._2());
-                        byte[] encodedBytes = new byte[valueBuf.position()];
-                        System.arraycopy(valueBuf.array(), 0, encodedBytes, 0, valueBuf.position());
-                        return new Tuple2<>(new org.apache.hadoop.io.Text(tuple2._1().array()),
-                                new org.apache.hadoop.io.Text(encodedBytes));
-                    }
-                }).saveAsNewAPIHadoopFile(cuboidOutputPath, org.apache.hadoop.io.Text.class,
-                        org.apache.hadoop.io.Text.class, SequenceFileOutputFormat.class, conf);
+        rdd.mapToPair(new PairFunction<Tuple2<ByteArray, Object[]>, org.apache.hadoop.io.Text, org.apache.hadoop.io.Text>() {
+            BufferedMeasureCodec codec = new BufferedMeasureCodec(cubeDesc.getMeasures());
+
+            @Override
+            public Tuple2<org.apache.hadoop.io.Text, org.apache.hadoop.io.Text> call(Tuple2<ByteArray, Object[]> tuple2) throws Exception {
+                ByteBuffer valueBuf = codec.encode(tuple2._2());
+                byte[] encodedBytes = new byte[valueBuf.position()];
+                System.arraycopy(valueBuf.array(), 0, encodedBytes, 0, valueBuf.position());
+                return new Tuple2<>(new org.apache.hadoop.io.Text(tuple2._1().array()), new org.apache.hadoop.io.Text(encodedBytes));
+            }
+        }).saveAsNewAPIHadoopFile(cuboidOutputPath, org.apache.hadoop.io.Text.class, org.apache.hadoop.io.Text.class, SequenceFileOutputFormat.class, conf);
         logger.info("Persisting RDD for level " + level + " into " + cuboidOutputPath);
     }
 
@@ -366,8 +345,7 @@
         RowKeySplitter rowKeySplitter;
         transient boolean initialized = false;
 
-        CuboidFlatMap(CubeSegment cubeSegment, CubeDesc cubeDesc, CuboidScheduler cuboidScheduler,
-                NDCuboidBuilder ndCuboidBuilder) {
+        CuboidFlatMap(CubeSegment cubeSegment, CubeDesc cubeDesc, CuboidScheduler cuboidScheduler, NDCuboidBuilder ndCuboidBuilder) {
             this.cubeSegment = cubeSegment;
             this.cubeDesc = cubeDesc;
             this.cuboidScheduler = cuboidScheduler;
@@ -396,8 +374,7 @@
             List<Tuple2<ByteArray, Object[]>> tuples = new ArrayList(myChildren.size());
             for (Long child : myChildren) {
                 Cuboid childCuboid = Cuboid.findById(cubeDesc, child);
-                Pair<Integer, ByteArray> result = ndCuboidBuilder.buildKey(parentCuboid, childCuboid,
-                        rowKeySplitter.getSplitBuffers());
+                Pair<Integer, ByteArray> result = ndCuboidBuilder.buildKey(parentCuboid, childCuboid, rowKeySplitter.getSplitBuffers());
 
                 byte[] newKey = new byte[result.getFirst()];
                 System.arraycopy(result.getSecond().array(), 0, newKey, 0, result.getFirst());
@@ -411,14 +388,11 @@
 
     //sanity check
 
-    private void sanityCheck(JavaPairRDD<ByteArray, Object[]> rdd, Long totalCount, int thisLevel,
-            CubeStatsReader cubeStatsReader, final int countMeasureIndex) {
+    private void sanityCheck(JavaPairRDD<ByteArray, Object[]> rdd, Long totalCount, int thisLevel, CubeStatsReader cubeStatsReader, final int countMeasureIndex) {
         int thisCuboidNum = cubeStatsReader.getCuboidsByLayer(thisLevel).size();
         Long count2 = getRDDCountSum(rdd, countMeasureIndex);
         if (count2 != totalCount * thisCuboidNum) {
-            throw new IllegalStateException(
-                    String.format("Sanity check failed, level %s, total count(*) is %s; cuboid number %s", thisLevel,
-                            count2, thisCuboidNum));
+            throw new IllegalStateException(String.format("Sanity check failed, level %s, total count(*) is %s; cuboid number %s", thisLevel, count2, thisCuboidNum));
         } else {
             logger.info("sanity check success for level " + thisLevel + ", count(*) is " + (count2 / thisCuboidNum));
         }
@@ -433,8 +407,7 @@
         }
     }).reduce(new Function2<Tuple2<ByteArray, Long>, Tuple2<ByteArray, Long>, Tuple2<ByteArray, Long>>() {
         @Override
-        public Tuple2<ByteArray, Long> call(Tuple2<ByteArray, Long> longTuple2, Tuple2<ByteArray, Long> longTuple22)
-                throws Exception {
+        public Tuple2<ByteArray, Long> call(Tuple2<ByteArray, Long> longTuple2, Tuple2<ByteArray, Long> longTuple22) throws Exception {
             return new Tuple2<>(ONE, longTuple2._2() + longTuple22._2());
         }
     })._2();
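The encodedBaseRDD hunk above shows the pattern SparkCubingByLayer uses to build its heavyweight BaseCuboidBuilder lazily inside the task rather than shipping it with the closure: a transient volatile flag plus double-checked locking on the enclosing class. Below is a stripped-down, runnable sketch of the same pattern; the class name is invented and a StringBuilder stands in for the expensive, non-serializable state.

import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

public class LazyInitFunctionSketch implements PairFunction<String, String, Integer> {
    private transient volatile boolean initialized = false;
    private transient StringBuilder encoder; // stands in for BaseCuboidBuilder

    @Override
    public Tuple2<String, Integer> call(String row) throws Exception {
        if (!initialized) {
            synchronized (LazyInitFunctionSketch.class) { // class-level lock, as in the patch
                if (!initialized) {
                    // the expensive setup (dictionaries, row-key encoders...) would go here
                    encoder = new StringBuilder("ready");
                    initialized = true;
                }
            }
        }
        return new Tuple2<>(row, encoder.length() + row.length());
    }
}

Usage would be rdd.mapToPair(new LazyInitFunctionSketch()); because the fields are transient, nothing heavy is serialized with the closure, and each executor JVM pays the setup cost once.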
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
----------------------------------------------------------------------

diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index e05d63e..1ed2235 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -84,8 +84,7 @@ public class SparkExecutable extends AbstractExecutable {
             hadoopConf = System.getProperty("kylin.hadoop.conf.dir");
 
             if (StringUtils.isEmpty(hadoopConf)) {
-                throw new RuntimeException(
-                        "kylin_hadoop_conf_dir is empty, check if there's error in the output of 'kylin.sh start'");
+                throw new RuntimeException("kylin_hadoop_conf_dir is empty, check if there's error in the output of 'kylin.sh start'");
             }
 
             File hiveConfFile = new File(hadoopConf, "hive-site.xml");
@@ -109,8 +108,7 @@
             }
 
             StringBuilder stringBuilder = new StringBuilder();
-            stringBuilder.append(
-                    "export HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class org.apache.kylin.common.util.SparkEntry ");
+            stringBuilder.append("export HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class org.apache.kylin.common.util.SparkEntry ");
 
             Map<String, String> sparkConfs = config.getSparkConfigOverride();
             for (Map.Entry<String, String> entry : sparkConfs.entrySet()) {
@@ -119,8 +117,7 @@
             stringBuilder.append("--files %s --jars %s %s %s");
             try {
-                String cmd = String.format(stringBuilder.toString(), hadoopConf, KylinConfig.getSparkHome(),
-                        hbaseConfFile.getAbsolutePath(), jars, jobJar, formatArgs());
+                String cmd = String.format(stringBuilder.toString(), hadoopConf, KylinConfig.getSparkHome(), hbaseConfFile.getAbsolutePath(), jars, jobJar, formatArgs());
                 logger.info("cmd: " + cmd);
                 CliCommandExecutor exec = new CliCommandExecutor();
                 PatternedLogger patternedLogger = new PatternedLogger(logger);
@@ -133,4 +130,5 @@
         }
     }
 
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-spark/src/main/java/org/apache/kylin/engine/spark/util/IteratorUtils.java
----------------------------------------------------------------------

diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/util/IteratorUtils.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/util/IteratorUtils.java
index 68ac1af..a8a4d28 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/util/IteratorUtils.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/util/IteratorUtils.java
@@ -30,8 +30,7 @@ import scala.Tuple2;
  */
 public class IteratorUtils {
 
-    public static <K, V> Iterator<Tuple2<K, V>> merge(final Iterator<Tuple2<K, V>> input,
-            final Comparator<K> comparator, final Function<Iterable<V>, V> converter) {
+    public static <K, V> Iterator<Tuple2<K, V>> merge(final Iterator<Tuple2<K, V>> input, final Comparator<K> comparator, final Function<Iterable<V>, V> converter) {
         return new Iterator<Tuple2<K, V>>() {
 
             Tuple2<K, V> current = input.hasNext() ? input.next() : null;

http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-spark/src/test/java/org/apache/kylin/engine/spark/cube/BufferedCuboidWriterTest.java
----------------------------------------------------------------------

diff --git a/engine-spark/src/test/java/org/apache/kylin/engine/spark/cube/BufferedCuboidWriterTest.java b/engine-spark/src/test/java/org/apache/kylin/engine/spark/cube/BufferedCuboidWriterTest.java
index 548a496..8afea55 100644
--- a/engine-spark/src/test/java/org/apache/kylin/engine/spark/cube/BufferedCuboidWriterTest.java
+++ b/engine-spark/src/test/java/org/apache/kylin/engine/spark/cube/BufferedCuboidWriterTest.java
@@ -38,8 +38,7 @@ public class BufferedCuboidWriterTest {
         final BufferedCuboidWriter bufferedCuboidWriter = new BufferedCuboidWriter(new TupleConverter() {
             @Override
             public Tuple2<byte[], byte[]> convert(long cuboidId, GTRecord record) {
-                return new Tuple2<>(Long.valueOf(cuboidId).toString().getBytes(),
-                        Long.valueOf(cuboidId).toString().getBytes());
+                return new Tuple2<>(Long.valueOf(cuboidId).toString().getBytes(), Long.valueOf(cuboidId).toString().getBytes());
             }
         });
         final int testCount = 10000000;
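IteratorUtils.merge, diffed above, wraps a key-sorted iterator and folds each run of equal keys into one tuple via the supplied converter; this is what lets writeToHFile2 aggregate measure bytes per row key after repartitionAndSortWithinPartitions. The sketch below mirrors that contract with JDK types only (Map.Entry in place of scala.Tuple2, a BinaryOperator in place of Spark's Function); the class name is hypothetical.

import java.util.AbstractMap;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

public class MergeIteratorSketch {
    // Collapse consecutive entries with equal keys (input must be key-sorted)
    // by folding their values with the combiner.
    static <K, V> Iterator<Map.Entry<K, V>> merge(final Iterator<Map.Entry<K, V>> input,
            final Comparator<K> comparator, final BinaryOperator<V> combiner) {
        return new Iterator<Map.Entry<K, V>>() {
            Map.Entry<K, V> current = input.hasNext() ? input.next() : null;

            @Override
            public boolean hasNext() {
                return current != null;
            }

            @Override
            public Map.Entry<K, V> next() {
                K key = current.getKey();
                V value = current.getValue();
                current = null;
                while (input.hasNext()) { // fold the whole run of equal keys
                    Map.Entry<K, V> peek = input.next();
                    if (comparator.compare(key, peek.getKey()) == 0) {
                        value = combiner.apply(value, peek.getValue());
                    } else {
                        current = peek; // first entry of the next run
                        break;
                    }
                }
                return new AbstractMap.SimpleEntry<>(key, value);
            }
        };
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> sorted = Arrays.asList(
                new AbstractMap.SimpleEntry<>("a", 1), new AbstractMap.SimpleEntry<>("a", 2),
                new AbstractMap.SimpleEntry<>("b", 5));
        merge(sorted.iterator(), Comparator.<String> naturalOrder(), Integer::sum)
                .forEachRemaining(e -> System.out.println(e.getKey() + " -> " + e.getValue())); // a -> 3, b -> 5
    }
}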
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-spark/src/test/java/org/apache/kylin/engine/spark/cube/BufferedCuboidWriterTest.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/test/java/org/apache/kylin/engine/spark/cube/BufferedCuboidWriterTest.java b/engine-spark/src/test/java/org/apache/kylin/engine/spark/cube/BufferedCuboidWriterTest.java
index 548a496..8afea55 100644
--- a/engine-spark/src/test/java/org/apache/kylin/engine/spark/cube/BufferedCuboidWriterTest.java
+++ b/engine-spark/src/test/java/org/apache/kylin/engine/spark/cube/BufferedCuboidWriterTest.java
@@ -38,8 +38,7 @@ public class BufferedCuboidWriterTest {
         final BufferedCuboidWriter bufferedCuboidWriter = new BufferedCuboidWriter(new TupleConverter() {
             @Override
             public Tuple2<byte[], byte[]> convert(long cuboidId, GTRecord record) {
-                return new Tuple2<>(Long.valueOf(cuboidId).toString().getBytes(),
-                        Long.valueOf(cuboidId).toString().getBytes());
+                return new Tuple2<>(Long.valueOf(cuboidId).toString().getBytes(), Long.valueOf(cuboidId).toString().getBytes());
             }
         });
         final int testCount = 10000000;

http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-spark/src/test/java/org/apache/kylin/engine/spark/util/KyroMappingGenerator.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/test/java/org/apache/kylin/engine/spark/util/KyroMappingGenerator.java b/engine-spark/src/test/java/org/apache/kylin/engine/spark/util/KyroMappingGenerator.java
index 095c041..b181d33 100644
--- a/engine-spark/src/test/java/org/apache/kylin/engine/spark/util/KyroMappingGenerator.java
+++ b/engine-spark/src/test/java/org/apache/kylin/engine/spark/util/KyroMappingGenerator.java
@@ -18,21 +18,20 @@
 
 package org.apache.kylin.engine.spark.util;
 
-import java.io.Serializable;
-import java.util.Set;
-import java.util.TreeSet;
-
 import org.apache.kylin.common.util.BytesSerializer;
 import org.apache.kylin.measure.MeasureIngester;
 import org.reflections.Reflections;
 
+import java.io.Serializable;
+import java.util.Set;
+import java.util.TreeSet;
+
 /**
  * Generate Kyro Registrator class, the output will be added into KylinKyroRegistrator manually. No runtime dependency with Reflections.
  */
 public class KyroMappingGenerator {
     public static void main(String[] args) {
-        Set<Class<? extends Serializable>> subTypesOfSerializable = new Reflections("org.apache.kylin")
-                .getSubTypesOf(Serializable.class);
+        Set<Class<? extends Serializable>> subTypesOfSerializable = new Reflections("org.apache.kylin").getSubTypesOf(Serializable.class);
         String begin = "kyroClasses.add(";
         String end = ".class);";
         TreeSet<String> sortedSet = new TreeSet();
@@ -40,14 +39,12 @@ public class KyroMappingGenerator {
             if (clazz.getCanonicalName() != null)
                 sortedSet.add(clazz.getCanonicalName());
         }
-        Set<Class<? extends BytesSerializer>> subTypesOfBytes = new Reflections("org.apache.kylin.metadata.datatype")
-                .getSubTypesOf(BytesSerializer.class);
+        Set<Class<? extends BytesSerializer>> subTypesOfBytes = new Reflections("org.apache.kylin.metadata.datatype").getSubTypesOf(BytesSerializer.class);
         for (Class clazz : subTypesOfBytes) {
             if (clazz.getCanonicalName() != null)
                 sortedSet.add(clazz.getCanonicalName());
         }
-        Set<Class<? extends MeasureIngester>> subTypesOfMeasure = new Reflections("org.apache.kylin.measure")
-                .getSubTypesOf(MeasureIngester.class);
+        Set<Class<? extends MeasureIngester>> subTypesOfMeasure = new Reflections("org.apache.kylin.measure").getSubTypesOf(MeasureIngester.class);
         for (Class clazz : subTypesOfMeasure) {
             if (clazz.getCanonicalName() != null)
                 sortedSet.add(clazz.getCanonicalName());
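KyroMappingGenerator above only has its imports reordered and lines joined, but what it does is worth spelling out: it scans Kylin packages with Reflections and prints one registration line per discovered class, to be pasted into KylinKyroRegistrator by hand. A dependency-free sketch of just the output stage (the two class names are stand-ins for whatever a real scan would find):

    import java.util.TreeSet;

    public class KyroMappingSketch {
        public static void main(String[] args) {
            // In the real generator these names come from Reflections scans such as
            // new Reflections("org.apache.kylin").getSubTypesOf(Serializable.class).
            TreeSet<String> sortedSet = new TreeSet<String>();
            sortedSet.add("org.apache.kylin.common.util.ByteArray"); // stand-in
            sortedSet.add("org.apache.kylin.cube.model.CubeDesc");   // stand-in

            String begin = "kyroClasses.add(";
            String end = ".class);";
            // TreeSet iteration is sorted, so the generated block is stable across runs.
            for (String canonicalName : sortedSet) {
                System.out.println(begin + canonicalName + end);
            }
        }
    }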
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/jdbc/src/main/java/org/apache/kylin/jdbc/Driver.java
----------------------------------------------------------------------
diff --git a/jdbc/src/main/java/org/apache/kylin/jdbc/Driver.java b/jdbc/src/main/java/org/apache/kylin/jdbc/Driver.java
index cfc4f5c..33d82f80 100644
--- a/jdbc/src/main/java/org/apache/kylin/jdbc/Driver.java
+++ b/jdbc/src/main/java/org/apache/kylin/jdbc/Driver.java
@@ -74,8 +74,7 @@ public class Driver extends UnregisteredDriver {
         try {
             DriverManager.registerDriver(new Driver());
         } catch (SQLException e) {
-            throw new RuntimeException(
-                    "Error occurred while registering JDBC driver " + Driver.class.getName() + ": " + e.toString());
+            throw new RuntimeException("Error occurred while registering JDBC driver " + Driver.class.getName() + ": " + e.toString());
         }
     }
@@ -86,8 +85,7 @@ public class Driver extends UnregisteredDriver {
 
     @Override
     protected DriverVersion createDriverVersion() {
-        return DriverVersion.load(Driver.class, "org-apache-kylin-jdbc.properties", "Kylin JDBC Driver",
-                "unknown version", "Kylin", "unknown version");
+        return DriverVersion.load(Driver.class, "org-apache-kylin-jdbc.properties", "Kylin JDBC Driver", "unknown version", "Kylin", "unknown version");
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/jdbc/src/main/java/org/apache/kylin/jdbc/IRemoteClient.java
----------------------------------------------------------------------
diff --git a/jdbc/src/main/java/org/apache/kylin/jdbc/IRemoteClient.java b/jdbc/src/main/java/org/apache/kylin/jdbc/IRemoteClient.java
index 56ce2b4..dfd8d76 100644
--- a/jdbc/src/main/java/org/apache/kylin/jdbc/IRemoteClient.java
+++ b/jdbc/src/main/java/org/apache/kylin/jdbc/IRemoteClient.java
@@ -52,7 +52,6 @@ public interface IRemoteClient extends Closeable {
     /**
     * Execute query remotely and get back result.
     */
-    public QueryResult executeQuery(String sql, List<AvaticaParameter> params, List<Object> paramValues,
-            Map<String, String> queryToggles) throws IOException;
+    public QueryResult executeQuery(String sql, List<AvaticaParameter> params, List<Object> paramValues, Map<String, String> queryToggles) throws IOException;
 
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/jdbc/src/main/java/org/apache/kylin/jdbc/KylinClient.java
----------------------------------------------------------------------
diff --git a/jdbc/src/main/java/org/apache/kylin/jdbc/KylinClient.java b/jdbc/src/main/java/org/apache/kylin/jdbc/KylinClient.java
index 0d43f8d..86c3a5b 100644
--- a/jdbc/src/main/java/org/apache/kylin/jdbc/KylinClient.java
+++ b/jdbc/src/main/java/org/apache/kylin/jdbc/KylinClient.java
@@ -85,8 +85,7 @@ public class KylinClient implements IRemoteClient {
         if (isSSL()) {
             try {
                 SSLSocketFactory sslsf = new SSLSocketFactory(new TrustStrategy() {
-                    public boolean isTrusted(final X509Certificate[] chain, String authType)
-                            throws CertificateException {
+                    public boolean isTrusted(final X509Certificate[] chain, String authType) throws CertificateException {
                         // Oh, I am easy...
                         return true;
                     }
@@ -251,9 +250,8 @@ public class KylinClient implements IRemoteClient {
             throw asIOException(get, response);
         }
 
-        List<TableMetaStub> tableMetaStubs = jsonMapper.readValue(response.getEntity().getContent(),
-                new TypeReference<List<TableMetaStub>>() {
-                });
+        List<TableMetaStub> tableMetaStubs = jsonMapper.readValue(response.getEntity().getContent(), new TypeReference<List<TableMetaStub>>() {
+        });
 
         List<KMetaTable> tables = convertMetaTables(tableMetaStubs);
         List<KMetaSchema> schemas = convertMetaSchemas(tables);
@@ -315,21 +313,15 @@ public class KylinClient implements IRemoteClient {
         for (ColumnMetaStub columnStub : tableStub.getColumns()) {
             columns.add(convertMetaColumn(columnStub));
         }
-        return new KMetaTable(tableStub.getTABLE_CAT(), tableStub.getTABLE_SCHEM(), tableStub.getTABLE_NAME(),
-                tableStub.getTABLE_TYPE(), columns);
+        return new KMetaTable(tableStub.getTABLE_CAT(), tableStub.getTABLE_SCHEM(), tableStub.getTABLE_NAME(), tableStub.getTABLE_TYPE(), columns);
     }
 
     private KMetaColumn convertMetaColumn(ColumnMetaStub columnStub) {
-        return new KMetaColumn(columnStub.getTABLE_CAT(), columnStub.getTABLE_SCHEM(), columnStub.getTABLE_NAME(),
-                columnStub.getCOLUMN_NAME(), columnStub.getDATA_TYPE(), columnStub.getTYPE_NAME(),
-                columnStub.getCOLUMN_SIZE(), columnStub.getDECIMAL_DIGITS(), columnStub.getNUM_PREC_RADIX(),
-                columnStub.getNULLABLE(), columnStub.getCHAR_OCTET_LENGTH(), columnStub.getORDINAL_POSITION(),
-                columnStub.getIS_NULLABLE());
+        return new KMetaColumn(columnStub.getTABLE_CAT(), columnStub.getTABLE_SCHEM(), columnStub.getTABLE_NAME(), columnStub.getCOLUMN_NAME(), columnStub.getDATA_TYPE(), columnStub.getTYPE_NAME(), columnStub.getCOLUMN_SIZE(), columnStub.getDECIMAL_DIGITS(), columnStub.getNUM_PREC_RADIX(), columnStub.getNULLABLE(), columnStub.getCHAR_OCTET_LENGTH(), columnStub.getORDINAL_POSITION(), columnStub.getIS_NULLABLE());
     }
 
     @Override
-    public QueryResult executeQuery(String sql, List<AvaticaParameter> params, List<Object> paramValues,
-            Map<String, String> queryToggles) throws IOException {
+    public QueryResult executeQuery(String sql, List<AvaticaParameter> params, List<Object> paramValues, Map<String, String> queryToggles) throws IOException {
 
         SQLResponseStub queryResp = executeKylinQuery(sql, convertParameters(params, paramValues), queryToggles);
         if (queryResp.getIsException())
@@ -354,8 +346,7 @@ public class KylinClient implements IRemoteClient {
         return result;
     }
 
-    private SQLResponseStub executeKylinQuery(String sql, List<StatementParameter> params,
-            Map<String, String> queryToggles) throws IOException {
+    private SQLResponseStub executeKylinQuery(String sql, List<StatementParameter> params, Map<String, String> queryToggles) throws IOException {
         String url = baseUrl() + "/kylin/api/query";
         String project = conn.getProject();
@@ -397,11 +388,7 @@ public class KylinClient implements IRemoteClient {
             Class columnClass = convertType(scm.getColumnType());
             ScalarType type = ColumnMetaData.scalar(scm.getColumnType(), scm.getColumnTypeName(), Rep.of(columnClass));
 
-            ColumnMetaData meta = new ColumnMetaData(i, scm.isAutoIncrement(), scm.isCaseSensitive(),
-                    scm.isSearchable(), scm.isCurrency(), scm.getIsNullable(), scm.isSigned(), scm.getDisplaySize(),
-                    scm.getLabel(), scm.getName(), scm.getSchemaName(), scm.getPrecision(), scm.getScale(),
-                    scm.getTableName(), scm.getSchemaName(), type, scm.isReadOnly(), scm.isWritable(), scm.isWritable(),
-                    columnClass.getCanonicalName());
+            ColumnMetaData meta = new ColumnMetaData(i, scm.isAutoIncrement(), scm.isCaseSensitive(), scm.isSearchable(), scm.isCurrency(), scm.getIsNullable(), scm.isSigned(), scm.getDisplaySize(), scm.getLabel(), scm.getName(), scm.getSchemaName(), scm.getPrecision(), scm.getScale(), scm.getTableName(), scm.getSchemaName(), type, scm.isReadOnly(), scm.isWritable(), scm.isWritable(), columnClass.getCanonicalName());
 
             metas.add(meta);
         }
@@ -426,8 +413,7 @@ public class KylinClient implements IRemoteClient {
     }
 
     private IOException asIOException(HttpRequestBase request, HttpResponse response) throws IOException {
-        return new IOException(request.getMethod() + " failed, error code " + response.getStatusLine().getStatusCode()
-                + " and response: " + EntityUtils.toString(response.getEntity()));
+        return new IOException(request.getMethod() + " failed, error code " + response.getStatusLine().getStatusCode() + " and response: " + EntityUtils.toString(response.getEntity()));
     }
 
     @Override
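One hunk above deserves a caveat beyond formatting: KylinClient's TrustStrategy returns true unconditionally ("Oh, I am easy..."), so HTTPS connections from the JDBC driver accept any server certificate. Here is that anonymous class in isolation (Apache HttpClient 4.x types), shown only to make the behavior easy to see, not as a recommendation:

    import java.security.cert.CertificateException;
    import java.security.cert.X509Certificate;

    import org.apache.http.conn.ssl.TrustStrategy;

    public class TrustAllSketch {
        // Mirrors the hunk: every certificate chain is trusted, which is convenient
        // for self-signed test clusters but skips server verification entirely.
        static final TrustStrategy TRUST_EVERYTHING = new TrustStrategy() {
            public boolean isTrusted(final X509Certificate[] chain, String authType) throws CertificateException {
                return true;
            }
        };
    }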
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/jdbc/src/main/java/org/apache/kylin/jdbc/KylinConnection.java
----------------------------------------------------------------------
diff --git a/jdbc/src/main/java/org/apache/kylin/jdbc/KylinConnection.java b/jdbc/src/main/java/org/apache/kylin/jdbc/KylinConnection.java
index 7fd09d6..6852998 100644
--- a/jdbc/src/main/java/org/apache/kylin/jdbc/KylinConnection.java
+++ b/jdbc/src/main/java/org/apache/kylin/jdbc/KylinConnection.java
@@ -47,8 +47,7 @@ public class KylinConnection extends AvaticaConnection {
     private final String project;
     private final IRemoteClient remoteClient;
 
-    protected KylinConnection(UnregisteredDriver driver, KylinJdbcFactory factory, String url, Properties info)
-            throws SQLException {
+    protected KylinConnection(UnregisteredDriver driver, KylinJdbcFactory factory, String url, Properties info) throws SQLException {
         super(driver, factory, url, info);
 
         String odbcUrl = url;
@@ -84,8 +83,7 @@ public class KylinConnection extends AvaticaConnection {
     }
 
     @Override
-    public AvaticaStatement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability)
-            throws SQLException {
+    public AvaticaStatement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
         return super.createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
     }
 
@@ -102,11 +100,9 @@ public class KylinConnection extends AvaticaConnection {
     }
 
     @Override
-    public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency,
-            int resultSetHoldability) throws SQLException {
+    public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
         Meta.Signature sig = mockPreparedSignature(sql);
-        return factory().newPreparedStatement(this, null, sig, resultSetType, resultSetConcurrency,
-                resultSetHoldability);
+        return factory().newPreparedStatement(this, null, sig, resultSetType, resultSetConcurrency, resultSetHoldability);
     }
 
     // TODO add restful API to prepare SQL, get back expected ResultSetMetaData
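KylinConnection wires URL parsing and statement creation together, and the TODO above notes that prepared-statement signatures are mocked client-side because there is no REST call to prepare SQL yet. None of that is visible to applications; a typical usage sketch of the driver these classes implement (host, port, project, credentials, and the queried table are placeholders, here borrowed from Kylin's sample cube):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.util.Properties;

    public class KylinJdbcUsageSketch {
        public static void main(String[] args) throws Exception {
            // Loads org.apache.kylin.jdbc.Driver, whose static block registers it
            // with DriverManager (the registration hunk shown earlier).
            Class.forName("org.apache.kylin.jdbc.Driver");

            Properties info = new Properties();
            info.put("user", "ADMIN");     // placeholder credentials
            info.put("password", "KYLIN");

            // URL format: jdbc:kylin://<host>:<port>/<project>
            try (Connection conn = DriverManager.getConnection("jdbc:kylin://localhost:7070/learn_kylin", info);
                    PreparedStatement ps = conn.prepareStatement("select count(*) from KYLIN_SALES where PART_DT > ?")) {
                ps.setDate(1, java.sql.Date.valueOf("2012-01-01"));
                try (ResultSet rs = ps.executeQuery()) {
                    while (rs.next()) {
                        System.out.println(rs.getLong(1));
                    }
                }
            }
        }
    }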
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/jdbc/src/main/java/org/apache/kylin/jdbc/KylinJdbcFactory.java
----------------------------------------------------------------------
diff --git a/jdbc/src/main/java/org/apache/kylin/jdbc/KylinJdbcFactory.java b/jdbc/src/main/java/org/apache/kylin/jdbc/KylinJdbcFactory.java
index 32bf6ca..6aae983 100644
--- a/jdbc/src/main/java/org/apache/kylin/jdbc/KylinJdbcFactory.java
+++ b/jdbc/src/main/java/org/apache/kylin/jdbc/KylinJdbcFactory.java
@@ -73,8 +73,7 @@ public class KylinJdbcFactory implements AvaticaFactory {
     }
 
     @Override
-    public AvaticaConnection newConnection(UnregisteredDriver driver, AvaticaFactory factory, String url,
-            Properties info) throws SQLException {
+    public AvaticaConnection newConnection(UnregisteredDriver driver, AvaticaFactory factory, String url, Properties info) throws SQLException {
         return new KylinConnection(driver, (KylinJdbcFactory) factory, url, info);
     }
 
@@ -85,23 +84,17 @@ public class KylinJdbcFactory implements AvaticaFactory {
     }
 
     @Override
-    public AvaticaStatement newStatement(AvaticaConnection connection, StatementHandle h, int resultSetType,
-            int resultSetConcurrency, int resultSetHoldability) throws SQLException {
-        return new KylinStatement((KylinConnection) connection, h, resultSetType, resultSetConcurrency,
-                resultSetHoldability);
+    public AvaticaStatement newStatement(AvaticaConnection connection, StatementHandle h, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
+        return new KylinStatement((KylinConnection) connection, h, resultSetType, resultSetConcurrency, resultSetHoldability);
     }
 
     @Override
-    public AvaticaPreparedStatement newPreparedStatement(AvaticaConnection connection, StatementHandle h,
-            Signature signature, int resultSetType, int resultSetConcurrency, int resultSetHoldability)
-            throws SQLException {
-        return new KylinPreparedStatement((KylinConnection) connection, h, signature, resultSetType,
-                resultSetConcurrency, resultSetHoldability);
+    public AvaticaPreparedStatement newPreparedStatement(AvaticaConnection connection, StatementHandle h, Signature signature, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
+        return new KylinPreparedStatement((KylinConnection) connection, h, signature, resultSetType, resultSetConcurrency, resultSetHoldability);
     }
 
     @Override
-    public AvaticaResultSet newResultSet(AvaticaStatement statement, QueryState state, Signature signature,
-            TimeZone timeZone, Frame firstFrame) throws SQLException {
+    public AvaticaResultSet newResultSet(AvaticaStatement statement, QueryState state, Signature signature, TimeZone timeZone, Frame firstFrame) throws SQLException {
         AvaticaResultSetMetaData resultSetMetaData = new AvaticaResultSetMetaData(statement, null, signature);
         return new KylinResultSet(statement, state, signature, resultSetMetaData, timeZone, firstFrame);
     }
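Finally, the factory above exists so that Avatica never instantiates its own base classes: every JDBC object handed out by a Kylin connection comes from one of these new* methods, which lets the driver substitute Kylin-specific subclasses behind framework-owned interfaces. The pattern in miniature, with stand-in types rather than the real Avatica API:

    public class FactorySketch {
        interface Statement {
        }

        interface StatementFactory {
            Statement newStatement(String connectionUrl);
        }

        // The driver's factory returns its own subclass, so framework code that
        // only knows the interface still ends up with driver-specific objects.
        static class KylinishStatement implements Statement {
            final String url;

            KylinishStatement(String url) {
                this.url = url;
            }
        }

        static final StatementFactory FACTORY = new StatementFactory() {
            public Statement newStatement(String connectionUrl) {
                return new KylinishStatement(connectionUrl);
            }
        };

        public static void main(String[] args) {
            Statement s = FACTORY.newStatement("jdbc:kylin://localhost:7070/learn_kylin");
            System.out.println(s.getClass().getSimpleName()); // prints KylinishStatement
        }
    }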