[GitHub] carbondata pull request #1559: [CARBONDATA-1805][Dictionary] Optimize prunin...

2017-12-18 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/carbondata/pull/1559


---


[GitHub] carbondata pull request #1559: [CARBONDATA-1805][Dictionary] Optimize prunin...

2017-12-13 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/1559#discussion_r156873416
  
--- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala ---
@@ -348,36 +347,53 @@ object GlobalDictionaryUtil {
   }
 
   /**
-   * load CSV files to DataFrame by using datasource "com.databricks.spark.csv"
+   * load and prune dictionary Rdd from csv file or input dataframe
    *
-   * @param sqlContext  SQLContext
-   * @param carbonLoadModel carbon data load model
+   * @param sqlContext sqlContext
+   * @param carbonLoadModel carbonLoadModel
+   * @param inputDF input dataframe
+   * @param requiredCols names of dictionary column
+   * @param hadoopConf hadoop configuration
+   * @return rdd that contains only dictionary columns
    */
-  def loadDataFrame(sqlContext: SQLContext,
-      carbonLoadModel: CarbonLoadModel,
-      hadoopConf: Configuration): DataFrame = {
-    CommonUtil.configureCSVInputFormat(hadoopConf, carbonLoadModel)
-    hadoopConf.set(FileInputFormat.INPUT_DIR, carbonLoadModel.getFactFilePath)
-    val columnNames = carbonLoadModel.getCsvHeaderColumns
-    val schema = StructType(columnNames.map[StructField, Array[StructField]] { column =>
-      StructField(column, StringType)
-    })
-    val values = new Array[String](columnNames.length)
-    val row = new StringArrayRow(values)
-    val jobConf = new JobConf(hadoopConf)
-    SparkHadoopUtil.get.addCredentials(jobConf)
-    TokenCache.obtainTokensForNamenodes(jobConf.getCredentials,
-      Array[Path](new Path(carbonLoadModel.getFactFilePath)),
-      jobConf)
-    val rdd = new NewHadoopRDD[NullWritable, StringArrayWritable](
-      sqlContext.sparkContext,
-      classOf[CSVInputFormat],
-      classOf[NullWritable],
-      classOf[StringArrayWritable],
-      jobConf).setName("global dictionary").map[Row] { currentRow =>
-      row.setValues(currentRow._2.get())
+  private def loadInputDataAsDictRdd(sqlContext: SQLContext, carbonLoadModel: CarbonLoadModel,
+      inputDF: Option[DataFrame], requiredCols: Array[String],
+      hadoopConf: Configuration): RDD[Row] = {
+    if (inputDF.isDefined) {
+      inputDF.get.select(requiredCols.head, requiredCols.tail : _*).rdd
+    } else {
+      CommonUtil.configureCSVInputFormat(hadoopConf, carbonLoadModel)
+      hadoopConf.set(FileInputFormat.INPUT_DIR, carbonLoadModel.getFactFilePath)
+      val headerCols = carbonLoadModel.getCsvHeaderColumns.map(_.toLowerCase)
+      val header2Idx = headerCols.zipWithIndex.toMap
+      // index of dictionary columns in header
+      val dictColIdx = requiredCols.map(c => header2Idx(c.toLowerCase))
+
+      val jobConf = new JobConf(hadoopConf)
+      SparkHadoopUtil.get.addCredentials(jobConf)
+      TokenCache.obtainTokensForNamenodes(jobConf.getCredentials,
+        Array[Path](new Path(carbonLoadModel.getFactFilePath)),
+        jobConf)
+      val dictRdd = new NewHadoopRDD[NullWritable, StringArrayWritable](
+        sqlContext.sparkContext,
+        classOf[CSVInputFormat],
+        classOf[NullWritable],
+        classOf[StringArrayWritable],
+        jobConf).setName("global dictionary").map[Row] { currentRow =>
--- End diff --

move setName and map to separate lines
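
For illustration, the requested layout would look roughly like the sketch below (names reused from the diff above; the row-projection body is a hypothetical reading of the patch, not the author's exact code):

    // build the RDD first, then name and transform it on separate lines
    val baseRdd = new NewHadoopRDD[NullWritable, StringArrayWritable](
      sqlContext.sparkContext,
      classOf[CSVInputFormat],
      classOf[NullWritable],
      classOf[StringArrayWritable],
      jobConf)
    baseRdd.setName("global dictionary")
    val dictRdd = baseRdd.map { currentRow =>
      // keep only the values at the dictionary column indexes
      Row.fromSeq(dictColIdx.map(idx => currentRow._2.get()(idx)).toSeq)
    }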


---


[GitHub] carbondata pull request #1559: [CARBONDATA-1805][Dictionary] Optimize prunin...

2017-12-13 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/1559#discussion_r156871399
  
--- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala ---
@@ -348,36 +347,53 @@ object GlobalDictionaryUtil {
   }
 
   /**
-   * load CSV files to DataFrame by using datasource "com.databricks.spark.csv"
+   * load and prune dictionary Rdd from csv file or input dataframe
    *
-   * @param sqlContext  SQLContext
-   * @param carbonLoadModel carbon data load model
+   * @param sqlContext sqlContext
+   * @param carbonLoadModel carbonLoadModel
+   * @param inputDF input dataframe
+   * @param requiredCols names of dictionary column
+   * @param hadoopConf hadoop configuration
+   * @return rdd that contains only dictionary columns
    */
-  def loadDataFrame(sqlContext: SQLContext,
-      carbonLoadModel: CarbonLoadModel,
-      hadoopConf: Configuration): DataFrame = {
-    CommonUtil.configureCSVInputFormat(hadoopConf, carbonLoadModel)
-    hadoopConf.set(FileInputFormat.INPUT_DIR, carbonLoadModel.getFactFilePath)
-    val columnNames = carbonLoadModel.getCsvHeaderColumns
-    val schema = StructType(columnNames.map[StructField, Array[StructField]] { column =>
-      StructField(column, StringType)
-    })
-    val values = new Array[String](columnNames.length)
-    val row = new StringArrayRow(values)
-    val jobConf = new JobConf(hadoopConf)
-    SparkHadoopUtil.get.addCredentials(jobConf)
-    TokenCache.obtainTokensForNamenodes(jobConf.getCredentials,
-      Array[Path](new Path(carbonLoadModel.getFactFilePath)),
-      jobConf)
-    val rdd = new NewHadoopRDD[NullWritable, StringArrayWritable](
-      sqlContext.sparkContext,
-      classOf[CSVInputFormat],
-      classOf[NullWritable],
-      classOf[StringArrayWritable],
-      jobConf).setName("global dictionary").map[Row] { currentRow =>
-      row.setValues(currentRow._2.get())
+  private def loadInputDataAsDictRdd(sqlContext: SQLContext, carbonLoadModel: CarbonLoadModel,
--- End diff --

please move each parameter to a separate line, one parameter per line
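
For illustration, the requested signature layout would be roughly as below (same names as the diff above, body elided):

    private def loadInputDataAsDictRdd(
        sqlContext: SQLContext,
        carbonLoadModel: CarbonLoadModel,
        inputDF: Option[DataFrame],
        requiredCols: Array[String],
        hadoopConf: Configuration): RDD[Row] = {
      // ... body unchanged ...
    }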


---


[GitHub] carbondata pull request #1559: [CARBONDATA-1805][Dictionary] Optimize prunin...

2017-11-23 Thread xuchuanyin
GitHub user xuchuanyin opened a pull request:

https://github.com/apache/carbondata/pull/1559

[CARBONDATA-1805][Dictionary] Optimize pruning for dictionary loading

Be sure to do all of the following checklist to help us incorporate your contribution quickly and easily:

 - [X] Any interfaces changed?
  `NO`
 - [X] Any backward compatibility impacted?
  `NO`
 - [X] Document update required?
  `NO`
 - [X] Testing done
Please provide details on
- Whether new unit test cases have been added or why no new tests are required?
`NO TESTS ADDED, PERFORMANCE ENHANCEMENT DIDN'T AFFECT THE FUNCTIONALITY`
- How it is tested? Please attach test report.
`TESTED IN CLUSTER WITH REAL DATA`
- Is it a performance related change? Please attach the performance test report.
`PERFORMANCE ENHANCED, DICTIONARY TIME REDUCED FROM 2.9MIN TO 29SEC`
- Any additional information to help reviewers in testing this change.
`NO`
 - [X] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.
`NOT RELATED`

COPY FROM JIRA
===

# SCENARIO

Recently I tried the dictionary feature in Carbondata and found that the dictionary-generating phase of data loading is quite slow. My scenario is as below:

+ Input Data: a 35.8GB CSV file with 199 columns and 126 million lines

+ Dictionary columns: 3 columns containing 19213, 4, and 9 distinct values respectively

The whole data load takes about 2.9min for dictionary generation and 4.6min for fact data loading -- about 39% of the time is spent on the dictionary.

Observing the nmon results, I found that CPU usage was quite high during the dictionary-generating phase while disk and network usage were quite normal.

# ANALYZE

After going through the dictionary-generating code, I found that Carbondata already prunes non-dictionary columns before generating the dictionary. The problem is that `the pruning comes after reading the data file`, which causes some overhead; we can optimize it by `pruning while reading the data file`.

# RESOLVE

Refactor the `loadDataFrame` method in `GlobalDictionaryUtil` to prune the non-dictionary columns while reading the data file, as sketched below.
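
To sketch the idea (a minimal standalone example, not the exact patch; `rawRdd`, `headerCols` and `dictCols` are stand-ins for what the real method derives from `CarbonLoadModel`):

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.Row

    // Minimal sketch: project each raw CSV row down to the dictionary
    // columns inside the same map that parses it, so the full-width
    // rows are never materialized for the dictionary job.
    def pruneWhileReading(
        rawRdd: RDD[Array[String]],
        headerCols: Array[String],
        dictCols: Array[String]): RDD[Row] = {
      // resolve dictionary column positions in the CSV header once, on the driver
      val header2Idx = headerCols.map(_.toLowerCase).zipWithIndex.toMap
      val dictColIdx = dictCols.map(c => header2Idx(c.toLowerCase))
      rawRdd.map { values =>
        // keep only the dictionary columns of each row
        Row.fromSeq(dictColIdx.map(idx => values(idx)).toSeq)
      }
    }

With 3 dictionary columns out of 199, each row shipped into the dictionary job shrinks accordingly, which matches the CPU-bound behavior observed above.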

After implementing the above optimization, dictionary generation costs only `29s` -- **`about 6 times faster than before`** (2.9min) -- while fact data loading costs the same as before (4.6min); now about 10% of the time is spent on the dictionary.

# NOTE

+ Currently only `load data file` benefits from this optimization; `load data frame` does not.

+ Before settling on this solution, I tried another one -- caching the dataframe of the data file -- but the performance was even worse: the dictionary-generating time was 5.6min. A rough sketch of that alternative follows.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xuchuanyin/carbondata opt_dict_load

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/carbondata/pull/1559.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1559


commit e8e49ed54085700eadde81842af0b0daecaed12a
Author: xuchuanyin 
Date:   2017-11-24T03:27:02Z

optimize pruning for dictionary loading




---