Github user qiuchenjian commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2971#discussion_r238292207

    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala ---
    @@ -156,4 +158,132 @@ object DataLoadProcessBuilderOnSpark {
           Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
         }
       }
    +
    +  /**
    +   * 1. range partition the whole input data
    +   * 2. for each range, sort the data and write it to CarbonData files
    +   */
    +  def loadDataUsingRangeSort(
    +      sparkSession: SparkSession,
    +      dataFrame: Option[DataFrame],
    +      model: CarbonLoadModel,
    +      hadoopConf: Configuration): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
    +    val originRDD = if (dataFrame.isDefined) {
    +      dataFrame.get.rdd
    +    } else {
    +      // input data from files
    +      val columnCount = model.getCsvHeaderColumns.length
    +      CsvRDDHelper.csvFileScanRDD(sparkSession, model, hadoopConf)
    +        .map(DataLoadProcessorStepOnSpark.toStringArrayRow(_, columnCount))
    +    }
    +    val sc = sparkSession.sparkContext
    +    val modelBroadcast = sc.broadcast(model)
    +    val partialSuccessAccum = sc.accumulator(0, "Partial Success Accumulator")
    +    val inputStepRowCounter = sc.accumulator(0, "Input Processor Accumulator")
    +    val convertStepRowCounter = sc.accumulator(0, "Convert Processor Accumulator")
    +    val sortStepRowCounter = sc.accumulator(0, "Sort Processor Accumulator")
    +    val writeStepRowCounter = sc.accumulator(0, "Write Processor Accumulator")
    +    hadoopConf
    +      .set(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, sparkSession.sparkContext.appName)
    +    val conf = SparkSQLUtil.broadCastHadoopConf(sc, hadoopConf)
    +    // 1. Input
    +    val inputRDD = originRDD
    +      .mapPartitions(rows => DataLoadProcessorStepOnSpark.toRDDIterator(rows, modelBroadcast))
    +      .mapPartitionsWithIndex { case (index, rows) =>
    +        DataLoadProcessorStepOnSpark.inputFunc(rows, index, modelBroadcast, inputStepRowCounter)
    +      }
    +    // 2. Convert
    +    val convertRDD = inputRDD.mapPartitionsWithIndex { case (index, rows) =>
    +      ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
    +      DataLoadProcessorStepOnSpark.convertFunc(rows, index, modelBroadcast, partialSuccessAccum,
    +        convertStepRowCounter)
    +    }.filter(_ != null)
    +    // 3. Range partition
    +    val configuration = DataLoadProcessBuilder.createConfiguration(model)
    +    val objectOrdering: Ordering[Object] = createOrderingForColumn(model.getRangePartitionColumn)
    +    var numPartitions = CarbonDataProcessorUtil.getGlobalSortPartitions(
    +      configuration.getDataLoadProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS))
    +    if (numPartitions <= 0) {
    +      if (model.getTotalSize <= 0) {
    +        numPartitions = convertRDD.partitions.length
    +      } else {
    +        // calculate the number of partitions;
    +        // it is better to generate one CarbonData file per partition
    +        val totalSize = model.getTotalSize.toDouble
    +        val table = model.getCarbonDataLoadSchema.getCarbonTable
    +        val blockSize = 1024L * 1024 * table.getBlockSizeInMB
    +        val blockletSize = 1024L * 1024 * table.getBlockletSizeInMB
    +        // this assumes the compression ratio of CarbonData is about 33%,
    +        // so it multiplies by 3 to get the split size of the CSV files
    +        val splitSize = Math.max(blockletSize, blockSize - blockletSize) * 3
    +        numPartitions = Math.ceil(totalSize / splitSize).toInt
    --- End diff --
    
    If inserting using a DataFrame, I think totalSize will be 0
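
    For illustration, here is a minimal, self-contained sketch of the partition-count math in the hunk above, with hypothetical block/blocklet/input sizes plugged in (the object name and the concrete values below are assumptions for this example, not taken from any real table):

    object RangeSplitSketch {
      def main(args: Array[String]): Unit = {
        // Hypothetical sizes (assumptions for this example only)
        val blockSizeInMB = 1024L                   // table block size: 1 GB
        val blockletSizeInMB = 64L                  // table blocklet size: 64 MB
        val totalSize = 50L * 1024 * 1024 * 1024    // 50 GB of input CSV data

        val blockSize = 1024L * 1024 * blockSizeInMB
        val blockletSize = 1024L * 1024 * blockletSizeInMB
        // Assuming a ~33% compression ratio, one partition's worth of CSV
        // input should compress down to roughly one CarbonData block.
        val splitSize = Math.max(blockletSize, blockSize - blockletSize) * 3
        val numPartitions = Math.ceil(totalSize.toDouble / splitSize).toInt
        // Prints: splitSize=3019898880 bytes, numPartitions=18
        println(s"splitSize=$splitSize bytes, numPartitions=$numPartitions")
      }
    }

    Note that, as the outer check in the diff shows, when model.getTotalSize is 0 or less (which is the concern raised above for the DataFrame path), this size-based calculation is skipped entirely and numPartitions falls back to convertRDD.partitions.length.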
---