Repository: carbondata
Updated Branches:
  refs/heads/master 877eabdd6 -> 3647aee3c


[CARBONDATA-2274] Fix for partition table with more than 4 partition columns returning zero records

Converting the Array[(String, String)] of partition (column, value) pairs to a Map[String, String] changed the order of the partition columns, and that sequence was then used to build the partition path. A LinkedHashMap is now used instead to preserve the declared order.

This closes #2096
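The "more than 4 column" threshold is consistent with how Scala's default immutable Map behaves: the specialized Map1 through Map4 implementations iterate in insertion order, but from the fifth entry onward the collection switches to a hash-based map whose iteration order is arbitrary. A minimal sketch of the difference (column names and values are illustrative, not taken from the commit):

import scala.collection.mutable

object PartitionOrderDemo extends App {
  // Five partition columns in their declared order, as (column, value) pairs.
  val cols = Seq("utilization" -> "96", "salary" -> "5040",
    "workgroupcategory" -> "1", "empname" -> "arvind", "designation" -> "SE")

  // Immutable Map: with five entries this becomes a hash-based map, so
  // iteration order generally differs from insertion order.
  println(cols.toMap.keys.mkString(", "))

  // LinkedHashMap iterates in insertion order, preserving the declared
  // partition-column sequence.
  println(mutable.LinkedHashMap(cols: _*).keys.mkString(", "))
}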
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/3647aee3
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/3647aee3
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/3647aee3

Branch: refs/heads/master
Commit: 3647aee3cc24a623d982d4d7c6c1de0b06f58370
Parents: 877eabd
Author: rahulforallp <rahul.ku...@knoldus.in>
Authored: Fri Mar 23 20:19:43 2018 +0530
Committer: ravipesala <ravi.pes...@gmail.com>
Committed: Thu Mar 29 07:25:42 2018 +0530

----------------------------------------------------------------------
 .../StandardPartitionTableLoadingTestCase.scala | 21 ++++++++++++++++++++
 .../carbondata/spark/util/CarbonScalaUtil.scala | 18 ++++++++++-------
 .../datasources/SparkCarbonTableFormat.scala    | 18 +++++++++++------
 3 files changed, 44 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/3647aee3/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
index baf1627..8342c69 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
@@ -135,6 +135,26 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
       sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable order by empno"))
   }
 
+  test("data loading for partition table for five partition column") {
+    sql(
+      """
+        | CREATE TABLE partitionfive (empno int, doj Timestamp,
+        |  workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int)
+        | PARTITIONED BY (utilization int,salary int,workgroupcategory int, empname String,
+        | designation String)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionfive OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+    validateDataFiles("default_partitionfive", "0", 10)
+
+    checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from partitionfive order by empno"),
+      sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable order by empno"))
+
+    checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from partitionfive where empno>15 order by empno"),
+      sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable where empno>15 order by empno"))
+  }
 
   test("multiple data loading for partition table for three partition column") {
     sql(
@@ -519,6 +539,7 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
     sql("drop table if exists partitionone")
     sql("drop table if exists partitiontwo")
     sql("drop table if exists partitionthree")
+    sql("drop table if exists partitionfive")
     sql("drop table if exists partitionmultiplethree")
     sql("drop table if exists insertpartitionthree")
     sql("drop table if exists staticpartitionone")


http://git-wip-us.apache.org/repos/asf/carbondata/blob/3647aee3/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
index 394ba5f..37cdc41 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
@@ -23,6 +23,8 @@ import java.nio.charset.Charset
 import java.text.SimpleDateFormat
 import java.util.Date
 
+import scala.collection.mutable
+
 import com.univocity.parsers.common.TextParsingException
 import org.apache.spark.SparkException
 import org.apache.spark.sql._
@@ -294,13 +296,12 @@ object CarbonScalaUtil {
    * Update partition values as per the right date and time format
    * @return updated partition spec
    */
-  def updatePartitions(
-      partitionSpec: Map[String, String],
-      table: CarbonTable): Map[String, String] = {
+  def updatePartitions(partitionSpec: mutable.LinkedHashMap[String, String],
+      table: CarbonTable): mutable.LinkedHashMap[String, String] = {
     val cacheProvider: CacheProvider = CacheProvider.getInstance
     val forwardDictionaryCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary] =
       cacheProvider.createCache(CacheType.FORWARD_DICTIONARY)
-    partitionSpec.map{ case (col, pvalue) =>
+    partitionSpec.map { case (col, pvalue) =>
       // replace special string with empty value.
       val value = if (pvalue == null) {
         hivedefaultpartition
@@ -340,11 +341,14 @@ object CarbonScalaUtil {
   def updatePartitions(
       parts: Seq[CatalogTablePartition],
       table: CarbonTable): Seq[CatalogTablePartition] = {
-    parts.map{ f =>
+    parts.map { f =>
+      val specLinkedMap: mutable.LinkedHashMap[String, String] = mutable.LinkedHashMap
+        .empty[String, String]
+      f.spec.foreach(fSpec => specLinkedMap.put(fSpec._1, fSpec._2))
       val changedSpec = updatePartitions(
-        f.spec,
-        table)
+        specLinkedMap,
+        table).toMap
       f.copy(spec = changedSpec)
     }.groupBy(p => p.spec).map(f => f._2.head).toSeq // Avoid duplicates by do groupby
   }
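Why the spec's iteration order matters downstream: the partition directory path is derived by walking the spec in order, so the map's iteration order becomes the directory nesting order. A hypothetical sketch of that dependency (partitionPath is an illustrative helper, not CarbonData's actual path-building code, which additionally escapes names and values):

import scala.collection.mutable

// Illustrative helper: builds a Hive-style partition path from an ordered
// spec. Real code also escapes column names and values.
def partitionPath(spec: mutable.LinkedHashMap[String, String]): String =
  spec.map { case (col, value) => s"$col=$value" }.mkString("/")

val spec = mutable.LinkedHashMap(
  "utilization" -> "96",
  "salary" -> "5040",
  "workgroupcategory" -> "1",
  "empname" -> "arvind",
  "designation" -> "SE")

// Prints: utilization=96/salary=5040/workgroupcategory=1/empname=arvind/designation=SE
println(partitionPath(spec))

With a plain Map in place of the LinkedHashMap, the same five entries could yield a path whose segment order differs from the declared PARTITIONED BY order, which is how loads ended up reading zero records.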

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3647aee3/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
index 54f861c..9110482 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
@@ -276,10 +276,14 @@ private class CarbonOutputWriter(path: String,
     }
   }
   var (updatedPartitions, partitionData) = if (partitions.nonEmpty) {
+    val linkedMap = mutable.LinkedHashMap[String, String]()
     val updatedPartitions = partitions.map(splitPartition)
-    (updatedPartitions, updatePartitions(updatedPartitions.map(_._2)))
+    updatedPartitions.foreach {
+      case (k, v) => linkedMap.put(k, v)
+    }
+    (linkedMap, updatePartitions(updatedPartitions.map(_._2)))
   } else {
-    (Map.empty[String, String].toArray, Array.empty)
+    (mutable.LinkedHashMap.empty[String, String], Array.empty)
   }
 
   private def splitPartition(p: String) = {
@@ -305,8 +309,10 @@ private class CarbonOutputWriter(path: String,
       val index = currPartitions.indexOf(writeSpec)
       if (index > -1) {
         val spec = currPartitions.get(index)
-        updatedPartitions = spec.getPartitions.asScala.map(splitPartition).toArray
-        partitionData = updatePartitions(updatedPartitions.map(_._2))
+        spec.getPartitions.asScala.map(splitPartition).foreach {
+          case (k, v) => updatedPartitions.put(k, v)
+        }
+        partitionData = updatePartitions(updatedPartitions.map(_._2).toSeq)
       }
     }
     updatedPath
@@ -393,7 +399,7 @@ private class CarbonOutputWriter(path: String,
       val formattedPartitions =
       // All dynamic partitions need to be converted to proper format
         CarbonScalaUtil.updatePartitions(
-          updatedPartitions.toMap,
+          updatedPartitions.asInstanceOf[mutable.LinkedHashMap[String, String]],
           model.getCarbonDataLoadSchema.getCarbonTable)
       formattedPartitions.foreach(p => partitonList.add(p._1 + "=" + p._2))
       SegmentFileStore.writeSegmentFile(
@@ -411,7 +417,7 @@ private class CarbonOutputWriter(path: String,
       val formattedPartitions =
       // All dynamic partitions need to be converted to proper format
         CarbonScalaUtil.updatePartitions(
-          updatedPartitions.toMap,
+          updatedPartitions.asInstanceOf[mutable.LinkedHashMap[String, String]],
           model.getCarbonDataLoadSchema.getCarbonTable)
       val partitionstr = formattedPartitions.map{p =>
         ExternalCatalogUtils.escapePathName(p._1) + "=" +
           ExternalCatalogUtils.escapePathName(p._2)