shaofengshi closed pull request #204: KYLIN-3442 Fact distinct columns in Spark
URL: https://github.com/apache/kylin/pull/204
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/engine-spark/pom.xml b/engine-spark/pom.xml
index 700aeb5cb0..ceb93373a2 100644
--- a/engine-spark/pom.xml
+++ b/engine-spark/pom.xml
@@ -100,6 +100,47 @@
             <version>0.9.10</version>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-compiler</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-reflect</artifactId>
+        </dependency>
     </dependencies>
 
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>net.alchim31.maven</groupId>
+                <artifactId>scala-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>scala-compile-first</id>
+                        <phase>process-resources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                            <goal>compile</goal>
+                        </goals>
+                    </execution>
+                    <execution>
+                        <id>scala-test-compile</id>
+                        <phase>process-test-resources</phase>
+                        <goals>
+                            <goal>testCompile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
 </project>
diff --git 
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/MultipleOutputsRDD.scala
 
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/MultipleOutputsRDD.scala
new file mode 100644
index 0000000000..cb5458da77
--- /dev/null
+++ 
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/MultipleOutputsRDD.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark
+
+import java.text.SimpleDateFormat
+import java.util.{Date, Locale}
+
+import org.apache.hadoop.conf.{Configurable, Configuration}
+import org.apache.hadoop.io.{DataInputBuffer, Writable}
+import org.apache.hadoop.mapred.RawKeyValueIterator
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.counters.GenericCounter
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl.DummyReporter
+import org.apache.hadoop.mapreduce.task.{ReduceContextImpl, 
TaskAttemptContextImpl}
+import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob, TaskAttemptID, 
TaskType}
+import org.apache.hadoop.util.Progress
+import org.apache.spark._
+import org.apache.spark.api.java.JavaPairRDD
+import org.apache.spark.rdd.RDD
+
+import scala.reflect.ClassTag
+
+class MultipleOutputsRDD[K, V](self: RDD[(String, (K, V, String))])
+                              (implicit kt: ClassTag[K], vt: ClassTag[V]) 
extends Serializable {
+
+  def saveAsNewAPIHadoopDatasetWithMultipleOutputs(conf: Configuration) {
+    val hadoopConf = conf
+    val job = NewAPIHadoopJob.getInstance(hadoopConf)
+    val formatter = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
+    val jobtrackerID = formatter.format(new Date())
+    val stageId = self.id
+    val jobConfiguration = job.getConfiguration
+    val wrappedConf = new SerializableWritable(jobConfiguration)
+    val outfmt = job.getOutputFormatClass
+    val jobFormat = outfmt.newInstance
+
+    if (conf.getBoolean("spark.hadoop.validateOutputSpecs", true)) {
+      jobFormat.checkOutputSpecs(job)
+    }
+
+    val writeShard = (context: TaskContext, itr: Iterator[(String, (K, V, 
String))]) => {
+      val config = wrappedConf.value
+
+      val attemptId = new TaskAttemptID(jobtrackerID, stageId, 
TaskType.REDUCE, context.partitionId,
+        context.attemptNumber)
+      val hadoopContext = new TaskAttemptContextImpl(config, attemptId)
+      val format = outfmt.newInstance
+
+      format match {
+        case c: Configurable => c.setConf(wrappedConf.value)
+        case _ => ()
+      }
+
+      val committer = format.getOutputCommitter(hadoopContext)
+      committer.setupTask(hadoopContext)
+
+      val recordWriter = 
format.getRecordWriter(hadoopContext).asInstanceOf[RecordWriter[K, V]]
+
+      val taskInputOutputContext = new ReduceContextImpl(wrappedConf.value, 
attemptId, new InputIterator(itr), new GenericCounter, new GenericCounter,
+        recordWriter, committer, new DummyReporter, null, kt.runtimeClass, 
vt.runtimeClass)
+
+      // use hadoop MultipleOutputs
+      val writer = new MultipleOutputs(taskInputOutputContext)
+
+      try {
+        while (itr.hasNext) {
+          val pair = itr.next()
+          writer.write(pair._1, pair._2._1, pair._2._2, pair._2._3)
+        }
+      } finally {
+        writer.close()
+      }
+      committer.commitTask(hadoopContext)
+      1
+    }: Int
+
+    val jobAttemptId = new TaskAttemptID(jobtrackerID, stageId, TaskType.MAP, 
0, 0)
+    val jobTaskContext = new TaskAttemptContextImpl(wrappedConf.value, 
jobAttemptId)
+    val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
+    jobCommitter.setupJob(jobTaskContext)
+    self.context.runJob(self, writeShard)
+    jobCommitter.commitJob(jobTaskContext)
+  }
+
+  class InputIterator(itr: Iterator[_]) extends RawKeyValueIterator {
+    def getKey: DataInputBuffer = null
+    def getValue: DataInputBuffer = null
+    def getProgress: Progress = null
+    def next = itr.hasNext
+    def close() { }
+  }
+}
+
+object MultipleOutputsRDD {
+  def rddToMultipleOutputsRDD[K, V](rdd: JavaPairRDD[String, (Writable, 
Writable, String)]) = {
+    new MultipleOutputsRDD(rdd)
+  }
+}
diff --git 
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
 
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
index e5451663e5..5fd7213b2b 100644
--- 
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
+++ 
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
@@ -18,6 +18,9 @@
 
 package org.apache.kylin.engine.spark;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.StorageURL;
 import org.apache.kylin.common.util.StringUtil;
@@ -32,9 +35,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
-import java.util.Map;
-
 /**
  */
 public class SparkBatchCubingJobBuilder2 extends JobBuilderSupport {
@@ -61,7 +61,7 @@ public CubingJob build() {
         inputSide.addStepPhase1_CreateFlatTable(result);
 
         // Phase 2: Build Dictionary
-        result.addTask(createFactDistinctColumnsStep(jobId));
+        result.addTask(createFactDistinctColumnsSparkStep(jobId));
 
         if (isEnableUHCDictStep()) {
             result.addTask(createBuildUHCDictStep(jobId));
@@ -87,6 +87,32 @@ public CubingJob build() {
         return result;
     }
 
+    public SparkExecutable createFactDistinctColumnsSparkStep(String jobId) {
+        final SparkExecutable sparkExecutable = new SparkExecutable();
+        final IJoinedFlatTableDesc flatTableDesc = 
EngineFactory.getJoinedFlatTableDesc(seg);
+        final String tablePath = JoinedFlatTable.getTableDir(flatTableDesc, 
getJobWorkingDir(jobId));
+
+        sparkExecutable.setClassName(SparkFactDistinct.class.getName());
+        sparkExecutable.setParam(SparkFactDistinct.OPTION_CUBE_NAME.getOpt(), 
seg.getRealization().getName());
+        sparkExecutable.setParam(SparkFactDistinct.OPTION_META_URL.getOpt(), 
getSegmentMetadataUrl(seg.getConfig(), jobId));
+        
sparkExecutable.setParam(SparkFactDistinct.OPTION_INPUT_TABLE.getOpt(), 
seg.getConfig().getHiveDatabaseForIntermediateTable() + "." + 
flatTableDesc.getTableName());
+        sparkExecutable.setParam(SparkFactDistinct.OPTION_INPUT_PATH.getOpt(), 
tablePath);
+        
sparkExecutable.setParam(SparkFactDistinct.OPTION_OUTPUT_PATH.getOpt(), 
getFactDistinctColumnsPath(jobId));
+        sparkExecutable.setParam(SparkFactDistinct.OPTION_SEGMENT_ID.getOpt(), 
seg.getUuid());
+        
sparkExecutable.setParam(SparkFactDistinct.OPTION_STATS_SAMPLING_PERCENT.getOpt(),
 String.valueOf(config.getConfig().getCubingInMemSamplingPercent()));
+
+        sparkExecutable.setJobId(jobId);
+        
sparkExecutable.setName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS);
+        sparkExecutable.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + 
CubingJob.SOURCE_SIZE_BYTES);
+
+        StringBuilder jars = new StringBuilder();
+
+        StringUtil.appendWithSeparator(jars, 
seg.getConfig().getSparkAdditionalJars());
+
+        sparkExecutable.setJars(jars.toString());
+
+        return sparkExecutable;
+    }
 
     protected void addLayerCubingSteps(final CubingJob result, final String 
jobId, final String cuboidRootPath) {
         final SparkExecutable sparkExecutable = new SparkExecutable();
diff --git 
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
 
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index cb3af317d9..9f4ae3482a 100644
--- 
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ 
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -27,13 +27,10 @@
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.AbstractApplication;
 import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.OptionsHelper;
 import org.apache.kylin.cube.CubeDescManager;
@@ -68,9 +65,6 @@
 import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SparkSession;
 import org.apache.spark.storage.StorageLevel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -169,37 +163,8 @@ protected void execute(OptionsHelper optionsHelper) throws 
Exception {
 
         boolean isSequenceFile = 
JoinedFlatTable.SEQUENCEFILE.equalsIgnoreCase(envConfig.getFlatTableStorageFormat());
 
-        final JavaPairRDD<ByteArray, Object[]> encodedBaseRDD;
-
-        if (isSequenceFile) {
-            encodedBaseRDD = sc.sequenceFile(inputPath, BytesWritable.class, 
Text.class).values()
-                    .map(new Function<Text, String[]>() {
-                        @Override
-                        public String[] call(Text text) throws Exception {
-                            String s = Bytes.toString(text.getBytes(), 0, 
text.getLength());
-                            return 
s.split(BatchConstants.SEQUENCE_FILE_DEFAULT_DELIMITER);
-                        }
-                    }).mapToPair(new EncodeBaseCuboid(cubeName, segmentId, 
metaUrl, sConf));
-        } else {
-            SparkSession sparkSession = 
SparkSession.builder().config(conf).enableHiveSupport().getOrCreate();
-            final Dataset intermediateTable = sparkSession.table(hiveTable);
-            encodedBaseRDD = intermediateTable.javaRDD().map(new Function<Row, 
String[]>() {
-                @Override
-                public String[] call(Row row) throws Exception {
-                    String[] result = new String[row.size()];
-                    for (int i = 0; i < row.size(); i++) {
-                        final Object o = row.get(i);
-                        if (o != null) {
-                            result[i] = o.toString();
-                        } else {
-                            result[i] = null;
-                        }
-                    }
-                    return result;
-                }
-            }).mapToPair(new EncodeBaseCuboid(cubeName, segmentId, metaUrl, 
sConf));
-
-        }
+        final JavaPairRDD<ByteArray, Object[]> encodedBaseRDD = 
SparkUtil.hiveRecordInputRDD(isSequenceFile, sc, inputPath, hiveTable)
+                .mapToPair(new EncodeBaseCuboid(cubeName, segmentId, metaUrl, 
sConf));
 
         Long totalCount = 0L;
         if (envConfig.isSparkSanityCheckEnabled()) {
diff --git 
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java 
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index 637382cba9..612239741f 100644
--- 
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ 
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -375,7 +375,11 @@ private void attachSegmentMetadataWithDict(CubeSegment 
segment) throws IOExcepti
         Set<String> dumpList = new LinkedHashSet<>();
         
dumpList.addAll(JobRelatedMetaUtil.collectCubeMetadata(segment.getCubeInstance()));
         dumpList.addAll(segment.getDictionaryPaths());
-        dumpList.add(segment.getStatisticsResourcePath());
+        ResourceStore rs = ResourceStore.getStore(segment.getConfig());
+        if (rs.exists(segment.getStatisticsResourcePath())) {
+            // cube statistics is not available for new segment
+            dumpList.add(segment.getStatisticsResourcePath());
+        }
         JobRelatedMetaUtil.dumpAndUploadKylinPropsAndMetadata(dumpList, 
(KylinConfigExt) segment.getConfig(), 
this.getParam(SparkCubingByLayer.OPTION_META_URL.getOpt()));
     }
 
diff --git 
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java
 
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java
new file mode 100644
index 0000000000..61e2e534e8
--- /dev/null
+++ 
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java
@@ -0,0 +1,866 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ArrayPrimitiveWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.KylinVersion;
+import org.apache.kylin.common.util.AbstractApplication;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.OptionsHelper;
+import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.DimensionRangeInfo;
+import org.apache.kylin.cube.cuboid.CuboidUtil;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
+import org.apache.kylin.dict.DictionaryGenerator;
+import org.apache.kylin.dict.IDictionaryBuilder;
+import org.apache.kylin.engine.EngineFactory;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.SerializableConfiguration;
+import org.apache.kylin.engine.mr.common.StatisticsDecisionUtil;
+import 
org.apache.kylin.engine.mr.steps.FactDistinctColumnsMapper.DictColDeduper;
+import org.apache.kylin.engine.mr.steps.FactDistinctColumnsReducerMapping;
+import org.apache.kylin.engine.mr.steps.SelfDefineSortableKey;
+import org.apache.kylin.job.JoinedFlatTable;
+import org.apache.kylin.measure.BufferedMeasureCodec;
+import org.apache.kylin.measure.hllc.HLLCounter;
+import org.apache.kylin.measure.hllc.RegisterType;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.spark.Partitioner;
+import org.apache.spark.SparkConf;
+import org.apache.spark.TaskContext;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.util.LongAccumulator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+
+import scala.Tuple2;
+import scala.Tuple3;
+
+public class SparkFactDistinct extends AbstractApplication implements 
Serializable {
+
+    protected static final Logger logger = 
LoggerFactory.getLogger(SparkFactDistinct.class);
+
+    public static final Option OPTION_CUBE_NAME = 
OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg()
+            .isRequired(true).withDescription("Cube 
Name").create(BatchConstants.ARG_CUBE_NAME);
+    public static final Option OPTION_META_URL = 
OptionBuilder.withArgName("metaUrl").hasArg().isRequired(true)
+            .withDescription("HDFS metadata url").create("metaUrl");
+    public static final Option OPTION_OUTPUT_PATH = 
OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg()
+            .isRequired(true).withDescription("Cube output 
path").create(BatchConstants.ARG_OUTPUT);
+    public static final Option OPTION_SEGMENT_ID = 
OptionBuilder.withArgName("segmentId").hasArg().isRequired(true)
+            .withDescription("Cube Segment Id").create("segmentId");
+    public static final Option OPTION_STATS_SAMPLING_PERCENT = OptionBuilder
+            
.withArgName(BatchConstants.ARG_STATS_SAMPLING_PERCENT).hasArg().isRequired(true)
+            .withDescription("Statistics sampling 
percent").create(BatchConstants.ARG_STATS_SAMPLING_PERCENT);
+    public static final Option OPTION_INPUT_TABLE = 
OptionBuilder.withArgName("hiveTable").hasArg().isRequired(true)
+            .withDescription("Hive Intermediate Table").create("hiveTable");
+    public static final Option OPTION_INPUT_PATH = 
OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg()
+            .isRequired(true).withDescription("Hive Intermediate Table 
PATH").create(BatchConstants.ARG_INPUT);
+
+    private Options options;
+
+    public SparkFactDistinct() {
+        options = new Options();
+        options.addOption(OPTION_CUBE_NAME);
+        options.addOption(OPTION_META_URL);
+        options.addOption(OPTION_OUTPUT_PATH);
+        options.addOption(OPTION_INPUT_TABLE);
+        options.addOption(OPTION_INPUT_PATH);
+        options.addOption(OPTION_SEGMENT_ID);
+        options.addOption(OPTION_STATS_SAMPLING_PERCENT);
+    }
+
+    @Override
+    protected Options getOptions() {
+        return options;
+    }
+
+    @Override
+    protected void execute(OptionsHelper optionsHelper) throws Exception {
+        String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
+        String metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
+        String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
+        String hiveTable = optionsHelper.getOptionValue(OPTION_INPUT_TABLE);
+        String inputPath = optionsHelper.getOptionValue(OPTION_INPUT_PATH);
+        String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
+        int samplingPercent = 
Integer.parseInt(optionsHelper.getOptionValue(OPTION_STATS_SAMPLING_PERCENT));
+
+        Class[] kryoClassArray = new Class[] { 
Class.forName("scala.reflect.ClassTag$$anon$1"), 
Class.forName("org.apache.kylin.engine.mr.steps.SelfDefineSortableKey") };
+
+        SparkConf conf = new SparkConf().setAppName("Fact distinct columns 
for:" + cubeName + " segment " + segmentId);
+        //serialization conf
+        conf.set("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer");
+        conf.set("spark.kryo.registrator", 
"org.apache.kylin.engine.spark.KylinKryoRegistrator");
+        conf.set("spark.kryo.registrationRequired", 
"true").registerKryoClasses(kryoClassArray);
+
+        KylinSparkJobListener jobListener = new KylinSparkJobListener();
+        JavaSparkContext sc = new JavaSparkContext(conf);
+        sc.sc().addSparkListener(jobListener);
+        HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));
+
+        final SerializableConfiguration sConf = new 
SerializableConfiguration(sc.hadoopConfiguration());
+        KylinConfig envConfig = 
AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+
+        final CubeInstance cubeInstance = 
CubeManager.getInstance(envConfig).getCube(cubeName);
+
+        final Job job = Job.getInstance(sConf.get());
+
+        final FactDistinctColumnsReducerMapping reducerMapping = new 
FactDistinctColumnsReducerMapping(cubeInstance);
+
+        logger.info("RDD Output path: {}", outputPath);
+        logger.info("getTotalReducerNum: {}", 
reducerMapping.getTotalReducerNum());
+        logger.info("getCuboidRowCounterReducerNum: {}", 
reducerMapping.getCuboidRowCounterReducerNum());
+
+        boolean isSequenceFile = 
JoinedFlatTable.SEQUENCEFILE.equalsIgnoreCase(envConfig.getFlatTableStorageFormat());
+
+        // calculate source record bytes size
+        final LongAccumulator bytesWritten = sc.sc().longAccumulator();
+
+        final JavaRDD<String[]> recordRDD = 
SparkUtil.hiveRecordInputRDD(isSequenceFile, sc, inputPath, hiveTable);
+
+        JavaPairRDD<SelfDefineSortableKey, Text> flatOutputRDD = 
recordRDD.mapPartitionsToPair(new FlatOutputFucntion(cubeName, segmentId, 
metaUrl, sConf, samplingPercent, bytesWritten));
+
+        JavaPairRDD<SelfDefineSortableKey, Iterable<Text>> aggredRDD = 
flatOutputRDD.groupByKey(new FactDistinctPartitioner(cubeName, metaUrl, sConf, 
reducerMapping.getTotalReducerNum()));
+
+        JavaPairRDD<String, Tuple3<Writable, Writable, String>> outputRDD = 
aggredRDD.mapPartitionsToPair(new MultiOutputFunction(cubeName, metaUrl, sConf, 
samplingPercent));
+
+        // make each reducer output to respective dir
+        MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_COLUMN, 
SequenceFileOutputFormat.class, NullWritable.class, Text.class);
+        MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_DICT, 
SequenceFileOutputFormat.class, NullWritable.class, 
ArrayPrimitiveWritable.class);
+        MultipleOutputs.addNamedOutput(job, 
BatchConstants.CFG_OUTPUT_STATISTICS, SequenceFileOutputFormat.class, 
LongWritable.class, BytesWritable.class);
+        MultipleOutputs.addNamedOutput(job, 
BatchConstants.CFG_OUTPUT_PARTITION, TextOutputFormat.class, 
NullWritable.class, LongWritable.class);
+
+        FileOutputFormat.setOutputPath(job, new Path(outputPath));
+
+        // prevent to create zero-sized default output
+        LazyOutputFormat.setOutputFormatClass(job, 
SequenceFileOutputFormat.class);
+
+        MultipleOutputsRDD multipleOutputsRDD = 
MultipleOutputsRDD.rddToMultipleOutputsRDD(outputRDD);
+
+        
multipleOutputsRDD.saveAsNewAPIHadoopDatasetWithMultipleOutputs(job.getConfiguration());
+
+        // only work for client mode, not work when 
spark.submit.deployMode=cluster
+        logger.info("Map input records={}", recordRDD.count());
+        logger.info("HDFS Read: {} HDFS Write", bytesWritten.value());
+
+        HadoopUtil.deleteHDFSMeta(metaUrl);
+    }
+
+    static class FlatOutputFucntion implements 
PairFlatMapFunction<Iterator<String[]>, SelfDefineSortableKey, Text> {
+        private volatile transient boolean initialized = false;
+        private String cubeName;
+        private String segmentId;
+        private String metaUrl;
+        private SerializableConfiguration conf;
+        private int samplingPercent;
+        private CuboidStatCalculator cuboidStatCalculator;
+        private FactDistinctColumnsReducerMapping reducerMapping;
+        private List<TblColRef> allCols;
+        private int[] columnIndex;
+        private DictColDeduper dictColDeduper;
+        private Map<Integer, DimensionRangeInfo> dimensionRangeInfoMap;
+        private ByteBuffer tmpbuf;
+        private LongAccumulator bytesWritten;
+
+        public FlatOutputFucntion(String cubeName, String segmentId, String 
metaurl, SerializableConfiguration conf, int samplingPercent, LongAccumulator 
bytesWritten) {
+            this.cubeName = cubeName;
+            this.segmentId = segmentId;
+            this.metaUrl = metaurl;
+            this.conf = conf;
+            this.samplingPercent = samplingPercent;
+            this.dimensionRangeInfoMap = Maps.newHashMap();
+            this.bytesWritten = bytesWritten;
+        }
+
+        private void init() {
+            KylinConfig kConfig = 
AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
+            KylinConfig.setAndUnsetThreadLocalConfig(kConfig);
+            CubeInstance cubeInstance = 
CubeManager.getInstance(kConfig).getCube(cubeName);
+            CubeDesc cubeDesc = cubeInstance.getDescriptor();
+            CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
+            CubeJoinedFlatTableEnrich intermediateTableDesc = new 
CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSegment), 
cubeDesc);
+
+            reducerMapping = new 
FactDistinctColumnsReducerMapping(cubeInstance);
+            tmpbuf = ByteBuffer.allocate(4096);
+
+            int[] rokeyColumnIndexes = 
intermediateTableDesc.getRowKeyColumnIndexes();
+
+            Long[] cuboidIds = getCuboidIds(cubeSegment);
+
+            Integer[][] cuboidsBitSet = CuboidUtil.getCuboidBitSet(cuboidIds, 
rokeyColumnIndexes.length);
+
+            boolean isNewAlgorithm = isUsePutRowKeyToHllNewAlgorithm(cubeDesc);
+
+            HLLCounter[] cuboidsHLL = getInitCuboidsHLL(cuboidIds.length, 
cubeDesc.getConfig().getCubeStatsHLLPrecision());
+
+            cuboidStatCalculator = new 
CuboidStatCalculator(rokeyColumnIndexes, cuboidIds, cuboidsBitSet, 
isNewAlgorithm, cuboidsHLL);
+            allCols = reducerMapping.getAllDimDictCols();
+
+            initDictColDeduper(cubeDesc);
+            initColumnIndex(intermediateTableDesc);
+
+            initialized = true;
+        }
+
+        @Override
+        public Iterator<Tuple2<SelfDefineSortableKey, Text>> 
call(Iterator<String[]> rowIterator) throws Exception {
+            if (initialized == false) {
+                synchronized (SparkFactDistinct.class) {
+                    if (initialized == false) {
+                        init();
+                    }
+                }
+            }
+
+            List<String[]> rows = Lists.newArrayList(rowIterator);
+            List<Tuple2<SelfDefineSortableKey, Text>> result = 
Lists.newArrayList();
+
+            int rowCount = 0;
+
+            for (String[] row : rows) {
+                bytesWritten.add(countSizeInBytes(row));
+
+                for (int i = 0; i < allCols.size(); i++) {
+                    String fieldValue = row[columnIndex[i]];
+                    if (fieldValue == null)
+                        continue;
+
+                    final DataType type = allCols.get(i).getType();
+
+                    //for dic column, de dup before write value; for dim not 
dic column, hold util doCleanup()
+                    if (dictColDeduper.isDictCol(i)) {
+                        if (dictColDeduper.add(i, fieldValue)) {
+                            addFieldValue(type, i, fieldValue, result);
+                        }
+                    } else {
+                        DimensionRangeInfo old = dimensionRangeInfoMap.get(i);
+                        if (old == null) {
+                            old = new DimensionRangeInfo(fieldValue, 
fieldValue);
+                            dimensionRangeInfoMap.put(i, old);
+                        } else {
+                            old.setMax(type.getOrder().max(old.getMax(), 
fieldValue));
+                            old.setMin(type.getOrder().min(old.getMin(), 
fieldValue));
+                        }
+                    }
+                }
+
+                if (rowCount % 100 < samplingPercent) {
+                    cuboidStatCalculator.putRow(row);
+                }
+
+                if (rowCount % 100 == 0) {
+                    dictColDeduper.resetIfShortOfMem();
+                }
+
+                rowCount++;
+            }
+
+            ByteBuffer hllBuf = 
ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
+
+            // output each cuboid's hll to reducer, key is 0 - cuboidId
+            Long[] cuboidIds = cuboidStatCalculator.getCuboidIds();
+            HLLCounter[] cuboidsHLL = cuboidStatCalculator.getHLLCounters();
+            HLLCounter hll;
+
+            for (int i = 0; i < cuboidIds.length; i++) {
+                hll = cuboidsHLL[i];
+                tmpbuf.clear();
+                tmpbuf.put((byte) 
FactDistinctColumnsReducerMapping.MARK_FOR_HLL_COUNTER); // one byte
+                tmpbuf.putLong(cuboidIds[i]);
+                Text outputKey = new Text();
+                Text outputValue = new Text();
+                SelfDefineSortableKey sortableKey = new 
SelfDefineSortableKey();
+
+                outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
+                hllBuf.clear();
+                hll.writeRegisters(hllBuf);
+                outputValue.set(hllBuf.array(), 0, hllBuf.position());
+
+                sortableKey.init(outputKey, (byte) 0);
+
+                result.add(new Tuple2<SelfDefineSortableKey, 
Text>(sortableKey, outputValue));
+            }
+
+            for (Integer colIndex : dimensionRangeInfoMap.keySet()) {
+                DimensionRangeInfo rangeInfo = 
dimensionRangeInfoMap.get(colIndex);
+                DataType dataType = allCols.get(colIndex).getType();
+                addFieldValue(dataType, colIndex, rangeInfo.getMin(), result);
+                addFieldValue(dataType, colIndex, rangeInfo.getMax(), result);
+            }
+
+            return result.iterator();
+        }
+
+        private boolean isUsePutRowKeyToHllNewAlgorithm(CubeDesc cubeDesc) {
+            boolean isUsePutRowKeyToHllNewAlgorithm;
+            if (KylinVersion.isBefore200(cubeDesc.getVersion())) {
+                isUsePutRowKeyToHllNewAlgorithm = false;
+                logger.info("Found KylinVersion: {}. Use old algorithm for 
cuboid sampling.", cubeDesc.getVersion());
+            } else {
+                isUsePutRowKeyToHllNewAlgorithm = true;
+                logger.info(
+                        "Found KylinVersion: {}. Use new algorithm for cuboid 
sampling. About the details of the new algorithm, please refer to KYLIN-2518",
+                        cubeDesc.getVersion());
+            }
+            return isUsePutRowKeyToHllNewAlgorithm;
+        }
+
+        private Long[] getCuboidIds(CubeSegment cubeSegment) {
+            Set<Long> cuboidIdSet = 
Sets.newHashSet(cubeSegment.getCuboidScheduler().getAllCuboidIds());
+            if 
(StatisticsDecisionUtil.isAbleToOptimizeCubingPlan(cubeSegment)) {
+                // For cube planner, for every prebuilt cuboid, its related 
row count stats should be calculated
+                // If the precondition for trigger cube planner phase one is 
satisfied, we need to calculate row count stats for mandatory cuboids.
+                
cuboidIdSet.addAll(cubeSegment.getCubeDesc().getMandatoryCuboids());
+            }
+
+            return cuboidIdSet.toArray(new Long[cuboidIdSet.size()]);
+        }
+
+        private HLLCounter[] getInitCuboidsHLL(int cuboidSize, int 
hllPrecision) {
+            HLLCounter[] cuboidsHLL = new HLLCounter[cuboidSize];
+            for (int i = 0; i < cuboidSize; i++) {
+                cuboidsHLL[i] = new HLLCounter(hllPrecision, 
RegisterType.DENSE);
+            }
+            return cuboidsHLL;
+        }
+
+        private void initDictColDeduper(CubeDesc cubeDesc) {
+            // setup dict col deduper
+            dictColDeduper = new DictColDeduper();
+            Set<TblColRef> dictCols = 
cubeDesc.getAllColumnsNeedDictionaryBuilt();
+            for (int i = 0; i < allCols.size(); i++) {
+                if (dictCols.contains(allCols.get(i)))
+                    dictColDeduper.setIsDictCol(i);
+            }
+        }
+
+        private void initColumnIndex(CubeJoinedFlatTableEnrich 
intermediateTableDesc) {
+            columnIndex = new int[allCols.size()];
+            for (int i = 0; i < allCols.size(); i++) {
+                TblColRef colRef = allCols.get(i);
+                int columnIndexOnFlatTbl = 
intermediateTableDesc.getColumnIndex(colRef);
+                columnIndex[i] = columnIndexOnFlatTbl;
+            }
+        }
+
+        private void addFieldValue(DataType type, Integer colIndex, String 
value,
+                List<Tuple2<SelfDefineSortableKey, Text>> result) {
+            int reducerIndex = reducerMapping.getReducerIdForCol(colIndex, 
value);
+            tmpbuf.clear();
+            byte[] valueBytes = Bytes.toBytes(value);
+            int size = valueBytes.length + 1;
+            if (size >= tmpbuf.capacity()) {
+                tmpbuf = ByteBuffer.allocate(countNewSize(tmpbuf.capacity(), 
size));
+            }
+            tmpbuf.put(Bytes.toBytes(reducerIndex)[3]);
+            tmpbuf.put(valueBytes);
+
+            Text outputKey = new Text();
+            SelfDefineSortableKey sortableKey = new SelfDefineSortableKey();
+
+            outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
+            sortableKey.init(outputKey, type);
+
+            result.add(new Tuple2<SelfDefineSortableKey, Text>(sortableKey, 
new Text()));
+
+            // log a few rows for troubleshooting
+            if (result.size() < 10) {
+                logger.info("Sample output: " + allCols.get(colIndex) + " '" + 
value + "' => reducer " + reducerIndex);
+            }
+        }
+
+        private int countNewSize(int oldSize, int dataSize) {
+            int newSize = oldSize * 2;
+            while (newSize < dataSize) {
+                newSize = newSize * 2;
+            }
+            return newSize;
+        }
+
+        private int countSizeInBytes(String[] row) {
+            int size = 0;
+            for (String s : row) {
+                size += s == null ? 1 : StringUtil.utf8Length(s);
+                size++; // delimiter
+            }
+            return size;
+        }
+    }
+
+    static class CuboidStatCalculator {
+        private final int nRowKey;
+        private final int[] rowkeyColIndex;
+        private final Long[] cuboidIds;
+        private final Integer[][] cuboidsBitSet;
+        private volatile HLLCounter[] cuboidsHLL;
+
+        //about details of the new algorithm, please see KYLIN-2518
+        private final boolean isNewAlgorithm;
+        private final HashFunction hf;
+        private long[] rowHashCodesLong;
+
+        public CuboidStatCalculator(int[] rowkeyColIndex, Long[] cuboidIds, 
Integer[][] cuboidsBitSet,
+                boolean isUsePutRowKeyToHllNewAlgorithm, HLLCounter[] 
cuboidsHLL) {
+            this.nRowKey = rowkeyColIndex.length;
+            this.rowkeyColIndex = rowkeyColIndex;
+            this.cuboidIds = cuboidIds;
+            this.cuboidsBitSet = cuboidsBitSet;
+            this.isNewAlgorithm = isUsePutRowKeyToHllNewAlgorithm;
+            if (!isNewAlgorithm) {
+                this.hf = Hashing.murmur3_32();
+            } else {
+                rowHashCodesLong = new long[nRowKey];
+                this.hf = Hashing.murmur3_128();
+            }
+            this.cuboidsHLL = cuboidsHLL;
+        }
+
+        public void putRow(final String[] row) {
+            String[] copyRow = Arrays.copyOf(row, row.length);
+
+            if (isNewAlgorithm) {
+                putRowKeyToHLLNew(copyRow);
+            } else {
+                putRowKeyToHLLOld(copyRow);
+            }
+        }
+
+        private void putRowKeyToHLLOld(String[] row) {
+            //generate hash for each row key column
+            byte[][] rowHashCodes = new byte[nRowKey][];
+            for (int i = 0; i < nRowKey; i++) {
+                Hasher hc = hf.newHasher();
+                String colValue = row[rowkeyColIndex[i]];
+                if (colValue != null) {
+                    rowHashCodes[i] = hc.putString(colValue).hash().asBytes();
+                } else {
+                    rowHashCodes[i] = hc.putInt(0).hash().asBytes();
+                }
+            }
+
+            // user the row key column hash to get a consolidated hash for 
each cuboid
+            for (int i = 0, n = cuboidsBitSet.length; i < n; i++) {
+                Hasher hc = hf.newHasher();
+                for (int position = 0; position < cuboidsBitSet[i].length; 
position++) {
+                    hc.putBytes(rowHashCodes[cuboidsBitSet[i][position]]);
+                }
+
+                cuboidsHLL[i].add(hc.hash().asBytes());
+            }
+        }
+
+        private void putRowKeyToHLLNew(String[] row) {
+            //generate hash for each row key column
+            for (int i = 0; i < nRowKey; i++) {
+                Hasher hc = hf.newHasher();
+                String colValue = row[rowkeyColIndex[i]];
+                if (colValue == null)
+                    colValue = "0";
+                byte[] bytes = hc.putString(colValue).hash().asBytes();
+                rowHashCodesLong[i] = (Bytes.toLong(bytes) + i);//add column 
ordinal to the hash value to distinguish between (a,b) and (b,a)
+            }
+
+            // user the row key column hash to get a consolidated hash for 
each cuboid
+            for (int i = 0, n = cuboidsBitSet.length; i < n; i++) {
+                long value = 0;
+                for (int position = 0; position < cuboidsBitSet[i].length; 
position++) {
+                    value += rowHashCodesLong[cuboidsBitSet[i][position]];
+                }
+                cuboidsHLL[i].addHashDirectly(value);
+            }
+        }
+
+        public HLLCounter[] getHLLCounters() {
+            return cuboidsHLL;
+        }
+
+        public Long[] getCuboidIds() {
+            return cuboidIds;
+        }
+    }
+
+    static class FactDistinctPartitioner extends Partitioner {
+        private volatile transient boolean initialized = false;
+        private String cubeName;
+        private String metaUrl;
+        private SerializableConfiguration conf;
+        private int totalReducerNum;
+        private FactDistinctColumnsReducerMapping reducerMapping;
+
+        public FactDistinctPartitioner(String cubeName, String metaUrl, 
SerializableConfiguration conf, int totalReducerNum) {
+            this.cubeName = cubeName;
+            this.metaUrl = metaUrl;
+            this.conf = conf;
+            this.totalReducerNum = totalReducerNum;
+        }
+
+        private void init() {
+            KylinConfig kConfig = 
AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
+            KylinConfig.setAndUnsetThreadLocalConfig(kConfig);
+            CubeInstance cubeInstance = 
CubeManager.getInstance(kConfig).getCube(cubeName);
+            reducerMapping = new 
FactDistinctColumnsReducerMapping(cubeInstance);
+
+            initialized = true;
+        }
+
+        @Override
+        public int numPartitions() {
+            return totalReducerNum;
+        }
+
+        @Override
+        public int getPartition(Object o) {
+            if (initialized == false) {
+                synchronized (SparkFactDistinct.class) {
+                    if (initialized == false) {
+                        init();
+                    }
+                }
+            }
+
+            SelfDefineSortableKey skey = (SelfDefineSortableKey) o;
+            Text key = skey.getText();
+            if (key.getBytes()[0] == 
FactDistinctColumnsReducerMapping.MARK_FOR_HLL_COUNTER) {
+                Long cuboidId = Bytes.toLong(key.getBytes(), 1, 
Bytes.SIZEOF_LONG);
+                return reducerMapping.getReducerIdForCuboidRowCount(cuboidId);
+            } else {
+                return BytesUtil.readUnsigned(key.getBytes(), 0, 1);
+            }
+        }
+    }
+
+    static class MultiOutputFunction implements
+            PairFlatMapFunction<Iterator<Tuple2<SelfDefineSortableKey, 
Iterable<Text>>>, String, Tuple3<Writable, Writable, String>> {
+        private volatile transient boolean initialized = false;
+        private String DICT_FILE_POSTFIX = ".rldict";
+        private String DIMENSION_COL_INFO_FILE_POSTFIX = ".dci";
+        private String cubeName;
+        private String metaUrl;
+        private SerializableConfiguration conf;
+        private int samplingPercent;
+        private FactDistinctColumnsReducerMapping reducerMapping;
+        private int taskId;
+        private boolean isStatistics = false;
+        private long baseCuboidId;
+        private List<Long> baseCuboidRowCountInMappers;
+        private Map<Long, HLLCounter> cuboidHLLMap;
+        private TblColRef col;
+        private boolean buildDictInReducer;
+        private IDictionaryBuilder builder;
+        private int rowCount = 0;
+        private long totalRowsBeforeMerge = 0;
+        private KylinConfig cubeConfig;
+        private CubeDesc cubeDesc;
+        private String maxValue = null;
+        private String minValue = null;
+        private List<Tuple2<String, Tuple3<Writable, Writable, String>>> 
result;
+
+        public MultiOutputFunction(String cubeName, String metaurl, 
SerializableConfiguration conf, int samplingPercent) {
+            this.cubeName = cubeName;
+            this.metaUrl = metaurl;
+            this.conf = conf;
+            this.samplingPercent = samplingPercent;
+        }
+
+        private void init() throws IOException {
+            taskId = TaskContext.getPartitionId();
+            KylinConfig kConfig = 
AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
+            KylinConfig.setAndUnsetThreadLocalConfig(kConfig);
+            CubeInstance cubeInstance = 
CubeManager.getInstance(kConfig).getCube(cubeName);
+            cubeDesc = cubeInstance.getDescriptor();
+            cubeConfig = cubeInstance.getConfig();
+            reducerMapping = new 
FactDistinctColumnsReducerMapping(cubeInstance);
+
+            result = Lists.newArrayList();
+
+            if (reducerMapping.isCuboidRowCounterReducer(taskId)) {
+                // hll
+                isStatistics = true;
+                baseCuboidId = 
cubeInstance.getCuboidScheduler().getBaseCuboidId();
+                baseCuboidRowCountInMappers = Lists.newArrayList();
+                cuboidHLLMap = Maps.newHashMap();
+
+                logger.info("Partition " + taskId + " handling stats");
+            } else {
+                // normal col
+                col = reducerMapping.getColForReducer(taskId);
+                Preconditions.checkNotNull(col);
+
+                // local build dict
+                buildDictInReducer = kConfig.isBuildDictInReducerEnabled();
+                if (cubeDesc.getDictionaryBuilderClass(col) != null) { // only 
works with default dictionary builder
+                    buildDictInReducer = false;
+                }
+
+                if (reducerMapping.getReducerNumForDimCol(col) > 1) {
+                    buildDictInReducer = false; // only works if this is the 
only reducer of a dictionary column
+                }
+
+                if (buildDictInReducer) {
+                    builder = 
DictionaryGenerator.newDictionaryBuilder(col.getType());
+                    builder.init(null, 0, null);
+                }
+                logger.info("Partition " + taskId + " handling column " + col 
+ ", buildDictInReducer=" + buildDictInReducer);
+            }
+
+            initialized = true;
+        }
+
+        private void logAFewRows(String value) {
+            if (rowCount < 10) {
+                logger.info("Received value: " + value);
+            }
+        }
+
+        @Override
+        public Iterator<Tuple2<String, Tuple3<Writable, Writable, String>>> 
call(
+                Iterator<Tuple2<SelfDefineSortableKey, Iterable<Text>>> 
tuple2Iterator) throws Exception {
+            if (initialized == false) {
+                synchronized (SparkFactDistinct.class) {
+                    if (initialized == false) {
+                        init();
+                    }
+                }
+            }
+
+            List<Tuple2<SelfDefineSortableKey, Iterable<Text>>> tuples = 
Lists.newArrayList(tuple2Iterator);
+
+            for (Tuple2<SelfDefineSortableKey, Iterable<Text>> tuple : tuples) 
{
+                Text key = tuple._1.getText();
+
+                if (isStatistics) {
+                    // for hll
+                    long cuboidId = Bytes.toLong(key.getBytes(), 1, 
Bytes.SIZEOF_LONG);
+
+                    for (Text value : tuple._2) {
+                        HLLCounter hll = new 
HLLCounter(cubeConfig.getCubeStatsHLLPrecision());
+                        ByteBuffer bf = ByteBuffer.wrap(value.getBytes(), 0, 
value.getLength());
+                        hll.readRegisters(bf);
+
+                        totalRowsBeforeMerge += hll.getCountEstimate();
+
+                        if (cuboidId == baseCuboidId) {
+                            
baseCuboidRowCountInMappers.add(hll.getCountEstimate());
+                        }
+
+                        if (cuboidHLLMap.get(cuboidId) != null) {
+                            cuboidHLLMap.get(cuboidId).merge(hll);
+                        } else {
+                            cuboidHLLMap.put(cuboidId, hll);
+                        }
+                    }
+
+                } else {
+                    String value = Bytes.toString(key.getBytes(), 1, 
key.getLength() - 1);
+                    logAFewRows(value);
+                    // if dimension col, compute max/min value
+                    if 
(cubeDesc.listDimensionColumnsExcludingDerived(true).contains(col)) {
+                        if (minValue == null || 
col.getType().compare(minValue, value) > 0) {
+                            minValue = value;
+                        }
+                        if (maxValue == null || 
col.getType().compare(maxValue, value) < 0) {
+                            maxValue = value;
+                        }
+                    }
+
+                    //if dict column
+                    if 
(cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col)) {
+                        if (buildDictInReducer) {
+                            builder.addValue(value);
+                        } else {
+                            byte[] keyBytes = Bytes.copy(key.getBytes(), 1, 
key.getLength() - 1);
+                            // output written to baseDir/colName/-r-00000 (etc)
+                            String fileName = col.getIdentity() + "/";
+                            result.add(new Tuple2<String, Tuple3<Writable, 
Writable, String>>(
+                                    BatchConstants.CFG_OUTPUT_COLUMN, new 
Tuple3<Writable, Writable, String>(
+                                    NullWritable.get(), new Text(keyBytes), 
fileName)));
+                        }
+                    }
+                }
+
+                rowCount++;
+            }
+
+            if (isStatistics) {
+                //output the hll info;
+                List<Long> allCuboids = Lists.newArrayList();
+                allCuboids.addAll(cuboidHLLMap.keySet());
+                Collections.sort(allCuboids);
+
+                logMapperAndCuboidStatistics(allCuboids); // for human check
+                outputStatistics(allCuboids, result);
+            } else {
+                //dimension col
+                if 
(cubeDesc.listDimensionColumnsExcludingDerived(true).contains(col)) {
+                    outputDimRangeInfo(result);
+                }
+                // dic col
+                if (buildDictInReducer) {
+                    Dictionary<String> dict = builder.build();
+                    outputDict(col, dict, result);
+                }
+            }
+
+            return result.iterator();
+        }
+
+        private void logMapperAndCuboidStatistics(List<Long> allCuboids) 
throws IOException {
+            logger.info("Cuboid number for task: " + taskId + "\t" + 
allCuboids.size());
+            logger.info("Samping percentage: \t" + samplingPercent);
+            logger.info("The following statistics are collected based on 
sampling data. ");
+            logger.info("Number of Mappers: " + 
baseCuboidRowCountInMappers.size());
+
+            for (int i = 0; i < baseCuboidRowCountInMappers.size(); i++) {
+                if (baseCuboidRowCountInMappers.get(i) > 0) {
+                    logger.info("Base Cuboid in Mapper " + i + " row count: \t 
" + baseCuboidRowCountInMappers.get(i));
+                }
+            }
+
+            long grantTotal = 0;
+            for (long i : allCuboids) {
+                grantTotal += cuboidHLLMap.get(i).getCountEstimate();
+                logger.info("Cuboid " + i + " row count is: \t " + 
cuboidHLLMap.get(i).getCountEstimate());
+            }
+
+            logger.info("Sum of row counts (before merge) is: \t " + 
totalRowsBeforeMerge);
+            logger.info("After merge, the row count: \t " + grantTotal);
+        }
+
+        private void outputDimRangeInfo(List<Tuple2<String, Tuple3<Writable, 
Writable, String>>> result) {
+            if (col != null && minValue != null) {
+                // output written to baseDir/colName/colName.dci-r-00000 (etc)
+                String dimRangeFileName = col.getIdentity() + "/" + 
col.getName() + DIMENSION_COL_INFO_FILE_POSTFIX;
+
+                result.add(new Tuple2<String, Tuple3<Writable, Writable, 
String>>(BatchConstants.CFG_OUTPUT_PARTITION,
+                        new Tuple3<Writable, Writable, 
String>(NullWritable.get(), new Text(minValue.getBytes()),
+                                dimRangeFileName)));
+                result.add(new Tuple2<String, Tuple3<Writable, Writable, 
String>>(BatchConstants.CFG_OUTPUT_PARTITION,
+                        new Tuple3<Writable, Writable, 
String>(NullWritable.get(), new Text(maxValue.getBytes()),
+                                dimRangeFileName)));
+                logger.info("write dimension range info for col : " + 
col.getName() + "  minValue:" + minValue
+                        + " maxValue:" + maxValue);
+            }
+        }
+
+        private void outputDict(TblColRef col, Dictionary<String> dict, 
List<Tuple2<String, Tuple3<Writable, Writable, String>>> result)
+                throws IOException, InterruptedException {
+            // output written to baseDir/colName/colName.rldict-r-00000 (etc)
+            String dictFileName = col.getIdentity() + "/" + col.getName() + 
DICT_FILE_POSTFIX;
+
+            try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); 
DataOutputStream outputStream = new DataOutputStream(baos)) {
+                outputStream.writeUTF(dict.getClass().getName());
+                dict.write(outputStream);
+
+                result.add(new Tuple2<String, Tuple3<Writable, Writable, 
String>>(BatchConstants.CFG_OUTPUT_DICT,
+                        new Tuple3<Writable, Writable, 
String>(NullWritable.get(),
+                                new 
ArrayPrimitiveWritable(baos.toByteArray()), dictFileName)));
+            }
+        }
+
+        private void outputStatistics(List<Long> allCuboids, 
List<Tuple2<String, Tuple3<Writable, Writable, String>>> result)
+                throws IOException, InterruptedException {
+            // output written to baseDir/statistics/statistics-r-00000 (etc)
+            String statisticsFileName = BatchConstants.CFG_OUTPUT_STATISTICS + 
"/" + BatchConstants.CFG_OUTPUT_STATISTICS;
+
+            // mapper overlap ratio at key -1
+            long grandTotal = 0;
+
+            for (HLLCounter hll : cuboidHLLMap.values()) {
+                grandTotal += hll.getCountEstimate();
+            }
+
+            double mapperOverlapRatio = grandTotal == 0 ? 0 : (double) 
totalRowsBeforeMerge / grandTotal;
+            result.add(new Tuple2<String, Tuple3<Writable, Writable, 
String>>(BatchConstants.CFG_OUTPUT_STATISTICS,
+                    new Tuple3<Writable, Writable, String>(new 
LongWritable(-1),
+                            new 
BytesWritable(Bytes.toBytes(mapperOverlapRatio)), statisticsFileName)));
+
+            // mapper number at key -2
+            result.add(new Tuple2<String, Tuple3<Writable, Writable, 
String>>(BatchConstants.CFG_OUTPUT_STATISTICS,
+                    new Tuple3<Writable, Writable, String>(new 
LongWritable(-2),
+                            new 
BytesWritable(Bytes.toBytes(baseCuboidRowCountInMappers.size())), 
statisticsFileName)));
+
+            // sampling percentage at key 0
+            result.add(new Tuple2<String, Tuple3<Writable, Writable, 
String>>(BatchConstants.CFG_OUTPUT_STATISTICS,
+                    new Tuple3<Writable, Writable, String>(new 
LongWritable(0L),
+                            new BytesWritable(Bytes.toBytes(samplingPercent)), 
statisticsFileName)));
+
+            ByteBuffer valueBuf = 
ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
+
+            for (long i : allCuboids) {
+                valueBuf.clear();
+                cuboidHLLMap.get(i).writeRegisters(valueBuf);
+                valueBuf.flip();
+
+                byte[] valueCopy = new byte[valueBuf.limit()];
+                System.arraycopy(valueBuf.array(), 0, valueCopy, 0, 
valueBuf.limit());
+
+                result.add(new Tuple2<String, Tuple3<Writable, Writable, 
String>>(BatchConstants.CFG_OUTPUT_STATISTICS,
+                        new Tuple3<Writable, Writable, String>(new 
LongWritable(i),
+                                new BytesWritable(valueCopy, 
valueCopy.length), statisticsFileName)));
+            }
+        }
+    }
+}
diff --git 
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java 
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
index 31eebc83c3..82a1a9be06 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
@@ -24,23 +24,31 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.StringUtil;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.EngineFactory;
 import org.apache.kylin.engine.mr.IMROutput2;
+import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.common.CubeStatsReader;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.source.SourceManager;
 import org.apache.kylin.storage.StorageFactory;
 import org.apache.spark.SparkContext;
 import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 
 import com.google.common.collect.Lists;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
 
 public class SparkUtil {
 
@@ -130,6 +138,41 @@ public static void 
modifySparkHadoopConfiguration(SparkContext sc) throws Except
         
sc.hadoopConfiguration().set("mapreduce.output.fileoutputformat.compress", 
"true");
         
sc.hadoopConfiguration().set("mapreduce.output.fileoutputformat.compress.type", 
"BLOCK");
         
sc.hadoopConfiguration().set("mapreduce.output.fileoutputformat.compress.codec",
 "org.apache.hadoop.io.compress.DefaultCodec"); // or 
org.apache.hadoop.io.compress.SnappyCodec
-  }
+    }
+
+    public static JavaRDD<String[]> hiveRecordInputRDD(boolean isSequenceFile, 
JavaSparkContext sc, String inputPath, String hiveTable) {
+        JavaRDD<String[]> recordRDD;
+
+        if (isSequenceFile) {
+            recordRDD = sc.sequenceFile(inputPath, BytesWritable.class, 
Text.class).values()
+                    .map(new Function<Text, String[]>() {
+                        @Override
+                        public String[] call(Text text) throws Exception {
+                            String s = Bytes.toString(text.getBytes(), 0, 
text.getLength());
+                            return 
s.split(BatchConstants.SEQUENCE_FILE_DEFAULT_DELIMITER);
+                        }
+                    });
+        } else {
+            SparkSession sparkSession = 
SparkSession.builder().config(sc.getConf()).enableHiveSupport().getOrCreate();
+            final Dataset intermediateTable = sparkSession.table(hiveTable);
+            recordRDD = intermediateTable.javaRDD().map(new Function<Row, 
String[]>() {
+                @Override
+                public String[] call(Row row) throws Exception {
+                    String[] result = new String[row.size()];
+                    for (int i = 0; i < row.size(); i++) {
+                        final Object o = row.get(i);
+                        if (o != null) {
+                            result[i] = o.toString();
+                        } else {
+                            result[i] = null;
+                        }
+                    }
+                    return result;
+                }
+            });
+        }
+
+        return recordRDD;
+    }
 
 }
diff --git a/kylin-it/pom.xml b/kylin-it/pom.xml
index 16bedb50e8..a3e7e68517 100644
--- a/kylin-it/pom.xml
+++ b/kylin-it/pom.xml
@@ -307,17 +307,14 @@
         <dependency>
             <groupId>org.scala-lang</groupId>
             <artifactId>scala-library</artifactId>
-            <version>2.11.0</version>
         </dependency>
         <dependency>
             <groupId>org.scala-lang</groupId>
             <artifactId>scala-compiler</artifactId>
-            <version>2.11.0</version>
         </dependency>
         <dependency>
             <groupId>org.scala-lang</groupId>
             <artifactId>scala-reflect</artifactId>
-            <version>2.11.0</version>
         </dependency>
     </dependencies>
 
diff --git a/pom.xml b/pom.xml
index d9b9efe20b..cd186599cb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -62,6 +62,9 @@
         <spark.version>2.1.2</spark.version>
         <kryo.version>4.0.0</kryo.version>
 
+        <!-- Scala versions -->
+        <scala.version>2.11.0</scala.version>
+
         <!-- <reflections.version>0.9.10</reflections.version> -->
 
         <!-- Calcite Version -->
@@ -895,6 +898,24 @@
                 <version>${tomcat.version}</version>
                 <scope>provided</scope>
             </dependency>
+
+            <dependency>
+                <groupId>org.scala-lang</groupId>
+                <artifactId>scala-library</artifactId>
+                <version>${scala.version}</version>
+            </dependency>
+
+            <dependency>
+                <groupId>org.scala-lang</groupId>
+                <artifactId>scala-compiler</artifactId>
+                <version>${scala.version}</version>
+            </dependency>
+
+            <dependency>
+                <groupId>org.scala-lang</groupId>
+                <artifactId>scala-reflect</artifactId>
+                <version>${scala.version}</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to