Repository: carbondata
Updated Branches:
  refs/heads/master 8002c5973 -> 45951c763


[CARBONDATA-3219] Support range partition the input data for local_sort/global sort data loading

For global_sort/local_sort tables, the LOAD DATA command now accepts a RANGE_COLUMN option:

load data inpath '<path>' into table <table name>
options('RANGE_COLUMN'='<a column>')

When the total size of the input data is known, the number of partitions can be calculated up front and specified directly:

load data inpath '<path>' into table <table name>
options('RANGE_COLUMN'='<a column>', 'global_sort_partitions'='10')

When the total size of the input data is unknown, the size of each partition can be controlled instead:

load data inpath '<path>' into table <table name>
options('RANGE_COLUMN'='<a column>', 'scale_factor'='10')

In this case the number of partitions is calculated as follows:

splitSize = Math.max(blocklet_size, (block_size - blocklet_size)) * scale_factor
numPartitions = Math.ceil(total_size / splitSize)
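
As a worked example of that formula, here is a minimal Scala sketch (not part of this patch); the 100 GB input size and the block/blocklet sizes are illustrative assumptions:

object RangePartitionCountSketch {
  // splitSize = max(blocklet_size, block_size - blocklet_size) * scale_factor
  // numPartitions = ceil(total_size / splitSize)
  def numPartitions(totalSizeBytes: Long, blockSizeMB: Long,
      blockletSizeMB: Long, scaleFactor: Int): Int = {
    val blockSize = blockSizeMB * 1024 * 1024
    val blockletSize = blockletSizeMB * 1024 * 1024
    val splitSize = Math.max(blockletSize, blockSize - blockletSize) * scaleFactor
    Math.ceil(totalSizeBytes.toDouble / splitSize).toInt
  }

  def main(args: Array[String]): Unit = {
    // 100 GB of CSV input, 1024 MB blocks, 64 MB blocklets, scale_factor = 3
    // => splitSize is about 2.8 GB, so numPartitions = 36
    println(numPartitions(100L * 1024 * 1024 * 1024, 1024, 64, 3))
  }
}

With a scale_factor that roughly matches the CarbonData compression ratio, each range partition should then produce about one block-sized CarbonData file.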
Limitations:

- INSERT INTO is not supported; only the LOAD DATA command is supported.
- Multiple range columns are not supported; only a single range column is allowed.
- Data skew can still leave partitions unevenly sized (the bundled DataSkewRangePartitioner mitigates this; see the sketch below).
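
The sketch below illustrates the bound-combining idea behind the DataSkewRangePartitioner added in this patch: duplicated range bounds get a skew weight so that a skewed value can be written by several tasks. It is a simplified, standalone Scala sketch, not the patch's implementation (which works on sampled, weighted candidates and tracks skew indexes and weights as arrays).

object SkewBoundsSketch {
  // Given the sampled range bounds, duplicated bounds indicate skewed values;
  // each such bound gets a skew weight so it can be served by more than one
  // task/partition.
  def combineDataSkew[K](sortedBounds: Seq[K]): (Seq[K], Map[K, Int]) = {
    val distinctBounds = sortedBounds.distinct
    val skewWeights = sortedBounds
      .groupBy(identity)
      .collect { case (bound, occurrences) if occurrences.size > 1 =>
        bound -> occurrences.size
      }
    (distinctBounds, skewWeights)
  }

  def main(args: Array[String]): Unit = {
    // For dataskew.csv above, 4 of 10 rows share an empty name, so sampling
    // for 5 partitions yields a duplicated bound ("" stands in for it here).
    val (bounds, weights) = combineDataSkew(Seq("", "", "f", "h"))
    println(bounds)  // List(, f, h)
    println(weights) // Map( -> 2): the "" bound is served by 2 tasks
  }
}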

This closes #2971


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/45951c76
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/45951c76
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/45951c76

Branch: refs/heads/master
Commit: 45951c7630372602b8323a5bc64d695fdef979ba
Parents: 8002c59
Author: QiangCai <qiang...@qq.com>
Authored: Fri Jan 4 16:40:25 2019 +0800
Committer: ravipesala <ravi.pes...@gmail.com>
Committed: Tue Jan 8 12:54:41 2019 +0530

----------------------------------------------------------------------
 .../constants/CarbonLoadOptionConstants.java    |   7 +
 .../test/resources/range_column/dataskew.csv    |  11 +
 .../dataload/TestRangeColumnDataLoad.scala      | 126 +++++++
 .../carbondata/spark/load/CsvRDDHelper.scala    |   3 +
 .../load/DataLoadProcessBuilderOnSpark.scala    | 210 ++++++++++-
 .../load/DataLoadProcessorStepOnSpark.scala     | 183 +++++++++-
 .../apache/spark/DataSkewRangePartitioner.scala | 365 +++++++++++++++++++
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala |   4 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |   6 +
 .../loading/csvinput/CSVInputFormat.java        |  10 +
 .../loading/model/CarbonLoadModel.java          |  41 +++
 .../loading/model/CarbonLoadModelBuilder.java   |  29 ++
 .../processing/loading/model/LoadOption.java    |   3 +
 .../parser/impl/RangeColumnParserImpl.java      |  56 +++
 14 files changed, 1040 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/45951c76/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
index 46be8d8..5cf6163 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
@@ -165,4 +165,11 @@ public final class CarbonLoadOptionConstants {
       = "carbon.load.sortmemory.spill.percentage";
 
   public static final String CARBON_LOAD_SORT_MEMORY_SPILL_PERCENTAGE_DEFAULT 
= "0";
+
+  /**
+   * For RANGE_COLUMN, SCALE_FACTOR controls the size of each partition.
+   * When SCALE_FACTOR roughly matches the compression ratio, each task will generate
+   * one CarbonData file whose size is about the TABLE_BLOCKSIZE of this table.
+   */
+  public static final int CARBON_RANGE_COLUMN_SCALE_FACTOR_DEFAULT = 3;
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/45951c76/integration/spark-common-test/src/test/resources/range_column/dataskew.csv
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/resources/range_column/dataskew.csv 
b/integration/spark-common-test/src/test/resources/range_column/dataskew.csv
new file mode 100644
index 0000000..fb77a5d
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/range_column/dataskew.csv
@@ -0,0 +1,11 @@
+id,name,city,age
+1,,wuhan,91
+2,,hangzhou,102
+3,,beijing,112
+4,,shenzhen,124
+5,e,shenzhen,65
+6,f,beijing,76
+7,g,hangzhou,37
+8,h,wuhan,48
+9,i,,89
+10,j,,50
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/45951c76/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestRangeColumnDataLoad.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestRangeColumnDataLoad.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestRangeColumnDataLoad.scala
new file mode 100644
index 0000000..14f11e5
--- /dev/null
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestRangeColumnDataLoad.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.dataload
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+
+import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException
+import org.apache.carbondata.core.datamap.Segment
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import 
org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
+import org.apache.carbondata.core.metadata.{CarbonMetadata, SegmentFileStore}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+class TestRangeColumnDataLoad extends QueryTest with BeforeAndAfterEach with 
BeforeAndAfterAll {
+  var filePath: String = s"$resourcesPath/globalsort"
+
+  override def beforeAll(): Unit = {
+    dropTable
+  }
+
+  override def afterAll(): Unit = {
+    dropTable
+  }
+
+  def dropTable(): Unit = {
+    sql("DROP TABLE IF EXISTS carbon_range_column1")
+    sql("DROP TABLE IF EXISTS carbon_range_column2")
+    sql("DROP TABLE IF EXISTS carbon_range_column3")
+    sql("DROP TABLE IF EXISTS carbon_range_column4")
+  }
+
+  test("range_column with option GLOBAL_SORT_PARTITIONS") {
+    sql(
+      """
+        | CREATE TABLE carbon_range_column1(id INT, name STRING, city STRING, 
age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_SCOPE'='LOCAL_SORT', 'SORT_COLUMNS'='name, city')
+      """.stripMargin)
+
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_range_column1 " 
+
+        "OPTIONS('GLOBAL_SORT_PARTITIONS'='1', 'range_column'='name')")
+
+    assert(getIndexFileCount("carbon_range_column1") === 1)
+    checkAnswer(sql("SELECT COUNT(*) FROM carbon_range_column1"), Seq(Row(12)))
+    checkAnswer(sql("SELECT * FROM carbon_range_column1"),
+      sql("SELECT * FROM carbon_range_column1 ORDER BY name"))
+  }
+
+  test("range_column with option scale_factor") {
+    sql(
+      """
+        | CREATE TABLE carbon_range_column2(id INT, name STRING, city STRING, 
age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_SCOPE'='LOCAL_SORT', 'SORT_COLUMNS'='name, city')
+      """.stripMargin)
+
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_range_column2 " 
+
+        "OPTIONS('scale_factor'='10', 'range_column'='name')")
+
+    assert(getIndexFileCount("carbon_range_column2") === 1)
+    checkAnswer(sql("SELECT COUNT(*) FROM carbon_range_column2"), Seq(Row(12)))
+    checkAnswer(sql("SELECT * FROM carbon_range_column2"),
+      sql("SELECT * FROM carbon_range_column2 ORDER BY name"))
+  }
+
+  test("range_column only support single column ") {
+    sql(
+      """
+        | CREATE TABLE carbon_range_column3(id INT, name STRING, city STRING, 
age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_SCOPE'='LOCAL_SORT', 'SORT_COLUMNS'='name, city')
+      """.stripMargin)
+
+    intercept[InvalidLoadOptionException] {
+      sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_range_column3 
" +
+          "OPTIONS('scale_factor'='10', 'range_column'='name,id')")
+    }
+  }
+
+  test("range_column with data skew") {
+    sql(
+      """
+        | CREATE TABLE carbon_range_column4(id INT, name STRING, city STRING, 
age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT', 'SORT_COLUMNS'='name, 
city')
+      """.stripMargin)
+
+    val dataSkewPath = s"$resourcesPath/range_column"
+
+    sql(s"LOAD DATA LOCAL INPATH '$dataSkewPath' INTO TABLE 
carbon_range_column4 " +
+        "OPTIONS('GLOBAL_SORT_PARTITIONS'='5', 'range_column'='name', " +
+        "'BAD_RECORDS_ACTION'='force')")
+
+    assert(getIndexFileCount("carbon_range_column4") === 5)
+    checkAnswer(sql("SELECT COUNT(*) FROM carbon_range_column4"), Seq(Row(10)))
+  }
+
+  private def getIndexFileCount(tableName: String, segmentNo: String = "0"): 
Int = {
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", 
tableName)
+    val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, 
segmentNo)
+    if (FileFactory.isFileExist(segmentDir)) {
+      new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir).size()
+    } else {
+      val segment = Segment.getSegment(segmentNo, carbonTable.getTablePath)
+      new SegmentFileStore(carbonTable.getTablePath, 
segment.getSegmentFileName).getIndexCarbonFiles
+        .size()
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/45951c76/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
index 5511645..f629260 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
@@ -67,8 +67,10 @@ object CsvRDDHelper {
     val jobContext = new JobContextImpl(jobConf, null)
     val inputFormat = new CSVInputFormat()
     val rawSplits = inputFormat.getSplits(jobContext).toArray
+    var totalLength = 0L
     val splitFiles = rawSplits.map { split =>
       val fileSplit = split.asInstanceOf[FileSplit]
+      totalLength = totalLength + fileSplit.getLength
       PartitionedFile(
         InternalRow.empty,
         fileSplit.getPath.toString,
@@ -76,6 +78,7 @@ object CsvRDDHelper {
         fileSplit.getLength,
         fileSplit.getLocations)
     }.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
+    model.setTotalSize(totalLength)
     val totalBytes = splitFiles.map(_.length + openCostInBytes).sum
     val bytesPerCore = totalBytes / defaultParallelism
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/45951c76/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index 8ded6bd..ec1153a 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -20,19 +20,24 @@ package org.apache.carbondata.spark.load
 import java.util.Comparator
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.spark.TaskContext
+import org.apache.spark.{Accumulator, DataSkewRangePartitioner, 
RangePartitioner, TaskContext}
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.execution.command.ExecutionErrors
 import org.apache.spark.sql.util.SparkSQLUtil
 import org.apache.spark.storage.StorageLevel
 
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, 
CarbonLoadOptionConstants}
 import org.apache.carbondata.core.datastore.row.CarbonRow
+import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes}
+import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, 
CarbonDimension}
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, 
SegmentStatus}
-import org.apache.carbondata.core.util.{CarbonProperties, 
ThreadLocalSessionInfo}
-import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
-import org.apache.carbondata.processing.loading.{DataLoadProcessBuilder, 
FailureCauses}
+import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil, 
ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer
+import org.apache.carbondata.processing.loading.{CarbonDataLoadConfiguration, 
DataField, DataLoadProcessBuilder, FailureCauses}
+import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.sort.sortdata.{NewRowComparator, 
NewRowComparatorForNormalDims, SortParameters}
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
@@ -139,6 +144,11 @@ object DataLoadProcessBuilderOnSpark {
     LOGGER.info("Total rows processed in step Sort Processor: " + 
sortStepRowCounter.value)
     LOGGER.info("Total rows processed in step Data Writer: " + 
writeStepRowCounter.value)
 
+    updateLoadStatus(model, partialSuccessAccum)
+  }
+
+  private def updateLoadStatus(model: CarbonLoadModel, partialSuccessAccum: 
Accumulator[Int]
+  ): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
     // Update status
     if (partialSuccessAccum.value != 0) {
       val uniqueLoadStatusId = model.getTableName + 
CarbonCommonConstants.UNDERSCORE +
@@ -156,4 +166,194 @@ object DataLoadProcessBuilderOnSpark {
       Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
     }
   }
+
+  /**
+   * 1. range partition the whole input data
+   * 2. for each range, sort the data and write it to CarbonData files
+   */
+  def loadDataUsingRangeSort(
+      sparkSession: SparkSession,
+      model: CarbonLoadModel,
+      hadoopConf: Configuration): Array[(String, (LoadMetadataDetails, 
ExecutionErrors))] = {
+    // initialize and prepare row counter
+    val sc = sparkSession.sparkContext
+    val modelBroadcast = sc.broadcast(model)
+    val partialSuccessAccum = sc.accumulator(0, "Partial Success Accumulator")
+    val inputStepRowCounter = sc.accumulator(0, "Input Processor Accumulator")
+    val convertStepRowCounter = sc.accumulator(0, "Convert Processor 
Accumulator")
+    val sortStepRowCounter = sc.accumulator(0, "Sort Processor Accumulator")
+    val writeStepRowCounter = sc.accumulator(0, "Write Processor Accumulator")
+
+    // 1. Input
+    hadoopConf
+      .set(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, 
sparkSession.sparkContext.appName)
+    val inputRDD = CsvRDDHelper
+      .csvFileScanRDD(sparkSession, model, hadoopConf)
+      .mapPartitionsWithIndex { case (index, rows) =>
+        DataLoadProcessorStepOnSpark.internalInputFunc(
+          rows, index, modelBroadcast, Option(inputStepRowCounter), 
Option.empty)
+      }
+
+    // 2. Convert
+    val conf = SparkSQLUtil.broadCastHadoopConf(sc, hadoopConf)
+    val convertRDD = inputRDD
+      .mapPartitionsWithIndex { case (index, rows) =>
+        
ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
+        DataLoadProcessorStepOnSpark
+          .convertFunc(rows, index, modelBroadcast, partialSuccessAccum, 
convertStepRowCounter)
+      }
+      .filter(_ != null)
+
+    // 3. Range partition by range_column
+    val configuration = DataLoadProcessBuilder.createConfiguration(model)
+    val rangeColumnIndex =
+      indexOfColumn(model.getRangePartitionColumn, configuration.getDataFields)
+    // convert RDD[CarbonRow] to RDD[(rangeColumn, CarbonRow)]
+    val keyRDD = convertRDD.keyBy(_.getObject(rangeColumnIndex))
+    // range partition by key
+    val numPartitions = getNumPartitions(configuration, model, convertRDD)
+    val objectOrdering: Ordering[Object] = 
createOrderingForColumn(model.getRangePartitionColumn)
+    import scala.reflect.classTag
+    val sampleRDD = getSampleRDD(sparkSession, model, hadoopConf, 
configuration, modelBroadcast)
+    val rangeRDD = keyRDD
+      .partitionBy(
+        new DataSkewRangePartitioner(numPartitions, sampleRDD)(objectOrdering, 
classTag[Object]))
+      .map(_._2)
+
+    // 4. Sort and Write data
+    sc.runJob(rangeRDD, (context: TaskContext, rows: Iterator[CarbonRow]) =>
+      DataLoadProcessorStepOnSpark.sortAndWriteFunc(rows, context.partitionId, 
modelBroadcast,
+        writeStepRowCounter, conf.value.value))
+
+    // Log the number of rows in each step
+    LOGGER.info("Total rows processed in step Input Processor: " + 
inputStepRowCounter.value)
+    LOGGER.info("Total rows processed in step Data Converter: " + 
convertStepRowCounter.value)
+    LOGGER.info("Total rows processed in step Sort Processor: " + 
sortStepRowCounter.value)
+    LOGGER.info("Total rows processed in step Data Writer: " + 
writeStepRowCounter.value)
+
+    // Update status
+    updateLoadStatus(model, partialSuccessAccum)
+  }
+
+  /**
+   * provide an RDD for sampling
+   * CSVRecordReader (univocity parser) will output only one column
+   */
+  private def getSampleRDD(
+      sparkSession: SparkSession,
+      model: CarbonLoadModel,
+      hadoopConf: Configuration,
+      configuration: CarbonDataLoadConfiguration,
+      modelBroadcast: Broadcast[CarbonLoadModel]
+  ): RDD[(Object, Object)] = {
+    // initialize and prepare row counter
+    val configuration = DataLoadProcessBuilder.createConfiguration(model)
+    val header = configuration.getHeader
+    val rangeColumn = model.getRangePartitionColumn
+    val rangeColumnIndex = (0 until header.length).find{
+      index =>
+        header(index).equalsIgnoreCase(rangeColumn.getColName)
+    }.get
+    val rangeField = configuration
+      .getDataFields
+      .find(dataField => 
dataField.getColumn.getColName.equals(rangeColumn.getColName))
+      .get
+
+    // 1. Input
+    val newHadoopConf = new Configuration(hadoopConf)
+    newHadoopConf
+      .set(CSVInputFormat.SELECT_COLUMN_INDEX, "" + rangeColumnIndex)
+    val inputRDD = CsvRDDHelper
+      .csvFileScanRDD(sparkSession, model, newHadoopConf)
+      .mapPartitionsWithIndex { case (index, rows) =>
+        DataLoadProcessorStepOnSpark
+          .internalInputFunc(rows, index, modelBroadcast, Option.empty, 
Option(rangeField))
+      }
+
+    // 2. Convert
+    val conf = SparkSQLUtil.broadCastHadoopConf(sparkSession.sparkContext, 
hadoopConf)
+    val convertRDD = inputRDD
+      .mapPartitionsWithIndex { case (index, rows) =>
+        
ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
+        DataLoadProcessorStepOnSpark
+          .sampleConvertFunc(rows, rangeField, index, modelBroadcast)
+      }
+      .filter(_ != null)
+
+    convertRDD.map(row => (row.getObject(0), null))
+  }
+
+  /**
+   * calculate the number of partitions.
+   */
+  private def getNumPartitions(
+      configuration: CarbonDataLoadConfiguration,
+      model: CarbonLoadModel,
+      convertRDD: RDD[CarbonRow]
+  ): Int = {
+    var numPartitions = CarbonDataProcessorUtil.getGlobalSortPartitions(
+      
configuration.getDataLoadProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS))
+    if (numPartitions <= 0) {
+      if (model.getTotalSize <= 0) {
+        numPartitions = convertRDD.partitions.length
+      } else {
+        // calculate the number of partitions
+        // better to generate a CarbonData file for each partition
+        val totalSize = model.getTotalSize.toDouble
+        val table = model.getCarbonDataLoadSchema.getCarbonTable
+        val blockSize = 1024L * 1024 * table.getBlockSizeInMB
+        val blockletSize = 1024L * 1024 * table.getBlockletSizeInMB
+        val scaleFactor = if (model.getScaleFactor == 0) {
+          // here it assumes the compression ratio of CarbonData is about 30%,
+          // so it multiplies by 3 to get the split size of the CSV files.
+          CarbonLoadOptionConstants.CARBON_RANGE_COLUMN_SCALE_FACTOR_DEFAULT
+        } else {
+          model.getScaleFactor
+        }
+        // For Range_Column, it will try to generate one big file for each 
partition.
+        // And the size of the big file is about TABLE_BLOCKSIZE of this table.
+        val splitSize = Math.max(blockletSize, (blockSize - blockletSize)) * 
scaleFactor
+        numPartitions = Math.ceil(totalSize / splitSize).toInt
+      }
+    }
+    numPartitions
+  }
+
+  private def indexOfColumn(column: CarbonColumn, fields: Array[DataField]): 
Int = {
+    (0 until fields.length)
+      .find(index => 
fields(index).getColumn.getColName.equals(column.getColName))
+      .get
+  }
+
+  private def createOrderingForColumn(column: CarbonColumn): Ordering[Object] 
= {
+    if (column.isDimension) {
+      val dimension = column.asInstanceOf[CarbonDimension]
+      if (dimension.isGlobalDictionaryEncoding || 
dimension.isDirectDictionaryEncoding) {
+        new PrimtiveOrdering(DataTypes.INT)
+      } else {
+        if (DataTypeUtil.isPrimitiveColumn(column.getDataType)) {
+          new PrimtiveOrdering(column.getDataType)
+        } else {
+          new ByteArrayOrdering()
+        }
+      }
+    } else {
+      new PrimtiveOrdering(column.getDataType)
+    }
+  }
+}
+
+class PrimtiveOrdering(dataType: DataType) extends Ordering[Object] {
+  val comparator = org.apache.carbondata.core.util.comparator.Comparator
+    .getComparator(dataType)
+
+  override def compare(x: Object, y: Object): Int = {
+    comparator.compare(x, y)
+  }
+}
+
+class ByteArrayOrdering() extends Ordering[Object] {
+  override def compare(x: Object, y: Object): Int = {
+    UnsafeComparer.INSTANCE.compareTo(x.asInstanceOf[Array[Byte]], 
y.asInstanceOf[Array[Byte]])
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/45951c76/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
index 2ca47b3..ff0e1bf 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
@@ -17,9 +17,11 @@
 
 package org.apache.carbondata.spark.load
 
+import java.util
+
 import com.univocity.parsers.common.TextParsingException
 import org.apache.hadoop.conf.Configuration
-import org.apache.spark.{Accumulator, SparkEnv, TaskContext}
+import org.apache.spark.{Accumulator, TaskContext}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.InternalRow
@@ -30,13 +32,15 @@ import 
org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException
 import org.apache.carbondata.core.datastore.row.CarbonRow
 import org.apache.carbondata.core.util.{CarbonProperties, 
ThreadLocalSessionInfo}
-import org.apache.carbondata.processing.loading.{BadRecordsLogger, 
BadRecordsLoggerProvider, CarbonDataLoadConfiguration, DataLoadProcessBuilder, 
TableProcessingOperations}
+import org.apache.carbondata.processing.loading._
 import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl
 import 
org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
-import org.apache.carbondata.processing.loading.parser.impl.RowParserImpl
+import org.apache.carbondata.processing.loading.parser.RowParser
+import 
org.apache.carbondata.processing.loading.parser.impl.{RangeColumnParserImpl, 
RowParserImpl}
+import org.apache.carbondata.processing.loading.row.CarbonRowBatch
 import org.apache.carbondata.processing.loading.sort.SortStepRowHandler
-import 
org.apache.carbondata.processing.loading.steps.DataWriterProcessorStepImpl
+import 
org.apache.carbondata.processing.loading.steps.{DataWriterProcessorStepImpl, 
SortProcessorStepImpl}
 import org.apache.carbondata.processing.sort.sortdata.SortParameters
 import org.apache.carbondata.processing.store.{CarbonFactHandler, 
CarbonFactHandlerFactory}
 import org.apache.carbondata.processing.util.{CarbonBadRecordUtil, 
CarbonDataProcessorUtil}
@@ -79,8 +83,6 @@ object DataLoadProcessorStepOnSpark {
     new Iterator[CarbonRow] {
       override def hasNext: Boolean = rows.hasNext
 
-
-
       override def next(): CarbonRow = {
         var row : CarbonRow = null
         if(isRawDataRequired) {
@@ -95,6 +97,44 @@ object DataLoadProcessorStepOnSpark {
     }
   }
 
+  def internalInputFunc(
+      rows: Iterator[InternalRow],
+      index: Int,
+      modelBroadcast: Broadcast[CarbonLoadModel],
+      rowCounter: Option[Accumulator[Int]],
+      rangeField: Option[DataField]): Iterator[CarbonRow] = {
+    val model: CarbonLoadModel = 
modelBroadcast.value.getCopyWithTaskNo(index.toString)
+    val conf = DataLoadProcessBuilder.createConfiguration(model)
+    val rowParser: RowParser = if (rangeField.isEmpty) {
+      new RowParserImpl(conf.getDataFields, conf)
+    } else {
+      new RangeColumnParserImpl(rangeField.get, conf)
+    }
+    val isRawDataRequired = CarbonDataProcessorUtil.isRawDataRequired(conf)
+    TaskContext.get().addTaskFailureListener { (t: TaskContext, e: Throwable) 
=>
+      wrapException(e, model)
+    }
+
+    new Iterator[CarbonRow] {
+      override def hasNext: Boolean = rows.hasNext
+
+      override def next(): CarbonRow = {
+        var row: CarbonRow = null
+        val rawRow =
+          
rows.next().asInstanceOf[GenericInternalRow].values.asInstanceOf[Array[Object]]
+        if (isRawDataRequired) {
+          row = new CarbonRow(rowParser.parseRow(rawRow), rawRow)
+        } else {
+          row = new CarbonRow(rowParser.parseRow(rawRow))
+        }
+        if (rowCounter.isDefined) {
+          rowCounter.get.add(1)
+        }
+        row
+      }
+    }
+  }
+
   def inputAndconvertFunc(
       rows: Iterator[Array[AnyRef]],
       index: Int,
@@ -179,9 +219,33 @@ object DataLoadProcessorStepOnSpark {
       override def hasNext: Boolean = rows.hasNext
 
       override def next(): CarbonRow = {
-        val row = rowConverter.convert(rows.next())
         rowCounter.add(1)
-        row
+        rowConverter.convert(rows.next())
+      }
+    }
+  }
+
+  def sampleConvertFunc(
+      rows: Iterator[CarbonRow],
+      rangeField: DataField,
+      index: Int,
+      modelBroadcast: Broadcast[CarbonLoadModel]
+  ): Iterator[CarbonRow] = {
+    val model: CarbonLoadModel = 
modelBroadcast.value.getCopyWithTaskNo(index.toString)
+    val conf = DataLoadProcessBuilder.createConfiguration(model)
+    val badRecordLogger = BadRecordsLoggerProvider.createBadRecordLogger(conf)
+    val rowConverter = new RowConverterImpl(Array(rangeField), conf, 
badRecordLogger)
+    rowConverter.initialize()
+
+    TaskContext.get().addTaskFailureListener { (t: TaskContext, e: Throwable) 
=>
+      wrapException(e, model)
+    }
+
+    new Iterator[CarbonRow] {
+      override def hasNext: Boolean = rows.hasNext
+
+      override def next(): CarbonRow = {
+        rowConverter.convert(rows.next())
       }
     }
   }
@@ -305,4 +369,107 @@ object DataLoadProcessorStepOnSpark {
           e)
     }
   }
+
+  def sortAndWriteFunc(
+      rows: Iterator[CarbonRow],
+      index: Int,
+      modelBroadcast: Broadcast[CarbonLoadModel],
+      rowCounter: Accumulator[Int],
+      conf: Configuration) {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME,
+        conf.get(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME))
+    ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf)
+    var model: CarbonLoadModel = null
+    var tableName: String = null
+    var inputProcessor: NewInputProcessorStepImpl = null
+    var rowConverter: RowConverterImpl = null
+    var sortProcessor: SortProcessorStepImpl = null
+    var dataWriter: DataWriterProcessorStepImpl = null
+    try {
+      model = modelBroadcast.value.getCopyWithTaskNo(index.toString)
+      val storeLocation = CommonUtil.getTempStoreLocations(index.toString)
+      val conf = DataLoadProcessBuilder.createConfiguration(model, 
storeLocation)
+      tableName = model.getTableName
+
+      rowConverter = new RowConverterImpl(conf.getDataFields, conf, null)
+      rowConverter.initialize()
+      conf.setCardinalityFinder(rowConverter)
+
+      inputProcessor = new NewInputProcessorStepImpl(conf, rows)
+      sortProcessor = new SortProcessorStepImpl(conf, inputProcessor)
+      dataWriter = new DataWriterProcessorStepImpl(conf, sortProcessor)
+      dataWriter.initialize()
+      dataWriter.execute()
+    } catch {
+      case e: CarbonDataWriterException =>
+        LOGGER.error("Failed for table: " + tableName + " in Data Writer 
Step", e)
+        throw new CarbonDataLoadingException("Error while initializing data 
handler : " +
+                                             e.getMessage)
+      case e: Exception =>
+        LOGGER.error("Failed for table: " + tableName + " in Data Writer 
Step", e)
+        throw new CarbonDataLoadingException("There is an unexpected error: " 
+ e.getMessage, e)
+    } finally {
+      if (rowConverter != null) {
+        rowConverter.finish()
+      }
+      // close the dataWriter once the write is done, whether it succeeded or failed. If it is
+      // not closed, the thread that prints the rows processed in each step every 10 seconds
+      // will never exit.
+      if(null != dataWriter) {
+        dataWriter.close()
+      }
+      // clean up the folders and files created locally for data load operation
+      TableProcessingOperations.deleteLocalDataLoadFolderLocation(model, 
false, false)
+    }
+  }
+}
+
+class NewInputProcessorStepImpl(configuration: CarbonDataLoadConfiguration,
+    rows: Iterator[CarbonRow]) extends 
AbstractDataLoadProcessorStep(configuration, null) {
+  /**
+   * Transform the data as per the implementation.
+   *
+   * @return Array of Iterator with data. It can be processed in parallel if the implementation class wants.
+   * @throws CarbonDataLoadingException
+   */
+  override def execute(): Array[util.Iterator[CarbonRowBatch]] = {
+    val batchSize = CarbonProperties.getInstance.getBatchSize
+    val iteratorArray = new Array[util.Iterator[CarbonRowBatch]](1)
+
+    iteratorArray(0) = new util.Iterator[CarbonRowBatch] {
+
+      val rowBatch = new CarbonRowBatch(batchSize) {
+        var count = 0
+        override def next(): CarbonRow = {
+          count = count + 1
+          rows.next()
+        }
+        override def hasNext: Boolean = rows.hasNext && count < batchSize
+
+        def reset(): Unit = {
+          count = 0
+        }
+      }
+
+      override def next(): CarbonRowBatch = {
+        rowBatch.reset()
+        rowBatch
+      }
+
+      override def hasNext: Boolean = {
+        rows.hasNext
+      }
+    }
+
+    iteratorArray
+  }
+
+  /**
+   * Get the step name for logging purpose.
+   *
+   * @return Step name
+   */
+  override protected def getStepName: String = {
+    "Input Processor for RANGE_SORT"
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/45951c76/integration/spark-common/src/main/scala/org/apache/spark/DataSkewRangePartitioner.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/spark/DataSkewRangePartitioner.scala
 
b/integration/spark-common/src/main/scala/org/apache/spark/DataSkewRangePartitioner.scala
new file mode 100644
index 0000000..07ad011
--- /dev/null
+++ 
b/integration/spark-common/src/main/scala/org/apache/spark/DataSkewRangePartitioner.scala
@@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+import scala.util.hashing.byteswap32
+
+import org.apache.spark.rdd.{PartitionPruningRDD, RDD}
+import org.apache.spark.serializer.JavaSerializer
+import org.apache.spark.util.{CollectionsUtils, Utils}
+
+/**
+ * Supports the data-skew scenario.
+ * Copied from Spark's RangePartitioner.
+ *
+ * RangePartitioner:
+ * the rangeBounds are the distinct values and each rangeBound has one partition,
+ * so in a data-skew scenario some partitions will include more data.
+ *
+ * DataSkewRangePartitioner:
+ * the rangeBounds are also the distinct values, but it calculates a skew weight,
+ * so some rangeBounds may have more than one partition.
+ *
+ * For example, splitting the following CSV file into 5 partitions:
+ * ---------------
+ * col1,col2
+ * 1,
+ * 2,
+ * 3,
+ * 4,
+ * 5,e
+ * 6,f
+ * 7,g
+ * 8,h
+ * 9,i
+ * 10,j
+ * ---------------
+ * RangePartitioner will give the following rangeBounds.
+ * -------------------------------------------------
+ * | range bound| partition range| number of values|
+ * -------------------------------------------------
+ * | null       | <= null        | 4               |
+ * | e          | (null, e]      | 1               |
+ * | f          | (e, f]         | 1               |
+ * | h          | (f, h]         | 2               |
+ * |            | > h            | 2               |
+ * -------------------------------------------------
+ *
+ * DataSkewRangePartitioner will give the following rangeBounds.
+ * --------------------------------------------------------------
+ * | range bound| skew weight| partition range| number of values|
+ * --------------------------------------------------------------
+ * | null       | 2          | <= null        | 2(4/2)          |
+ * | f          |            | (null, f]      | 2               |
+ * | h          |            | (f, h]         | 2               |
+ * |            |            | > h            | 2               |
+ * |            |            | <= null        | 2(4/2)          |
+ * --------------------------------------------------------------
+ * The skew weight of range bound "null" is 2.
+ * So it will start two tasks for range bound "null" to create two partitions.
+ */
+class DataSkewRangePartitioner[K: Ordering : ClassTag, V](
+    partitions: Int,
+    rdd: RDD[_ <: Product2[K, V]])
+  extends Partitioner {
+
+  // We allow partitions = 0, which happens when sorting an empty RDD under 
the default settings.
+  require(partitions >= 0, s"Number of partitions cannot be negative but found 
$partitions.")
+
+  private var ordering = implicitly[Ordering[K]]
+
+  // An array of upper bounds for the first (partitions - 1) partitions
+  // skewCount: how many bounds have data skew
+  // skewIndexes: the indexes of the data-skew bounds
+  // skewWeights: how many partitions each data-skew bound needs
+  private var (rangeBounds: Array[K], skewCount: Int, skewIndexes: Array[Int],
+  skewWeights: Array[Int]) = {
+    if (partitions <= 1) {
+      (Array.empty[K], 0, Array.empty[Int], Array.empty[Int])
+    } else {
+      // This is the sample size we need to have roughly balanced output 
partitions, capped at 1M.
+      val sampleSize = math.min(20.0 * partitions, 1e6)
+      // Assume the input partitions are roughly balanced and over-sample a 
little bit.
+      val sampleSizePerPartition = math.ceil(3.0 * sampleSize / 
rdd.partitions.length).toInt
+      val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), 
sampleSizePerPartition)
+      if (numItems == 0L) {
+        (Array.empty[K], 0, Array.empty[Int], Array.empty[Int])
+      } else {
+        // If a partition contains much more than the average number of items, 
we re-sample from it
+        // to ensure that enough items are collected from that partition.
+        val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
+        val candidates = ArrayBuffer.empty[(K, Float)]
+        val imbalancedPartitions = mutable.Set.empty[Int]
+        sketched.foreach { case (idx, n, sample) =>
+          if (fraction * n > sampleSizePerPartition) {
+            imbalancedPartitions += idx
+          } else {
+            // The weight is 1 over the sampling probability.
+            val weight = (n.toDouble / sample.length).toFloat
+            for (key <- sample) {
+              candidates += ((key, weight))
+            }
+          }
+        }
+        if (imbalancedPartitions.nonEmpty) {
+          // Re-sample imbalanced partitions with the desired sampling 
probability.
+          val imbalanced = new PartitionPruningRDD(rdd.map(_._1), 
imbalancedPartitions.contains)
+          val seed = byteswap32(-rdd.id - 1)
+          val reSampled = imbalanced.sample(withReplacement = false, fraction, 
seed).collect()
+          val weight = (1.0 / fraction).toFloat
+          candidates ++= reSampled.map(x => (x, weight))
+        }
+        determineBounds(candidates, partitions)
+      }
+    }
+  }
+
+  def determineBounds(
+      candidates: ArrayBuffer[(K, Float)],
+      partitions: Int): (Array[K], Int, Array[Int], Array[Int]) = {
+    val ordered = candidates.sortBy(_._1)
+    val numCandidates = ordered.size
+    val sumWeights = ordered.map(_._2.toDouble).sum
+    val step = sumWeights / partitions
+    var cumWeight = 0.0
+    var target = step
+    val bounds = ArrayBuffer.empty[K]
+    var i = 0
+    var j = 0
+    var previousBound = Option.empty[K]
+    while ((i < numCandidates) && (j < partitions - 1)) {
+      val (key, weight) = ordered(i)
+      cumWeight += weight
+      if (cumWeight >= target) {
+        // Skip duplicate values.
+        if (previousBound.isEmpty || ordering.gteq(key, previousBound.get)) {
+          bounds += key
+          target += step
+          j += 1
+          previousBound = Some(key)
+        }
+      }
+      i += 1
+    }
+
+    if (bounds.size >= 2) {
+      combineDataSkew(bounds)
+    } else {
+      (bounds.toArray, 0, Array.empty[Int], Array.empty[Int])
+    }
+  }
+
+  def combineDataSkew(bounds: ArrayBuffer[K]): (Array[K], Int, Array[Int], 
Array[Int]) = {
+    val finalBounds = ArrayBuffer.empty[K]
+    var preBound = bounds(0)
+    finalBounds += preBound
+    val dataSkewIndexTmp = ArrayBuffer.empty[Int]
+    val dataSkewNumTmp = ArrayBuffer.empty[Int]
+    var dataSkewCountTmp = 1
+    (1 until bounds.size).foreach { index =>
+      val bound = bounds(index)
+      if (ordering.equiv(bound, preBound)) {
+        if (dataSkewCountTmp == 1) {
+          dataSkewIndexTmp += (finalBounds.size - 1)
+        }
+        dataSkewCountTmp += 1
+      } else {
+        finalBounds += bound
+        preBound = bound
+        if (dataSkewCountTmp > 1) {
+          dataSkewNumTmp += dataSkewCountTmp
+          dataSkewCountTmp = 1
+        }
+      }
+    }
+    if (dataSkewIndexTmp.size > 0) {
+      (finalBounds.toArray, dataSkewIndexTmp.size, dataSkewIndexTmp.toArray, 
dataSkewNumTmp.toArray)
+    } else {
+      (finalBounds.toArray, 0, Array.empty[Int], Array.empty[Int])
+    }
+  }
+
+  private val skewPartitions: Int = if (skewCount == 0) {
+    0
+  } else {
+    skewWeights.map(_ - 1).sum
+  }
+
+  def numPartitions: Int = rangeBounds.length + 1 + skewPartitions
+
+  private var binarySearch: ((Array[K], K) => Int) = 
CollectionsUtils.makeBinarySearch[K]
+  private var skewIndexesLen = 0
+  private var dsps: Array[DataSkewPartitioner] = null
+
+  def initialize(): Unit = {
+    if (skewCount > 0) {
+      skewIndexesLen = skewIndexes.length
+      dsps = new Array[DataSkewPartitioner](skewIndexesLen)
+      var previousPart = rangeBounds.length
+      for (i <- 0 until skewIndexesLen) {
+        dsps(i) = new DataSkewPartitioner(skewIndexes(i), previousPart, 
skewWeights(i))
+        previousPart = previousPart + skewWeights(i) - 1
+      }
+    }
+  }
+
+  private var needInit = true
+
+  def getPartition(key: Any): Int = {
+    if (needInit) {
+      needInit = false
+      initialize()
+    }
+    val k = key.asInstanceOf[K]
+    var partition = 0
+    if (rangeBounds.length <= 128) {
+      // If we have less than 128 partitions naive search
+      while (partition < rangeBounds.length && ordering.gt(k, 
rangeBounds(partition))) {
+        partition += 1
+      }
+    } else {
+      // Determine which binary search method to use only once.
+      partition = binarySearch(rangeBounds, k)
+      // binarySearch either returns the match location or -[insertion point]-1
+      if (partition < 0) {
+        partition = -partition - 1
+      }
+      if (partition > rangeBounds.length) {
+        partition = rangeBounds.length
+      }
+    }
+    if (skewCount == 0) {
+      partition
+    } else {
+      getDataSkewPartition(partition)
+    }
+  }
+
+  def getDataSkewPartition(partition: Int): Int = {
+    var index = -1
+    if (partition <= skewIndexes(skewIndexesLen - 1) && partition >= 
skewIndexes(0)) {
+      for (i <- 0 until skewIndexesLen) {
+        if (skewIndexes(i) == partition) {
+          index = i
+        }
+      }
+    }
+    if (index == -1) {
+      partition
+    } else {
+      nextPartition(index)
+    }
+  }
+
+  def nextPartition(index: Int): Int = {
+    dsps(index).nextPartition()
+  }
+
+  override def equals(other: Any): Boolean = {
+    other match {
+      case r: DataSkewRangePartitioner[_, _] =>
+        r.rangeBounds.sameElements(rangeBounds)
+      case _ =>
+        false
+    }
+  }
+
+  override def hashCode(): Int = {
+    val prime = 31
+    var result = 1
+    var i = 0
+    while (i < rangeBounds.length) {
+      result = prime * result + rangeBounds(i).hashCode
+      i += 1
+    }
+    result = prime * result
+    result
+  }
+
+  @throws(classOf[IOException])
+  private def writeObject(out: ObjectOutputStream): Unit = {
+    Utils.tryOrIOException {
+      val sfactory = SparkEnv.get.serializer
+      sfactory match {
+        case js: JavaSerializer => out.defaultWriteObject()
+        case _ =>
+          out.writeInt(skewCount)
+          if (skewCount > 0) {
+            out.writeObject(skewIndexes)
+            out.writeObject(skewWeights)
+          }
+          out.writeObject(ordering)
+          out.writeObject(binarySearch)
+
+          val ser = sfactory.newInstance()
+          Utils.serializeViaNestedStream(out, ser) { stream =>
+            stream.writeObject(scala.reflect.classTag[Array[K]])
+            stream.writeObject(rangeBounds)
+          }
+      }
+    }
+  }
+
+  @throws(classOf[IOException])
+  private def readObject(in: ObjectInputStream): Unit = {
+    Utils.tryOrIOException {
+      needInit = true
+      val sfactory = SparkEnv.get.serializer
+      sfactory match {
+        case js: JavaSerializer => in.defaultReadObject()
+        case _ =>
+          skewCount = in.readInt()
+          if (skewCount > 0) {
+            skewIndexes = in.readObject().asInstanceOf[Array[Int]]
+            skewWeights = in.readObject().asInstanceOf[Array[Int]]
+          }
+          ordering = in.readObject().asInstanceOf[Ordering[K]]
+          binarySearch = in.readObject().asInstanceOf[(Array[K], K) => Int]
+
+          val ser = sfactory.newInstance()
+          Utils.deserializeViaNestedStream(in, ser) { ds =>
+            implicit val classTag = ds.readObject[ClassTag[Array[K]]]()
+            rangeBounds = ds.readObject[Array[K]]()
+          }
+      }
+    }
+  }
+}
+
+class DataSkewPartitioner(originPart: Int, previousPart: Int, skewWeight: Int) 
{
+  var index = 0
+
+  def nextPartition(): Int = {
+    if (index == 0) {
+      index = index + 1
+      originPart
+    } else {
+      val newPartition = previousPart + index
+      index = index + 1
+      if (index == skewWeight) {
+        index = 0
+      }
+      newPartition
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/45951c76/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
 
b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 9c7d28b..4b14879 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -1103,7 +1103,9 @@ abstract class CarbonDDLSqlParser extends 
AbstractCarbonSparkSQLParser {
       "TIMESTAMPFORMAT",
       "SKIP_EMPTY_LINE",
       "SORT_COLUMN_BOUNDS",
-      "LOAD_MIN_SIZE_INMB"
+      "LOAD_MIN_SIZE_INMB",
+      "RANGE_COLUMN",
+      "SCALE_FACTOR"
     )
     var isSupported = true
     val invalidOptions = StringBuilder.newBuilder

http://git-wip-us.apache.org/repos/asf/carbondata/blob/45951c76/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
 
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 92d1791..02acde6 100644
--- 
a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -351,6 +351,12 @@ object CarbonDataRDDFactory {
         } else {
           status = if (carbonTable.getPartitionInfo(carbonTable.getTableName) 
!= null) {
             loadDataForPartitionTable(sqlContext, dataFrame, carbonLoadModel, 
hadoopConf)
+          } else if (dataFrame.isEmpty && isSortTable &&
+                     carbonLoadModel.getRangePartitionColumn != null &&
+                     (sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT) 
||
+                      
sortScope.equals(SortScopeOptions.SortScope.LOCAL_SORT))) {
+            DataLoadProcessBuilderOnSpark
+              .loadDataUsingRangeSort(sqlContext.sparkSession, 
carbonLoadModel, hadoopConf)
           } else if (isSortTable && 
sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT)) {
             
DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(sqlContext.sparkSession,
               dataFrame, carbonLoadModel, hadoopConf)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/45951c76/processing/src/main/java/org/apache/carbondata/processing/loading/csvinput/CSVInputFormat.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/csvinput/CSVInputFormat.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/csvinput/CSVInputFormat.java
index f01aea8..d333490 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/csvinput/CSVInputFormat.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/csvinput/CSVInputFormat.java
@@ -30,6 +30,7 @@ import com.univocity.parsers.csv.CsvParser;
 import com.univocity.parsers.csv.CsvParserSettings;
 import org.apache.commons.io.input.BOMInputStream;
 import org.apache.commons.lang.BooleanUtils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -74,6 +75,10 @@ public class CSVInputFormat extends 
FileInputFormat<NullWritable, StringArrayWri
   public static final String READ_BUFFER_SIZE_DEFAULT = "65536";
   public static final String MAX_COLUMNS = "carbon.csvinputformat.max.columns";
   public static final String NUMBER_OF_COLUMNS = 
"carbon.csvinputformat.number.of.columns";
+  /**
+   * support only one column index
+   */
+  public static final String SELECT_COLUMN_INDEX = 
"carbon.csvinputformat.select.column.index";
   public static final int DEFAULT_MAX_NUMBER_OF_COLUMNS_FOR_PARSING = 2000;
   public static final int THRESHOLD_MAX_NUMBER_OF_COLUMNS_FOR_PARSING = 20000;
 
@@ -215,6 +220,11 @@ public class CSVInputFormat extends 
FileInputFormat<NullWritable, StringArrayWri
     // setting the content length to to limit the length of displayed contents 
being parsed/written
     // in the exception message when an error occurs.
     
parserSettings.setErrorContentLength(CarbonCommonConstants.CARBON_ERROR_CONTENT_LENGTH);
+
+    String selectColumnIndex = job.get(SELECT_COLUMN_INDEX, null);
+    if (!StringUtils.isBlank(selectColumnIndex)) {
+      parserSettings.selectIndexes(Integer.parseInt(selectColumnIndex));
+    }
     return parserSettings;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/45951c76/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
index 71d61db..ccf6eb2 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
@@ -26,6 +26,7 @@ import java.util.List;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.dictionary.service.DictionaryServiceProvider;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
@@ -233,6 +234,22 @@ public class CarbonLoadModel implements Serializable {
    */
   private String columnCompressor;
 
+  /**
+   * the total size of loading data
+   */
+  private long totalSize;
+
+  /**
+   * range partition data by this column
+   */
+  private CarbonColumn rangePartitionColumn;
+
+  /**
+   * control the file size of input data for each range partition
+   * input size = max(blockletSize, (blockSize - blockletSize)) * scaleFactor
+   */
+  private int scaleFactor;
+
   public boolean isAggLoadRequest() {
     return isAggLoadRequest;
   }
@@ -936,4 +953,28 @@ public class CarbonLoadModel implements Serializable {
   public void setColumnCompressor(String columnCompressor) {
     this.columnCompressor = columnCompressor;
   }
+
+  public CarbonColumn getRangePartitionColumn() {
+    return rangePartitionColumn;
+  }
+
+  public void setRangePartitionColumn(CarbonColumn rangePartitionColumn) {
+    this.rangePartitionColumn = rangePartitionColumn;
+  }
+
+  public long getTotalSize() {
+    return totalSize;
+  }
+
+  public void setTotalSize(long totalSize) {
+    this.totalSize = totalSize;
+  }
+
+  public void setScaleFactor(int scaleFactor) {
+    this.scaleFactor = scaleFactor;
+  }
+
+  public int getScaleFactor() {
+    return scaleFactor;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/45951c76/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
index 51a7b3a..739776c 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
@@ -300,6 +300,35 @@ public class CarbonLoadModelBuilder {
     validateAndSetLoadMinSize(carbonLoadModel);
 
     validateAndSetColumnCompressor(carbonLoadModel);
+
+    validateRangeColumn(optionsFinal, carbonLoadModel);
+  }
+
+  private void validateRangeColumn(Map<String, String> optionsFinal,
+      CarbonLoadModel carbonLoadModel) throws InvalidLoadOptionException {
+    String rangeColumn = optionsFinal.get("range_column");
+    if (rangeColumn != null) {
+      carbonLoadModel
+          .setRangePartitionColumn(table.getColumnByName(table.getTableName(), 
rangeColumn));
+      if (carbonLoadModel.getRangePartitionColumn() == null) {
+        throw new InvalidLoadOptionException("Invalid range_column option");
+      }
+    }
+
+    String scaleFactor = optionsFinal.get("scale_factor");
+    if (scaleFactor != null) {
+      try {
+        int scale = Integer.parseInt(scaleFactor);
+        if (scale < 1 || scale > 300) {
+          throw new InvalidLoadOptionException(
+              "Invalid scale_factor option, the range of scale_factor should 
be [1, 300]");
+        }
+        carbonLoadModel.setScaleFactor(scale);
+      } catch (NumberFormatException ex) {
+        throw new InvalidLoadOptionException(
+            "Invalid scale_factor option, scale_factor should be a integer");
+      }
+    }
   }
 
   private int validateMaxColumns(String[] csvHeaders, String maxColumns)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/45951c76/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
index 78049a4..419824b 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
@@ -193,6 +193,9 @@ public class LoadOption {
     optionsFinal.put(CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB,
         Maps.getOrDefault(options, 
CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB,
             CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB_DEFAULT));
+
+    optionsFinal.put("range_column", Maps.getOrDefault(options, 
"range_column", null));
+    optionsFinal.put("scale_factor", Maps.getOrDefault(options, 
"scale_factor", null));
     return optionsFinal;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/45951c76/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/RangeColumnParserImpl.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/RangeColumnParserImpl.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/RangeColumnParserImpl.java
new file mode 100644
index 0000000..ab91ca6
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/RangeColumnParserImpl.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.parser.impl;
+
+import java.util.ArrayList;
+
+import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
+import org.apache.carbondata.processing.loading.DataField;
+import 
org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
+import org.apache.carbondata.processing.loading.parser.CarbonParserFactory;
+import org.apache.carbondata.processing.loading.parser.GenericParser;
+import org.apache.carbondata.processing.loading.parser.RowParser;
+
+public class RangeColumnParserImpl implements RowParser {
+
+  private GenericParser genericParser;
+
+  public RangeColumnParserImpl(DataField rangeField, 
CarbonDataLoadConfiguration configuration) {
+    String[] complexDelimiters =
+        (String[]) 
configuration.getDataLoadProperty(DataLoadProcessorConstants.COMPLEX_DELIMITERS);
+    ArrayList<String> complexDelimiterList = new 
ArrayList<>(complexDelimiters.length);
+    for (int index = 0; index < complexDelimiters.length; index++) {
+      complexDelimiterList.add(complexDelimiters[index]);
+    }
+    String nullFormat =
+        
configuration.getDataLoadProperty(DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT)
+            .toString();
+    genericParser =
+        CarbonParserFactory.createParser(rangeField.getColumn(), 
complexDelimiterList, nullFormat);
+  }
+
+  @Override
+  public Object[] parseRow(Object[] row) {
+    if (row == null || row.length < 1) {
+      return new String[1];
+    }
+    Object[] result = new Object[1];
+    result[0] = genericParser.parse(row[0]);
+    return result;
+  }
+
+}
