http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala index 2b0dd09..f238d2b 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala @@ -17,13 +17,14 @@ package org.apache.carbondata.spark.testsuite.standardpartition import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.optimizer.CarbonFilters import org.apache.spark.sql.test.util.QueryTest import org.scalatest.BeforeAndAfterAll import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter} -import org.apache.carbondata.core.datastore.impl.FileFactory -import org.apache.carbondata.core.metadata.CarbonMetadata +import org.apache.carbondata.core.metadata.{CarbonMetadata, SegmentFileStore} +import org.apache.carbondata.core.statusmanager.SegmentStatusManager import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.core.util.path.CarbonTablePath @@ -49,24 +50,19 @@ class StandardPartitionTableCleanTestCase extends QueryTest with BeforeAndAfterA } - def validateDataFiles(tableUniqueName: String, segmentId: String, partitions: Int, partitionMapFiles: Int): Unit = { + def validateDataFiles(tableUniqueName: String, segmentId: String, partition: Int, indexes: Int): Unit = { val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName) val tablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier, carbonTable.getTablePath) - val segmentDir = tablePath.getCarbonDataDirectoryPath("0", segmentId) - val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir)) - val dataFiles = carbonFile.listFiles(new CarbonFileFilter() { - override def accept(file: CarbonFile): Boolean = { - return file.getName.endsWith(".carbondata") - } - }) - assert(dataFiles.length == partitions) - val partitionFile = carbonFile.listFiles(new CarbonFileFilter() { - override def accept(file: CarbonFile): Boolean = { - return file.getName.endsWith(".partitionmap") - } - }) - assert(partitionFile.length == partitionMapFiles) + val partitions = CarbonFilters + .getPartitions(Seq.empty, + sqlContext.sparkSession, + TableIdentifier(carbonTable.getTableName, Some(carbonTable.getDatabaseName))) + assert(partitions.get.length == partition) + val details = SegmentStatusManager.readLoadMetadata(tablePath.getMetadataDirectoryPath) + val segLoad = details.find(_.getLoadName.equals(segmentId)).get + val seg = new SegmentFileStore(carbonTable.getTablePath, segLoad.getSegmentFile) + assert(seg.getIndexFiles.size == indexes) } test("clean up partition table for int partition column") { @@ -89,11 +85,10 @@ class StandardPartitionTableCleanTestCase extends QueryTest with BeforeAndAfterA sql(s"""select count (*) from originTable where empno=11""")) sql(s"""ALTER TABLE partitionone DROP PARTITION(empno='11')""") - validateDataFiles("default_partitionone", "0", 10, 2) + validateDataFiles("default_partitionone", "0", 9, 9) sql(s"CLEAN FILES FOR TABLE partitionone").show() - + validateDataFiles("default_partitionone", "0", 9, 9) checkExistence(sql(s"""SHOW PARTITIONS partitionone"""), false, "empno=11") - validateDataFiles("default_partitionone", "0", 9, 1) checkAnswer( sql(s"""select count (*) from partitionone where empno=11"""), Seq(Row(0))) @@ -113,11 +108,11 @@ class StandardPartitionTableCleanTestCase extends QueryTest with BeforeAndAfterA sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionmany OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionmany OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") sql(s"""ALTER TABLE partitionmany DROP PARTITION(deptname='Learning')""") - validateDataFiles("default_partitionmany", "0", 10, 2) - validateDataFiles("default_partitionmany", "1", 10, 2) + validateDataFiles("default_partitionmany", "0", 8, 8) + validateDataFiles("default_partitionmany", "1", 8, 8) sql(s"CLEAN FILES FOR TABLE partitionmany").show() - validateDataFiles("default_partitionmany", "0", 8, 1) - validateDataFiles("default_partitionmany", "1", 8, 1) + validateDataFiles("default_partitionmany", "0", 8, 8) + validateDataFiles("default_partitionmany", "1", 8, 8) checkExistence(sql(s"""SHOW PARTITIONS partitionmany"""), false, "deptname=Learning", "projectcode=928479") checkAnswer( sql(s"""select count (*) from partitionmany where deptname='Learning'"""), @@ -142,7 +137,7 @@ class StandardPartitionTableCleanTestCase extends QueryTest with BeforeAndAfterA sql(s"""ALTER TABLE partitionall DROP PARTITION(deptname='protocol')""") sql(s"""ALTER TABLE partitionall DROP PARTITION(deptname='security')""") assert(sql(s"""SHOW PARTITIONS partitionall""").collect().length == 0) - validateDataFiles("default_partitionall", "0", 10, 6) + validateDataFiles("default_partitionall", "0", 0, 0) sql(s"CLEAN FILES FOR TABLE partitionall").show() validateDataFiles("default_partitionall", "0", 0, 0) checkAnswer( @@ -150,6 +145,30 @@ class StandardPartitionTableCleanTestCase extends QueryTest with BeforeAndAfterA Seq(Row(0))) } + test("clean up after deleting segments on table") { + sql( + """ + | CREATE TABLE partitionalldeleteseg (empno int, empname String, designation String, + | workgroupcategory int, workgroupcategoryname String, deptno int, + | projectjoindate Timestamp, projectenddate Date,attendance int, + | utilization int,salary int) + | PARTITIONED BY (deptname String,doj Timestamp,projectcode int) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionalldeleteseg OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionalldeleteseg OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionalldeleteseg OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionalldeleteseg OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + assert(sql(s"show segments for table partitionalldeleteseg").count == 4) + checkAnswer(sql(s"Select count(*) from partitionalldeleteseg"), Seq(Row(40))) + sql(s"delete from table partitionalldeleteseg where segment.id in (1)").show() + checkExistence(sql(s"show segments for table partitionalldeleteseg"), true, "Marked for Delete") + checkAnswer(sql(s"Select count(*) from partitionalldeleteseg"), Seq(Row(30))) + sql(s"CLEAN FILES FOR TABLE partitionalldeleteseg").show() + assert(sql(s"show segments for table partitionalldeleteseg").count == 3) + } + + override def afterAll = { dropTable } @@ -162,6 +181,7 @@ class StandardPartitionTableCleanTestCase extends QueryTest with BeforeAndAfterA sql("drop table if exists partitionmany") sql("drop table if exists partitionshow") sql("drop table if exists staticpartition") + sql("drop table if exists partitionalldeleteseg") } }
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala index 22ebd80..33e761f 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala @@ -20,11 +20,7 @@ import org.apache.spark.sql.test.util.QueryTest import org.scalatest.BeforeAndAfterAll import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter} -import org.apache.carbondata.core.datastore.impl.FileFactory -import org.apache.carbondata.core.metadata.{CarbonMetadata, PartitionMapFileStore} import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.core.util.path.CarbonTablePath class StandardPartitionTableCompactionTestCase extends QueryTest with BeforeAndAfterAll { @@ -49,23 +45,6 @@ class StandardPartitionTableCompactionTestCase extends QueryTest with BeforeAndA } - def validateDataFiles(tableUniqueName: String, segmentId: String, partitions: Int): Unit = { - val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName) - val tablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier, - carbonTable.getTablePath) - val segmentDir = tablePath.getCarbonDataDirectoryPath("0", segmentId) - val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir)) - val dataFiles = carbonFile.listFiles(new CarbonFileFilter() { - override def accept(file: CarbonFile): Boolean = { - return CarbonTablePath.isCarbonDataFile(file.getName) || - CarbonTablePath.isCarbonIndexFile(file.getName) - } - }) - assert(dataFiles.length > 1) - val pstore = new PartitionMapFileStore() - pstore.readAllPartitionsOfSegment(segmentDir) - println(pstore.getPartitionMap) - } test("data compaction for partition table for one partition column") { sql( @@ -83,9 +62,7 @@ class StandardPartitionTableCompactionTestCase extends QueryTest with BeforeAndA sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionone OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") sql("ALTER TABLE partitionone COMPACT 'MINOR'").collect() - - validateDataFiles("default_partitionone", "0.1", 1) - + checkExistence(sql("show segments for table partitionone"), true, "0.1") checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from partitionone where empno=11 order by empno"), sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable where empno=11 order by empno")) @@ -107,9 +84,7 @@ class StandardPartitionTableCompactionTestCase extends QueryTest with BeforeAndA sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionthree OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionthree OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") sql("ALTER TABLE partitionthree COMPACT 'MINOR'").collect() - - validateDataFiles("default_partitionthree", "0.1", 1) - + checkExistence(sql("show segments for table partitionthree"), true, "0.1") checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from partitionthree where workgroupcategory=1 and empname='arvind' and designation='SE' order by empno"), sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable where workgroupcategory=1 and empname='arvind' and designation='SE' order by empno")) } @@ -129,6 +104,7 @@ class StandardPartitionTableCompactionTestCase extends QueryTest with BeforeAndA sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionmajor OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionmajor OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") sql("ALTER TABLE partitionmajor COMPACT 'MINOR'").collect() + checkExistence(sql("show segments for table partitionmajor"), true, "0.1") sql(s"""ALTER TABLE partitionmajor DROP PARTITION(workgroupcategory='1')""") sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionmajor OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionmajor OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") @@ -136,7 +112,7 @@ class StandardPartitionTableCompactionTestCase extends QueryTest with BeforeAndA sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionmajor OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") val rows = sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from partitionmajor where workgroupcategory=1 and empname='arvind' and designation='SE' order by empno").collect() sql("ALTER TABLE partitionmajor COMPACT 'MAJOR'").collect() - validateDataFiles("default_partitionmajor", "0.2", 1) + checkExistence(sql("show segments for table partitionmajor"), true, "0.2") checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from partitionmajor where workgroupcategory=1 and empname='arvind' and designation='SE' order by empno"), rows) } @@ -158,9 +134,7 @@ class StandardPartitionTableCompactionTestCase extends QueryTest with BeforeAndA val p1 = sql(s"""select count(*) from staticpartition where deptname='software'""").collect() val p2 = sql(s"""select count(*) from staticpartition where deptname='finance'""").collect() sql("ALTER TABLE staticpartition COMPACT 'MINOR'").collect() - - validateDataFiles("default_staticpartition", "0.1", 1) - + checkExistence(sql("show segments for table staticpartition"), true, "0.1") checkAnswer(sql(s"""select count(*) from staticpartition where deptname='software'"""), p1) checkAnswer(sql(s"""select count(*) from staticpartition where deptname='finance'"""), p2) } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala index aac823a..6e36623 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala @@ -204,6 +204,7 @@ class StandardPartitionTableDropTestCase extends QueryTest with BeforeAndAfterAl sql(s"""select count (*) from partitionone1 where empno=11"""), sql(s"""select count (*) from originTable where empno=11""")) sql(s"""ALTER TABLE partitionone1 DROP PARTITION(empno='11')""") + sql(s"CLEAN FILES FOR TABLE partitionone1").show() assert(Files.notExists(Paths.get(TestQueryExecutor.warehouse + "/partitionone1/" + "empno=11"), LinkOption.NOFOLLOW_LINKS)) sql("drop table if exists partitionone1") } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala index 10da906..c8f7be3 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala @@ -16,20 +16,24 @@ */ package org.apache.carbondata.spark.testsuite.standardpartition +import scala.collection.JavaConverters._ import java.io.{File, FileWriter, IOException} import java.util import java.util.concurrent.{Callable, ExecutorService, Executors} import org.apache.commons.io.FileUtils +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.BatchedDataSourceScanExec +import org.apache.spark.sql.optimizer.CarbonFilters import org.apache.spark.sql.test.util.QueryTest -import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row} +import org.apache.spark.sql.{AnalysisException, CarbonEnv, CarbonSession, Row} import org.scalatest.BeforeAndAfterAll import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter} import org.apache.carbondata.core.datastore.impl.FileFactory -import org.apache.carbondata.core.metadata.CarbonMetadata +import org.apache.carbondata.core.metadata.{CarbonMetadata, SegmentFileStore} +import org.apache.carbondata.core.statusmanager.SegmentStatusManager import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath} import org.apache.carbondata.spark.rdd.CarbonScanRDD @@ -66,18 +70,15 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE originMultiLoads OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") } - def validateDataFiles(tableUniqueName: String, segmentId: String, partitions: Int): Unit = { + def validateDataFiles(tableUniqueName: String, segmentId: String, partition: Int): Unit = { val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName) val tablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier, carbonTable.getTablePath) - val segmentDir = tablePath.getCarbonDataDirectoryPath("0", segmentId) - val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir)) - val dataFiles = carbonFile.listFiles(new CarbonFileFilter() { - override def accept(file: CarbonFile): Boolean = { - return file.getName.endsWith(".partitionmap") - } - }) - assert(dataFiles.length == partitions) + val partitions = CarbonFilters + .getPartitions(Seq.empty, + sqlContext.sparkSession, + TableIdentifier(carbonTable.getTableName, Some(carbonTable.getDatabaseName))) + assert(partitions.get.length == partition) } test("data loading for partition table for one partition column") { @@ -92,7 +93,7 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte """.stripMargin) sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionone OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") - validateDataFiles("default_partitionone", "0", 1) + validateDataFiles("default_partitionone", "0", 10) checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from partitionone order by empno"), sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable order by empno")) @@ -111,7 +112,7 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte """.stripMargin) sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitiontwo OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") - validateDataFiles("default_partitiontwo", "0", 1) + validateDataFiles("default_partitiontwo", "0", 10) checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from partitiontwo order by empno"), sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable order by empno")) @@ -130,7 +131,7 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte """.stripMargin) sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionthree OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") - validateDataFiles("default_partitionthree", "0", 1) + validateDataFiles("default_partitionthree", "0", 10) checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from partitionthree order by empno"), sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable order by empno")) @@ -146,14 +147,14 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte | utilization int,salary int) | PARTITIONED BY (workgroupcategory int, empname String, designation String) | STORED BY 'org.apache.carbondata.format' - | TBLPROPERTIES('DICTIONARY_INCLUDE'='empname,designation,deptname') + | TBLPROPERTIES('DICTIONARY_INCLUDE'='deptname') """.stripMargin) sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionmultiplethree OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionmultiplethree OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionmultiplethree OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") - validateDataFiles("default_partitionmultiplethree", "1", 1) - validateDataFiles("default_partitionmultiplethree", "2", 1) + validateDataFiles("default_partitionmultiplethree", "1", 10) + validateDataFiles("default_partitionmultiplethree", "2", 10) checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from partitionmultiplethree order by empno"), sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originMultiLoads order by empno")) } @@ -172,7 +173,7 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte sql(s"""insert into insertpartitionthree select empno,doj,workgroupcategoryname,deptno,deptname,projectcode,projectjoindate,projectenddate,attendance,utilization,salary,workgroupcategory,empname,designation from originTable""") sql(s"""insert into insertpartitionthree select empno,doj,workgroupcategoryname,deptno,deptname,projectcode,projectjoindate,projectenddate,attendance,utilization,salary,workgroupcategory,empname,designation from originTable""") - validateDataFiles("default_insertpartitionthree", "0", 1) + validateDataFiles("default_insertpartitionthree", "0", 10) checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from insertpartitionthree order by empno"), sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originMultiLoads order by empno")) @@ -205,7 +206,7 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte """.stripMargin) sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE singlepasspartitionone OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"', 'SINGLE_PASS'='true')""") - validateDataFiles("default_singlepasspartitionone", "0", 1) + validateDataFiles("default_singlepasspartitionone", "0", 8) } test("data loading for partition table for one static partition column with load syntax") { @@ -289,7 +290,7 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte | utilization int,salary int) | PARTITIONED BY (workgroupcategory int, empname String, designation String) | STORED BY 'org.apache.carbondata.format' - | TBLPROPERTIES('DICTIONARY_INCLUDE'='empname,designation,deptname') + | TBLPROPERTIES('DICTIONARY_INCLUDE'='deptname') """.stripMargin) val tasks = new util.ArrayList[Callable[String]]() @@ -334,13 +335,12 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_mergeindexpartitionthree") val tablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier, - carbonTable.getTablePath) - val segmentDir = tablePath.getCarbonDataDirectoryPath("0", "0") - val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir)) - val files = carbonFile.listFiles(new CarbonFileFilter { - override def accept(file: CarbonFile): Boolean = CarbonTablePath.isCarbonIndexFile(file.getName) - }) - assert(files.length == 10) + carbonTable.getTablePath) + val details = SegmentStatusManager.readTableStatusFile(tablePath.getTableStatusFilePath) + val store = new SegmentFileStore(carbonTable.getTablePath, details(0).getSegmentFile) + store.readIndexFiles() + store.getIndexFiles + assert(store.getIndexFiles.size() == 10) } test("load static partition table for one static partition column with load syntax issue") { @@ -433,10 +433,10 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte } sql(s"LOAD DATA LOCAL INPATH '$inputPath' INTO TABLE smallpartitionfiles") FileUtils.deleteDirectory(folder) - val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "smallpartitionfiles") - val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier) - val segmentDir = carbonTablePath.getSegmentDir("0", "0") - assert(new File(segmentDir).listFiles().length < 50) + val specs = CarbonFilters.getPartitions(Seq.empty, sqlContext.sparkSession, TableIdentifier("smallpartitionfiles")) + specs.get.foreach{s => + assert(new File(s.getLocation.toString).listFiles().length < 10) + } } test("verify partition read with small files") { http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableOverwriteTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableOverwriteTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableOverwriteTestCase.scala index 841185b..c24a277 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableOverwriteTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableOverwriteTestCase.scala @@ -61,7 +61,6 @@ class StandardPartitionTableOverwriteTestCase extends QueryTest with BeforeAndAf sql(s"""insert into staticpartitiondateinsert select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from originTable""") sql(s"""insert into staticpartitiondateinsert select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from originTable""") sql(s"""insert overwrite table staticpartitiondateinsert PARTITION(projectenddate='2016-06-29',doj='2010-12-29 00:00:00') select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary from originTable where projectenddate=cast('2016-06-29' as Date)""") -// sql(s"""insert overwrite table partitiondateinsert select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from originTable""") checkAnswer(sql("select * from staticpartitiondateinsert where projectenddate=cast('2016-06-29' as Date)"), sql("select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from originTable where projectenddate=cast('2016-06-29' as Date)")) } @@ -119,6 +118,7 @@ class StandardPartitionTableOverwriteTestCase extends QueryTest with BeforeAndAf } test("dynamic and static partition table with overwrite ") { + sql("drop table if exists insertstaticpartitiondynamic") sql( """ | CREATE TABLE insertstaticpartitiondynamic (designation String, doj Timestamp,salary int) @@ -137,6 +137,80 @@ class StandardPartitionTableOverwriteTestCase extends QueryTest with BeforeAndAf } + test("dynamic and static partition table with many partition cols overwrite ") { + sql("drop table if exists insertstaticpartitiondynamic") + sql( + """ + | CREATE TABLE insertstaticpartitiondynamic (designation String,salary int) + | PARTITIONED BY (empno int, empname String, doj Timestamp) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE insertstaticpartitiondynamic PARTITION(empno, empname, doj) OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + val rows = sql(s"select count(*) from insertstaticpartitiondynamic").collect() + sql("""insert overwrite table insertstaticpartitiondynamic PARTITION(empno='1', empname='ravi', doj) select designation, salary, doj from insertstaticpartitiondynamic""") + + checkAnswer(sql(s"select count(*) from insertstaticpartitiondynamic where empno=1 and empname='ravi'"), rows) + } + + test("dynamic and static partition table with many partition cols overwrite with diffrent order") { + sql("drop table if exists insertstaticpartitiondynamic") + sql( + """ + | CREATE TABLE insertstaticpartitiondynamic (designation String,salary int) + | PARTITIONED BY (empno int, empname String, doj Timestamp) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE insertstaticpartitiondynamic PARTITION(empno, empname, doj) OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + val rows = sql(s"select count(*) from insertstaticpartitiondynamic").collect() + sql("""insert overwrite table insertstaticpartitiondynamic PARTITION(empno='1', empname, doj) select designation, salary,empname, doj from insertstaticpartitiondynamic""") + + checkAnswer(sql(s"select count(*) from insertstaticpartitiondynamic where empno=1"), rows) + } + + test("dynamic and static partition table with many partition cols load overwrite ") { + sql("drop table if exists insertstaticpartitiondynamic") + sql( + """ + | CREATE TABLE insertstaticpartitiondynamic (designation String,salary int) + | PARTITIONED BY (empno1 int, empname1 String, doj Timestamp) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE insertstaticpartitiondynamic PARTITION(empno1='1', empname1='ravi', doj) OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + + checkAnswer(sql(s"select count(*) from insertstaticpartitiondynamic where empno1=1 and empname1='ravi'"), Seq(Row(10))) + + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' OVERWRITE INTO TABLE insertstaticpartitiondynamic PARTITION(empno1='1', empname1='ravi', doj) OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + + checkAnswer(sql(s"select count(*) from insertstaticpartitiondynamic where empno1=1 and empname1='ravi'"), Seq(Row(10))) + checkAnswer(sql(s"select count(*) from insertstaticpartitiondynamic"), Seq(Row(10))) + + intercept[Exception] { + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE insertstaticpartitiondynamic PARTITION(empno1='1', empname1, doj) OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + } + } + + test("dynamic and static partition table with many partition cols load differnt combinations ") { + sql("drop table if exists insertstaticpartitiondynamic") + sql( + """ + | CREATE TABLE insertstaticpartitiondynamic (designation String,salary int) + | PARTITIONED BY (empno1 int, empname String, doj Timestamp) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE insertstaticpartitiondynamic PARTITION(empno1='1', empname, doj) OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + + checkAnswer(sql(s"select count(*) from insertstaticpartitiondynamic where empno1=1"), Seq(Row(10))) + + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' OVERWRITE INTO TABLE insertstaticpartitiondynamic PARTITION(empno1='1', empname, doj) OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + + checkAnswer(sql(s"select count(*) from insertstaticpartitiondynamic where empno1=1"), Seq(Row(10))) + checkAnswer(sql(s"select count(*) from insertstaticpartitiondynamic"), Seq(Row(10))) + + intercept[Exception] { + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE insertstaticpartitiondynamic PARTITION(empno1, empname, doj) OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + } + } + test("overwriting all partition on table and do compaction") { sql( """ http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala index 95345de..918bbff 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala @@ -22,6 +22,7 @@ import org.apache.spark.sql.{DataFrame, Row} import org.scalatest.BeforeAndAfterAll import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.spark.rdd.CarbonScanRDD @@ -260,16 +261,81 @@ test("Creation of partition table should fail if the colname in table schema and } } - test("add partition based on location on partition table should fail"){ + + test("add partition based on location on partition table"){ sql("drop table if exists partitionTable") sql( """create table partitionTable (id int,name String) partitioned by(email string) stored by 'carbondata' """.stripMargin) sql("insert into partitionTable select 1,'huawei','abc'") + val location = metastoredb +"/" +"def" checkAnswer(sql("show partitions partitionTable"), Seq(Row("email=abc"))) - intercept[Exception]{ - sql("alter table partitionTable add partition (email='def') location 'abc/part1'") - } + sql(s"""alter table partitionTable add partition (email='def') location '$location'""") + sql("insert into partitionTable select 1,'huawei','def'") + checkAnswer(sql("select email from partitionTable"), Seq(Row("def"), Row("abc"))) + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(location)) + } + + test("add partition with static column partition with load command") { + sql( + """ + | CREATE TABLE staticpartitionlocload (empno int, designation String, + | workgroupcategory int, workgroupcategoryname String, deptno int, + | projectjoindate Timestamp,attendance int, + | deptname String,projectcode int, + | utilization int,salary int,projectenddate Date,doj Timestamp) + | PARTITIONED BY (empname String) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + val location = metastoredb +"/" +"ravi" + sql(s"""alter table staticpartitionlocload add partition (empname='ravi') location '$location'""") + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE staticpartitionlocload partition(empname='ravi') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + val frame = sql("select empno,empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from staticpartitionlocload") + verifyPartitionInfo(frame, Seq("empname=ravi")) + assert(frame.count() == 10) + val file = FileFactory.getCarbonFile(location) + assert(file.exists()) + FileFactory.deleteAllCarbonFilesOfDir(file) + } + + test("add external partition with static column partition with load command") { + + sql( + """ + | CREATE TABLE staticpartitionlocloadother (empno int, designation String, + | workgroupcategory int, workgroupcategoryname String, deptno int, + | projectjoindate Timestamp,attendance int, + | deptname String,projectcode int, + | utilization int,salary int,projectenddate Date,doj Timestamp) + | PARTITIONED BY (empname String) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + val location = metastoredb +"/" +"ravi" + sql(s"""alter table staticpartitionlocloadother add partition (empname='ravi') location '$location'""") + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE staticpartitionlocloadother partition(empname='ravi') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE staticpartitionlocloadother partition(empname='indra') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + sql( + """ + | CREATE TABLE staticpartitionextlocload (empno int, designation String, + | workgroupcategory int, workgroupcategoryname String, deptno int, + | projectjoindate Timestamp,attendance int, + | deptname String,projectcode int, + | utilization int,salary int,projectenddate Date,doj Timestamp) + | PARTITIONED BY (empname String) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + sql(s"""alter table staticpartitionextlocload add partition (empname='ravi') location '$location'""") + val frame = sql("select empno,empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from staticpartitionextlocload") + verifyPartitionInfo(frame, Seq("empname=ravi")) + assert(frame.count() == 10) + val location2 = storeLocation +"/staticpartitionlocloadother/empname=indra" + sql(s"""alter table staticpartitionextlocload add partition (empname='indra') location '$location2'""") + val frame1 = sql("select empno,empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from staticpartitionextlocload") + verifyPartitionInfo(frame1, Seq("empname=indra")) + assert(frame1.count() == 20) + val file = FileFactory.getCarbonFile(location) + assert(file.exists()) + FileFactory.deleteAllCarbonFilesOfDir(file) } test("drop partition on preAggregate table should fail"){ @@ -295,7 +361,7 @@ test("Creation of partition table should fail if the colname in table schema and .asInstanceOf[CarbonScanRDD] } assert(scanRDD.nonEmpty) - assert(!partitionNames.map(f => scanRDD.head.partitionNames.exists(_.equals(f))).exists(!_)) + assert(!partitionNames.map(f => scanRDD.head.partitionNames.exists(_.getPartitions.contains(f))).exists(!_)) } override def afterAll = { @@ -318,6 +384,9 @@ test("Creation of partition table should fail if the colname in table schema and sql("drop table if exists badrecordsPartitionintnull") sql("drop table if exists badrecordsPartitionintnullalt") sql("drop table if exists partitionTable") + sql("drop table if exists staticpartitionlocload") + sql("drop table if exists staticpartitionextlocload") + sql("drop table if exists staticpartitionlocloadother") } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/SparkDataTypeConverterImpl.java ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/SparkDataTypeConverterImpl.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/SparkDataTypeConverterImpl.java index be459ac..3670e11 100644 --- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/SparkDataTypeConverterImpl.java +++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/SparkDataTypeConverterImpl.java @@ -31,19 +31,31 @@ public final class SparkDataTypeConverterImpl implements DataTypeConverter, Seri private static final long serialVersionUID = -4379212832935070583L; public Object convertToDecimal(Object data) { + if (null == data) { + return null; + } java.math.BigDecimal javaDecVal = new java.math.BigDecimal(data.toString()); return org.apache.spark.sql.types.Decimal.apply(javaDecVal); } public byte[] convertFromStringToByte(Object data) { + if (null == data) { + return null; + } return UTF8String.fromString((String) data).getBytes(); } public Object convertFromByteToUTF8String(Object data) { + if (null == data) { + return null; + } return UTF8String.fromBytes((byte[]) data); } public Object convertFromStringToUTF8String(Object data) { + if (null == data) { + return null; + } return UTF8String.fromString((String) data); } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala index c02ba0a..5fc7e3d 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala @@ -29,8 +29,9 @@ import org.apache.spark.unsafe.types.UTF8String import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.indexstore.PartitionSpec import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage} -import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, PartitionMapFileStore} +import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, SegmentFileStore} import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.mutate.CarbonUpdateUtil import org.apache.carbondata.core.statusmanager.SegmentStatusManager @@ -116,7 +117,7 @@ object CarbonStore { tablePath: String, carbonTable: CarbonTable, forceTableClean: Boolean, - currentTablePartitions: Option[Seq[String]] = None): Unit = { + currentTablePartitions: Option[Seq[PartitionSpec]] = None): Unit = { LOGGER.audit(s"The clean files request has been received for $dbName.$tableName") var carbonCleanFilesLock: ICarbonLock = null val absoluteTableIdentifier = if (forceTableClean) { @@ -139,11 +140,14 @@ object CarbonStore { CarbonLockUtil .getLockObject(absoluteTableIdentifier, LockUsage.CLEAN_FILES_LOCK, errorMsg) DataLoadingUtil.deleteLoadsAndUpdateMetadata( - isForceDeletion = true, carbonTable) + isForceDeletion = true, carbonTable, currentTablePartitions.map(_.asJava).orNull) CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true) currentTablePartitions match { case Some(partitions) => - new PartitionMapFileStore().cleanSegments(carbonTable, partitions.asJava, true) + SegmentFileStore.cleanSegments( + carbonTable, + currentTablePartitions.map(_.asJava).orNull, + true) case _ => } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala index 9ea58a9..07a2e57 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala @@ -23,10 +23,11 @@ import scala.collection.JavaConverters._ import org.apache.spark.{Partition, SparkContext, TaskContext} -import org.apache.carbondata.core.metadata.PartitionMapFileStore -import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.core.datamap.Segment +import org.apache.carbondata.core.indexstore.PartitionSpec +import org.apache.carbondata.core.metadata.SegmentFileStore -case class CarbonDropPartition(rddId: Int, val idx: Int, segmentPath: String) +case class CarbonDropPartition(rddId: Int, val idx: Int, segment: Segment) extends Partition { override val index: Int = idx @@ -35,115 +36,52 @@ case class CarbonDropPartition(rddId: Int, val idx: Int, segmentPath: String) } /** - * RDD to drop the partitions from partition mapper files of all segments. + * RDD to drop the partitions from segment files of all segments. * @param sc * @param tablePath - * @param segments segments to be merged - * @param partialMatch If it is true then even the partial partition spec matches also can be - * dropped + * @param segments segments to be cleaned */ class CarbonDropPartitionRDD( sc: SparkContext, tablePath: String, - segments: Seq[String], - partitions: Seq[String], - uniqueId: String, - partialMatch: Boolean) - extends CarbonRDD[String](sc, Nil) { + segments: Seq[Segment], + partitions: util.List[PartitionSpec], + uniqueId: String) + extends CarbonRDD[(String, String)](sc, Nil) { override def getPartitions: Array[Partition] = { segments.zipWithIndex.map {s => - CarbonDropPartition(id, s._2, CarbonTablePath.getSegmentPath(tablePath, s._1)) + CarbonDropPartition(id, s._2, s._1) }.toArray } - override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[String] = { - val iter = new Iterator[String] { + override def internalCompute( + theSplit: Partition, + context: TaskContext): Iterator[(String, String)] = { + val iter = new Iterator[(String, String)] { val split = theSplit.asInstanceOf[CarbonDropPartition] - logInfo("Dropping partition information from : " + split.segmentPath) - partitions.toList.asJava - val partitionList = new util.ArrayList[util.List[String]]() - partitionList.add(partitions.toList.asJava) - new PartitionMapFileStore().dropPartitions( - split.segmentPath, - partitionList, + logInfo("Dropping partition information from : " + split.segment) + val toBeDeletedSegments = new util.ArrayList[String]() + val toBeUpdateSegments = new util.ArrayList[String]() + new SegmentFileStore( + tablePath, + split.segment.getSegmentFileName).dropPartitions( + split.segment, + partitions, uniqueId, - partialMatch) + toBeDeletedSegments, + toBeUpdateSegments) - var havePair = false var finished = false override def hasNext: Boolean = { - if (!finished && !havePair) { - finished = true - havePair = !finished - } !finished } - override def next(): String = { - if (!hasNext) { - throw new java.util.NoSuchElementException("End of stream") - } - havePair = false - "" + override def next(): (String, String) = { + finished = true + (toBeUpdateSegments.asScala.mkString(","), toBeDeletedSegments.asScala.mkString(",")) } - - } - iter - } - -} - -/** - * This RDD is used for committing the partitions which were removed in before step. It just removes - * old mapper files and related data files. - * @param sc - * @param tablePath - * @param segments segments to be merged - */ -class CarbonDropPartitionCommitRDD( - sc: SparkContext, - tablePath: String, - segments: Seq[String], - success: Boolean, - uniqueId: String, - partitions: Seq[String]) - extends CarbonRDD[String](sc, Nil) { - - override def getPartitions: Array[Partition] = { - segments.zipWithIndex.map {s => - CarbonDropPartition(id, s._2, CarbonTablePath.getSegmentPath(tablePath, s._1)) - }.toArray - } - - override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[String] = { - val iter = new Iterator[String] { - val split = theSplit.asInstanceOf[CarbonDropPartition] - logInfo("Commit partition information from : " + split.segmentPath) - - new PartitionMapFileStore().commitPartitions(split.segmentPath, uniqueId, success, tablePath, - partitions.toList.asJava) - - var havePair = false - var finished = false - - override def hasNext: Boolean = { - if (!finished && !havePair) { - finished = true - havePair = !finished - } - !finished - } - - override def next(): String = { - if (!hasNext) { - throw new java.util.NoSuchElementException("End of stream") - } - havePair = false - "" - } - } iter } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala index 0859f2e..e0dcffd 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala @@ -26,6 +26,7 @@ import scala.collection.JavaConverters._ import scala.collection.mutable import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.mapreduce.Job import org.apache.spark._ @@ -37,6 +38,7 @@ import org.apache.spark.sql.util.CarbonException import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.block._ +import org.apache.carbondata.core.indexstore.PartitionSpec import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier} import org.apache.carbondata.core.metadata.blocklet.DataFileFooter import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema @@ -54,7 +56,7 @@ import org.apache.carbondata.processing.merger._ import org.apache.carbondata.processing.splits.TableSplit import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil} import org.apache.carbondata.spark.MergeResult -import org.apache.carbondata.spark.util.{CommonUtil, SparkDataTypeConverterImpl, Util} +import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, SparkDataTypeConverterImpl, Util} class CarbonMergerRDD[K, V]( sc: SparkContext, @@ -87,8 +89,8 @@ class CarbonMergerRDD[K, V]( } else { carbonLoadModel.setTaskNo(String.valueOf(theSplit.index)) } - val partitionNames = if (carbonTable.isHivePartitionTable) { - carbonSparkPartition.partitionNames.get.asJava + val partitionSpec = if (carbonTable.isHivePartitionTable) { + carbonSparkPartition.partitionSpec.get } else { null } @@ -138,6 +140,14 @@ class CarbonMergerRDD[K, V]( ) } carbonLoadModel.setSegmentId(mergeNumber) + + if(carbonTable.isHivePartitionTable) { + carbonLoadModel.setTaskNo( + CarbonScalaUtil.generateUniqueNumber( + theSplit.index, + mergeNumber.replace(".", ""), 0L)) + } + CommonUtil.setTempStoreLocation(theSplit.index, carbonLoadModel, true, false) // get destination segment properties as sent from driver which is of last segment. @@ -198,7 +208,7 @@ class CarbonMergerRDD[K, V]( segmentProperties, carbonMergerMapping.campactionType, factTableName, - partitionNames) + partitionSpec) } else { LOGGER.info("RowResultMergerProcessor flow is selected") processor = @@ -209,7 +219,7 @@ class CarbonMergerRDD[K, V]( tempStoreLoc, carbonLoadModel, carbonMergerMapping.campactionType, - partitionNames) + partitionSpec) } mergeStatus = processor.execute(result2) mergeResult = tableBlockInfoList.get(0).getSegmentId + ',' + mergeNumber @@ -260,7 +270,7 @@ class CarbonMergerRDD[K, V]( val format = CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job) CarbonTableInputFormat.setPartitionsToPrune( job.getConfiguration, - carbonMergerMapping.currentPartitions.asJava) + carbonMergerMapping.currentPartitions.map(_.asJava).orNull) CarbonTableInputFormat.setTableInfo(job.getConfiguration, carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getTableInfo) var updateDetails: UpdateVO = null @@ -284,10 +294,10 @@ class CarbonMergerRDD[K, V]( for (eachSeg <- carbonMergerMapping.validSegments) { // map for keeping the relation of a task and its blocks. - job.getConfiguration.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, eachSeg) + job.getConfiguration.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, eachSeg.getSegmentNo) if (updateStatusManager.getUpdateStatusDetails.length != 0) { - updateDetails = updateStatusManager.getInvalidTimestampRange(eachSeg) + updateDetails = updateStatusManager.getInvalidTimestampRange(eachSeg.getSegmentNo) } val updated: Boolean = updateStatusManager.getUpdateStatusDetails.length != 0 @@ -304,7 +314,7 @@ class CarbonMergerRDD[K, V]( val blockInfo = new TableBlockInfo(entry.getPath.toString, entry.getStart, entry.getSegmentId, entry.getLocations, entry.getLength, entry.getVersion, - updateStatusManager.getDeleteDeltaFilePath(entry.getPath.toString) + updateStatusManager.getDeleteDeltaFilePath(entry.getPath.toString, entry.getSegmentId) ) (!updated || (updated && (!CarbonUtil .isInvalidTableBlock(blockInfo.getSegmentId, blockInfo.getFilePath, @@ -329,7 +339,7 @@ class CarbonMergerRDD[K, V]( } val columnToCardinalityMap = new util.HashMap[java.lang.String, Integer]() - val partitionTaskMap = new util.HashMap[util.List[String], String]() + val partitionTaskMap = new util.HashMap[PartitionSpec, String]() val counter = new AtomicInteger() carbonInputSplits.foreach { split => val taskNo = getTaskNo(split, partitionTaskMap, counter) @@ -455,16 +465,20 @@ class CarbonMergerRDD[K, V]( private def getTaskNo( split: CarbonInputSplit, - partitionTaskMap: util.Map[List[String], String], + partitionTaskMap: util.Map[PartitionSpec, String], counter: AtomicInteger): String = { if (carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isHivePartitionTable) { - val partitions = - carbonMergerMapping.partitionMapper.getPartitionMap.get( - CarbonTablePath.getCarbonIndexFileName(split.getBlockPath)) - var task = partitionTaskMap.get(partitions) + val path = split.getPath.getParent + val partTask = + carbonMergerMapping.currentPartitions.get.find(p => p.getLocation.equals(path)) match { + case Some(part) => part + case None => + throw new UnsupportedOperationException("Cannot do compaction on dropped partition") + } + var task = partitionTaskMap.get(partTask) if (task == null) { task = counter.incrementAndGet().toString - partitionTaskMap.put(partitions, task) + partitionTaskMap.put(partTask, task) } task } else { @@ -472,10 +486,12 @@ class CarbonMergerRDD[K, V]( } } + + private def getPartitionNamesFromTask(taskId: String, - partitionTaskMap: util.Map[List[String], String]): Option[Seq[String]] = { + partitionTaskMap: util.Map[PartitionSpec, String]): Option[PartitionSpec] = { if (carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isHivePartitionTable) { - Some(partitionTaskMap.asScala.find(f => f._2.equals(taskId)).get._1.asScala) + Some(partitionTaskMap.asScala.find(f => f._2.equals(taskId)).get._1) } else { None } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala index 8c29c2a..772f702 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala @@ -40,6 +40,7 @@ import org.apache.spark.sql.util.SparkSQLUtil.sessionState import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.block.Distributable +import org.apache.carbondata.core.indexstore.PartitionSpec import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.core.metadata.schema.table.TableInfo import org.apache.carbondata.core.scan.expression.Expression @@ -68,7 +69,7 @@ class CarbonScanRDD( @transient serializedTableInfo: Array[Byte], @transient tableInfo: TableInfo, inputMetricsStats: InitInputMetrics, - @transient val partitionNames: Seq[String]) + @transient val partitionNames: Seq[PartitionSpec]) extends CarbonRDDWithTableInfo[InternalRow](spark.sparkContext, Nil, serializedTableInfo) { private val queryId = sparkContext.getConf.get("queryId", System.nanoTime() + "") http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonSparkPartition.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonSparkPartition.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonSparkPartition.scala index 036f1d1..b473d35 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonSparkPartition.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonSparkPartition.scala @@ -19,6 +19,7 @@ package org.apache.carbondata.spark.rdd import org.apache.spark.{Partition, SerializableWritable} +import org.apache.carbondata.core.indexstore.PartitionSpec import org.apache.carbondata.hadoop.CarbonMultiBlockSplit class CarbonSparkPartition( @@ -26,7 +27,7 @@ class CarbonSparkPartition( val idx: Int, @transient val multiBlockSplit: CarbonMultiBlockSplit, val partitionId: Int = 0, - val partitionNames: Option[Seq[String]] = None) + val partitionSpec: Option[PartitionSpec] = None) extends Partition { val split = new SerializableWritable[CarbonMultiBlockSplit](multiBlockSplit) http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala index 2aa5610..c73065d 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala @@ -32,7 +32,7 @@ object PartitionDropper { def triggerPartitionDrop(dropPartitionCallableModel: DropPartitionCallableModel): Unit = { val alterPartitionModel = new AlterPartitionModel(dropPartitionCallableModel.carbonLoadModel, - dropPartitionCallableModel.segmentId, + dropPartitionCallableModel.segmentId.getSegmentNo, dropPartitionCallableModel.oldPartitionIds, dropPartitionCallableModel.sqlContext ) http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala index 73be3c8..33263d6 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala @@ -17,27 +17,35 @@ package org.apache.carbondata.spark.util +import java.{lang, util} import java.nio.charset.Charset import java.text.SimpleDateFormat -import java.util +import java.util.Date +import com.univocity.parsers.common.TextParsingException +import org.apache.spark.SparkException import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition import org.apache.spark.sql.catalyst.util.DateTimeUtils -import org.apache.spark.sql.execution.command.DataTypeInfo +import org.apache.spark.sql.execution.command.{DataTypeInfo, UpdateTableModel} import org.apache.spark.sql.types._ -import org.apache.spark.unsafe.types.UTF8String -import org.apache.carbondata.common.constants.LoggerAction +import org.apache.carbondata.common.logging.LogService import org.apache.carbondata.core.cache.{Cache, CacheProvider, CacheType} import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueIdentifier} -import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants} +import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory +import org.apache.carbondata.core.metadata.ColumnIdentifier import org.apache.carbondata.core.metadata.datatype.{DataType => CarbonDataType, DataTypes => CarbonDataTypes, StructField => CarbonStructField} import org.apache.carbondata.core.metadata.encoder.Encoding import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, ColumnSchema} -import org.apache.carbondata.core.util.{CarbonSessionInfo, DataTypeUtil} +import org.apache.carbondata.core.util.DataTypeUtil +import org.apache.carbondata.processing.exception.DataLoadingException +import org.apache.carbondata.processing.loading.FailureCauses +import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException +import org.apache.carbondata.processing.util.CarbonDataProcessorUtil +import org.apache.carbondata.spark.exception.MalformedCarbonCommandException object CarbonScalaUtil { def convertSparkToCarbonDataType(dataType: DataType): CarbonDataType = { @@ -167,33 +175,48 @@ object CarbonScalaUtil { * @param dataType Datatype to convert and then convert to String * @param timeStampFormat Timestamp format to convert in case of timestamp datatypes * @param dateFormat DataFormat to convert in case of DateType datatype - * @param serializationNullFormat if this encounters in input data then data will - * be treated as null * @return converted String */ - def convertToString( + def convertToDateAndTimeFormats( value: String, dataType: DataType, timeStampFormat: SimpleDateFormat, - dateFormat: SimpleDateFormat, - serializationNullFormat: String): String = { - if (value == null || serializationNullFormat.equals(value)) { - return null - } - dataType match { - case TimestampType if timeStampFormat != null => - DateTimeUtils.timestampToString(timeStampFormat.parse(value).getTime * 1000) - case DateType if dateFormat != null => - DateTimeUtils.dateToString( - (dateFormat.parse(value).getTime / DateTimeUtils.MILLIS_PER_DAY).toInt) - case ShortType => value.toShort.toString - case IntegerType => value.toInt.toString - case LongType => value.toLong.toString - case DoubleType => value.toDouble.toString - case FloatType => value.toFloat.toString - case d: DecimalType => new java.math.BigDecimal(value).toPlainString - case BooleanType => value.toBoolean.toString - case _ => value + dateFormat: SimpleDateFormat): String = { + val defaultValue = value != null && value.equalsIgnoreCase(hivedefaultpartition) + try { + dataType match { + case TimestampType if timeStampFormat != null => + if (defaultValue) { + timeStampFormat.format(new Date()) + } else { + timeStampFormat.format(DateTimeUtils.stringToTime(value)) + } + case DateType if dateFormat != null => + if (defaultValue) { + dateFormat.format(new Date()) + } else { + dateFormat.format(DateTimeUtils.stringToTime(value)) + } + case _ => + val convertedValue = + DataTypeUtil + .getDataBasedOnDataType(value, convertSparkToCarbonDataType(dataType)) + if (convertedValue == null) { + if (defaultValue) { + return dataType match { + case BooleanType => "false" + case _ => "0" + } + } + throw new MalformedCarbonCommandException( + s"Value $value with datatype $dataType on static partition is not correct") + } + value + } + } catch { + case e: Exception => + throw new MalformedCarbonCommandException( + s"Value $value with datatype $dataType on static partition is not correct") } } @@ -214,14 +237,20 @@ object CarbonScalaUtil { val time = DirectDictionaryKeyGeneratorFactory.getDirectDictionaryGenerator( column.getDataType, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT - ).getValueFromSurrogate(value.toInt).toString - return DateTimeUtils.timestampToString(time.toLong * 1000) + ).getValueFromSurrogate(value.toInt) + if (time == null) { + return null + } + return DateTimeUtils.timestampToString(time.toString.toLong * 1000) } else if (column.getDataType.equals(CarbonDataTypes.DATE)) { val date = DirectDictionaryKeyGeneratorFactory.getDirectDictionaryGenerator( column.getDataType, CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT - ).getValueFromSurrogate(value.toInt).toString - return DateTimeUtils.dateToString(date.toInt) + ).getValueFromSurrogate(value.toInt) + if (date == null) { + return null + } + return DateTimeUtils.dateToString(date.toString.toInt) } } val dictionaryPath = @@ -271,14 +300,28 @@ object CarbonScalaUtil { CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT ).generateDirectSurrogateKey(value).toString } + } else if (column.hasEncoding(Encoding.DICTIONARY)) { + val cacheProvider: CacheProvider = CacheProvider.getInstance + val reverseCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary] = + cacheProvider.createCache(CacheType.REVERSE_DICTIONARY) + val dictionaryPath = + table.getTableInfo.getFactTable.getTableProperties.get( + CarbonCommonConstants.DICTIONARY_PATH) + val dictionaryColumnUniqueIdentifier = new DictionaryColumnUniqueIdentifier( + table.getAbsoluteTableIdentifier, + new ColumnIdentifier( + column.getColumnUniqueId, + column.getColumnProperties, + column.getDataType), + column.getDataType, + dictionaryPath) + return reverseCache.get(dictionaryColumnUniqueIdentifier).getSurrogateKey(value).toString } column.getDataType match { case CarbonDataTypes.TIMESTAMP => - DataTypeUtil.getDataDataTypeForNoDictionaryColumn(value, - column.getDataType, - CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT).toString + DateTimeUtils.stringToTime(value).getTime.toString case CarbonDataTypes.DATE => - DateTimeUtils.stringToDate(UTF8String.fromString(value)).get.toString + DateTimeUtils.stringToTime(value).getTime.toString case _ => value } } catch { @@ -287,7 +330,7 @@ object CarbonScalaUtil { } } - private val hiveignorepartition = "__HIVE_IGNORE_PARTITION__" + private val hivedefaultpartition = "__HIVE_DEFAULT_PARTITION__" /** * Update partition values as per the right date and time format @@ -295,10 +338,7 @@ object CarbonScalaUtil { */ def updatePartitions( partitionSpec: Map[String, String], - table: CarbonTable, - timeFormat: SimpleDateFormat, - dateFormat: SimpleDateFormat): Map[String, String] = { - val hivedefaultpartition = "__HIVE_DEFAULT_PARTITION__" + table: CarbonTable): Map[String, String] = { val cacheProvider: CacheProvider = CacheProvider.getInstance val forwardDictionaryCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary] = cacheProvider.createCache(CacheType.FORWARD_DICTIONARY) @@ -340,37 +380,14 @@ object CarbonScalaUtil { * Update partition values as per the right date and time format */ def updatePartitions( - carbonSessionInfo: CarbonSessionInfo, parts: Seq[CatalogTablePartition], table: CarbonTable): Seq[CatalogTablePartition] = { - val dateFormatStr = carbonSessionInfo.getThreadParams.getProperty( - CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT, - CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT) - val dateFormat = new SimpleDateFormat(dateFormatStr) - val timeFormatStr = carbonSessionInfo.getThreadParams.getProperty( - CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT, - CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT_DEFAULT) - val timeFormat = new SimpleDateFormat(timeFormatStr) - val serializeFormat = carbonSessionInfo.getThreadParams.getProperty( - CarbonLoadOptionConstants.CARBON_OPTIONS_SERIALIZATION_NULL_FORMAT, - CarbonLoadOptionConstants.CARBON_OPTIONS_SERIALIZATION_NULL_FORMAT_DEFAULT) - val isEmptyBadRecord = carbonSessionInfo.getThreadParams.getProperty( - CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD, - CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT).toBoolean - val badRecordAction = carbonSessionInfo.getThreadParams.getProperty( - CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION, - LoggerAction.FAIL.toString) parts.map{ f => val changedSpec = updatePartitions( f.spec, - table, - timeFormat, - dateFormat) + table) f.copy(spec = changedSpec) - }.filterNot{ p => - // Filter the special bad record ignore case string - p.spec.exists(_._2.equals(hiveignorepartition)) }.groupBy(p => p.spec).map(f => f._2.head).toSeq // Avoid duplicates by do groupby } @@ -508,4 +525,64 @@ object CarbonScalaUtil { } } + /** + * Retrieve error message from exception + */ + def retrieveAndLogErrorMsg(ex: Throwable, logger: LogService): (String, String) = { + var errorMessage = "DataLoad failure" + var executorMessage = "" + if (ex != null) { + ex match { + case sparkException: SparkException => + if (sparkException.getCause.isInstanceOf[DataLoadingException] || + sparkException.getCause.isInstanceOf[CarbonDataLoadingException]) { + executorMessage = sparkException.getCause.getMessage + errorMessage = errorMessage + ": " + executorMessage + } else if (sparkException.getCause.isInstanceOf[TextParsingException]) { + executorMessage = CarbonDataProcessorUtil + .trimErrorMessage(sparkException.getCause.getMessage) + errorMessage = errorMessage + " : " + executorMessage + } else if (sparkException.getCause.isInstanceOf[SparkException]) { + val (executorMsgLocal, errorMsgLocal) = + retrieveAndLogErrorMsg(sparkException.getCause, logger) + executorMessage = executorMsgLocal + errorMessage = errorMsgLocal + } + case aex: AnalysisException => + logger.error(aex.getMessage()) + throw aex + case _ => + if (ex.getCause != null) { + executorMessage = ex.getCause.getMessage + errorMessage = errorMessage + ": " + executorMessage + } + } + } + (executorMessage, errorMessage) + } + + /** + * Update error inside update model + */ + def updateErrorInUpdateModel(updateModel: UpdateTableModel, executorMessage: String): Unit = { + if (updateModel.executorErrors.failureCauses == FailureCauses.NONE) { + updateModel.executorErrors.failureCauses = FailureCauses.EXECUTOR_FAILURE + if (null != executorMessage && !executorMessage.isEmpty) { + updateModel.executorErrors.errorMsg = executorMessage + } else { + updateModel.executorErrors.errorMsg = "Update failed as the data load has failed." + } + } + } + + /** + * Generate unique number to be used as partition number of file name + */ + def generateUniqueNumber(taskId: Int, + segmentId: String, + partitionNumber: lang.Long): String = { + String.valueOf(Math.pow(10, 2).toInt + segmentId.toInt) + + String.valueOf(Math.pow(10, 5).toInt + taskId) + + String.valueOf(partitionNumber + Math.pow(10, 5).toInt) + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala index 64e4bb1..94668bd 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala @@ -683,7 +683,8 @@ object CommonUtil { def getCsvHeaderColumns( carbonLoadModel: CarbonLoadModel, - hadoopConf: Configuration): Array[String] = { + hadoopConf: Configuration, + staticPartitionCols: util.List[String] = new util.ArrayList[String]()): Array[String] = { val delimiter = if (StringUtils.isEmpty(carbonLoadModel.getCsvDelimiter)) { CarbonCommonConstants.COMMA } else { @@ -691,7 +692,7 @@ object CommonUtil { } var csvFile: String = null var csvHeader: String = carbonLoadModel.getCsvHeader - val csvColumns = if (StringUtils.isBlank(csvHeader)) { + var csvColumns = if (StringUtils.isBlank(csvHeader)) { // read header from csv file csvFile = carbonLoadModel.getFactFilePath.split(",")(0) csvHeader = CarbonUtil.readHeader(csvFile, hadoopConf) @@ -704,7 +705,7 @@ object CommonUtil { } if (!CarbonDataProcessorUtil.isHeaderValid(carbonLoadModel.getTableName, csvColumns, - carbonLoadModel.getCarbonDataLoadSchema)) { + carbonLoadModel.getCarbonDataLoadSchema, staticPartitionCols)) { if (csvFile == null) { LOGGER.error("CSV header in DDL is not proper." + " Column names in schema and CSV header are not the same.") @@ -720,7 +721,23 @@ object CommonUtil { + "the same. Input file : " + CarbonUtil.removeAKSK(csvFile)) } } - csvColumns + // In case of static partition columns just change the name of header if already exists as + // we should not take the column from csv file and add them as new columns at the end. + if (staticPartitionCols.size() > 0) { + val scalaIgnoreColumns = staticPartitionCols.asScala + var updatedCols = csvColumns.map{col => + if (scalaIgnoreColumns.exists(_.equalsIgnoreCase(col))) { + col + "1" + } else { + col + } + }.toList.asJava + updatedCols = new util.ArrayList[String](updatedCols) + updatedCols.addAll(staticPartitionCols) + updatedCols.asScala.toArray + } else { + csvColumns + } } def validateMaxColumns(csvHeaders: Array[String], maxColumns: String): Int = { @@ -866,7 +883,7 @@ object CommonUtil { val carbonTable = CarbonMetadata.getInstance .getCarbonTable(identifier.getCarbonTableIdentifier.getTableUniqueName) DataLoadingUtil.deleteLoadsAndUpdateMetadata( - isForceDeletion = true, carbonTable) + isForceDeletion = true, carbonTable, null) } catch { case _: Exception => LOGGER.warn(s"Error while cleaning table " + http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala index a38eaba..6767ef7 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala @@ -18,7 +18,8 @@ package org.apache.carbondata.spark.util import java.text.SimpleDateFormat -import java.util.{Date, Locale} +import java.util +import java.util.{Date, List, Locale} import scala.collection.{immutable, mutable} import scala.collection.JavaConverters._ @@ -43,9 +44,11 @@ import org.apache.spark.sql.util.SparkSQLUtil.sessionState import org.apache.carbondata.common.constants.LoggerAction import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants} +import org.apache.carbondata.core.indexstore.PartitionSpec import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage} +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.core.metadata.schema.table.CarbonTable -import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager} +import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager} import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat @@ -221,7 +224,9 @@ object DataLoadingUtil { options: immutable.Map[String, String], optionsFinal: mutable.Map[String, String], carbonLoadModel: CarbonLoadModel, - hadoopConf: Configuration): Unit = { + hadoopConf: Configuration, + partition: Map[String, Option[String]] = Map.empty, + isDataFrame: Boolean = false): Unit = { carbonLoadModel.setTableName(table.getTableName) carbonLoadModel.setDatabaseName(table.getDatabaseName) carbonLoadModel.setTablePath(table.getTablePath) @@ -331,8 +336,13 @@ object DataLoadingUtil { carbonLoadModel.setCsvDelimiter(CarbonUtil.unescapeChar(delimeter)) carbonLoadModel.setCsvHeader(fileHeader) carbonLoadModel.setColDictFilePath(column_dict) + + val ignoreColumns = new util.ArrayList[String]() + if (!isDataFrame) { + ignoreColumns.addAll(partition.filter(_._2.isDefined).keys.toList.asJava) + } carbonLoadModel.setCsvHeaderColumns( - CommonUtil.getCsvHeaderColumns(carbonLoadModel, hadoopConf)) + CommonUtil.getCsvHeaderColumns(carbonLoadModel, hadoopConf, ignoreColumns)) val validatedMaxColumns = CommonUtil.validateMaxColumns( carbonLoadModel.getCsvHeaderColumns, @@ -360,33 +370,40 @@ object DataLoadingUtil { def deleteLoadsAndUpdateMetadata( isForceDeletion: Boolean, - carbonTable: CarbonTable): Unit = { + carbonTable: CarbonTable, + specs: util.List[PartitionSpec]): Unit = { if (isLoadDeletionRequired(carbonTable.getMetaDataFilepath)) { - val details = SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath) val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier - val carbonTableStatusLock = - CarbonLockFactory.getCarbonLockObj( - absoluteTableIdentifier, - LockUsage.TABLE_STATUS_LOCK - ) - - // Delete marked loads - val isUpdationRequired = - DeleteLoadFolders.deleteLoadFoldersFromFileSystem( - absoluteTableIdentifier, - isForceDeletion, - details, - carbonTable.getMetaDataFilepath - ) - var updationCompletionStaus = false - - if (isUpdationRequired) { + val (details, updationRequired) = + isUpdationRequired( + isForceDeletion, + carbonTable, + absoluteTableIdentifier) + + + if (updationRequired) { + val carbonTableStatusLock = + CarbonLockFactory.getCarbonLockObj( + absoluteTableIdentifier, + LockUsage.TABLE_STATUS_LOCK + ) + var locked = false + var updationCompletionStaus = false try { // Update load metadate file after cleaning deleted nodes - if (carbonTableStatusLock.lockWithRetries()) { + locked = carbonTableStatusLock.lockWithRetries() + if (locked) { LOGGER.info("Table status lock has been successfully acquired.") - + // Again read status and check to verify updation required or not. + val (details, updationRequired) = + isUpdationRequired( + isForceDeletion, + carbonTable, + absoluteTableIdentifier) + if (!updationRequired) { + return + } // read latest table status again. val latestMetadata = SegmentStatusManager .readLoadMetadata(carbonTable.getMetaDataFilepath) @@ -409,17 +426,34 @@ object DataLoadingUtil { } updationCompletionStaus = true } finally { - CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK) + if (locked) { + CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK) + } } if (updationCompletionStaus) { DeleteLoadFolders .physicalFactAndMeasureMetadataDeletion(absoluteTableIdentifier, - carbonTable.getMetaDataFilepath, isForceDeletion) + carbonTable.getMetaDataFilepath, isForceDeletion, specs) } } } } + private def isUpdationRequired(isForceDeletion: Boolean, + carbonTable: CarbonTable, + absoluteTableIdentifier: AbsoluteTableIdentifier) = { + val details = SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath) + // Delete marked loads + val isUpdationRequired = + DeleteLoadFolders.deleteLoadFoldersFromFileSystem( + absoluteTableIdentifier, + isForceDeletion, + details, + carbonTable.getMetaDataFilepath + ) + (details, isUpdationRequired) + } + /** * creates a RDD that does reading of multiple CSV files */