http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala index 99b536c..b4937e6 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala @@ -17,12 +17,15 @@ package org.apache.carbondata.spark.testsuite.datacompaction +import scala.collection.JavaConverters._ + import org.apache.spark.sql.test.util.QueryTest import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} -import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter} +import org.apache.carbondata.core.datamap.Segment import org.apache.carbondata.core.datastore.impl.FileFactory -import org.apache.carbondata.core.metadata.CarbonMetadata +import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore +import org.apache.carbondata.core.metadata.{CarbonMetadata, SegmentFileStore} import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter @@ -61,7 +64,7 @@ class CarbonIndexFileMergeTestCase """.stripMargin) sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE indexmerge OPTIONS('header'='false', " + s"'GLOBAL_SORT_PARTITIONS'='100')") - val table = CarbonMetadata.getInstance().getCarbonTable("default","indexmerge") + val table = CarbonMetadata.getInstance().getCarbonTable("default", "indexmerge") new CarbonIndexFileMergeWriter(table) .mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false) assert(getIndexFileCount("default_indexmerge", "0") == 0) @@ -84,7 +87,7 @@ class CarbonIndexFileMergeTestCase val rows = sql("""Select count(*) from nonindexmerge""").collect() assert(getIndexFileCount("default_nonindexmerge", "0") == 100) assert(getIndexFileCount("default_nonindexmerge", "1") == 100) - val table = CarbonMetadata.getInstance().getCarbonTable("default","nonindexmerge") + val table = CarbonMetadata.getInstance().getCarbonTable("default", "nonindexmerge") new CarbonIndexFileMergeWriter(table) .mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false) new CarbonIndexFileMergeWriter(table) @@ -109,7 +112,7 @@ class CarbonIndexFileMergeTestCase val rows = sql("""Select count(*) from nonindexmerge""").collect() assert(getIndexFileCount("default_nonindexmerge", "0") == 100) assert(getIndexFileCount("default_nonindexmerge", "1") == 100) - val table = CarbonMetadata.getInstance().getCarbonTable("default","nonindexmerge") + val table = CarbonMetadata.getInstance().getCarbonTable("default", "nonindexmerge") new CarbonIndexFileMergeWriter(table) .mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false) new CarbonIndexFileMergeWriter(table) @@ -138,7 +141,7 @@ class CarbonIndexFileMergeTestCase assert(getIndexFileCount("default_nonindexmerge", "1") == 100) assert(getIndexFileCount("default_nonindexmerge", "1") == 100) sql("ALTER TABLE nonindexmerge COMPACT 'minor'").collect() - val table = 
CarbonMetadata.getInstance().getCarbonTable("default","nonindexmerge") + val table = CarbonMetadata.getInstance().getCarbonTable("default", "nonindexmerge") new CarbonIndexFileMergeWriter(table) .mergeCarbonIndexFilesOfSegment("0.1", table.getTablePath, false) assert(getIndexFileCount("default_nonindexmerge", "0.1") == 0) @@ -167,7 +170,7 @@ class CarbonIndexFileMergeTestCase assert(getIndexFileCount("default_nonindexmerge", "2") == 100) assert(getIndexFileCount("default_nonindexmerge", "3") == 100) sql("ALTER TABLE nonindexmerge COMPACT 'minor'").collect() - val table = CarbonMetadata.getInstance().getCarbonTable("default","nonindexmerge") + val table = CarbonMetadata.getInstance().getCarbonTable("default", "nonindexmerge") new CarbonIndexFileMergeWriter(table) .mergeCarbonIndexFilesOfSegment("0.1", table.getTablePath, false) assert(getIndexFileCount("default_nonindexmerge", "0") == 100) @@ -190,18 +193,32 @@ class CarbonIndexFileMergeTestCase sql("select * from mitable").show() } - private def getIndexFileCount(tableName: String, segment: String): Int = { - val table = CarbonMetadata.getInstance().getCarbonTable(tableName) - val path = CarbonTablePath - .getSegmentPath(table.getAbsoluteTableIdentifier.getTablePath, segment) - val carbonFiles = FileFactory.getCarbonFile(path).listFiles(new CarbonFileFilter { - override def accept(file: CarbonFile): Boolean = file.getName.endsWith(CarbonTablePath - .INDEX_FILE_EXT) - }) - if (carbonFiles != null) { - carbonFiles.length + private def getIndexFileCount(tableName: String, segmentNo: String): Int = { + val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableName) + val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentNo) + if (FileFactory.isFileExist(segmentDir)) { + val indexFiles = new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir) + indexFiles.asScala.map { f => + if (f._2 == null) { + 1 + } else { + 0 + } + }.sum } else { - 0 + val segment = Segment.getSegment(segmentNo, carbonTable.getTablePath) + if (segment != null) { + val store = new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName) + store.getSegmentFile.getLocationMap.values().asScala.map { f => + if (f.getMergeFileName == null) { + f.getFiles.size() + } else { + 0 + } + }.sum + } else { + 0 + } } }
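Note: the two-branch lookup introduced above recurs, in simpler form, in each test suite patched below (CompactionSupportGlobalSortFunctionTest, CompactionSupportGlobalSortParameterTest, TestBatchSortDataLoad, TestGlobalSortDataLoad): if the legacy segment directory still exists on disk, index files are listed straight from it; otherwise the new flat-folder layout is assumed and the files are resolved through the segment file metadata. A minimal sketch of that shared pattern, using only APIs that appear in these hunks (the helper name indexFileCount is illustrative, not part of the commit):

import org.apache.carbondata.core.datamap.Segment
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
import org.apache.carbondata.core.metadata.{CarbonMetadata, SegmentFileStore}
import org.apache.carbondata.core.util.path.CarbonTablePath

// Illustrative helper distilling the pattern repeated in the test suites below.
def indexFileCount(tableUniqueName: String, segmentNo: String): Int = {
  val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
  val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentNo)
  if (FileFactory.isFileExist(segmentDir)) {
    // Legacy layout: the Segment_<n> directory exists, so list .carbonindex files from it.
    new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir).size()
  } else {
    // Flat-folder layout: index file locations are recorded in the segment file metadata.
    val segment = Segment.getSegment(segmentNo, carbonTable.getTablePath)
    new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName)
      .getIndexCarbonFiles.size()
  }
}

CarbonIndexFileMergeTestCase above uses a refinement of the same idea: it counts only index files that have not yet been rolled into a merged index file (map entries with f._2 == null, or location entries with getMergeFileName == null).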
http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala index 8f891ce..d49b962 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala @@ -22,9 +22,12 @@ import org.apache.spark.sql.test.util.QueryTest import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datamap.Segment +import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore -import org.apache.carbondata.core.metadata.CarbonMetadata +import org.apache.carbondata.core.metadata.{CarbonMetadata, SegmentFileStore} import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.core.util.path.CarbonTablePath class CompactionSupportGlobalSortFunctionTest extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll { val filePath: String = s"$resourcesPath/globalsort" @@ -527,8 +530,12 @@ class CompactionSupportGlobalSortFunctionTest extends QueryTest with BeforeAndAf private def getIndexFileCount(tableName: String, segmentNo: String = "0"): Int = { val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", tableName) - val store = carbonTable.getAbsoluteTableIdentifier.getTablePath + "/Fact/Part0/Segment_" + - segmentNo - new SegmentIndexFileStore().getIndexFilesFromSegment(store).size() + val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentNo) + if (FileFactory.isFileExist(segmentDir)) { + new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir).size() + } else { + val segment = Segment.getSegment(segmentNo, carbonTable.getTablePath) + new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName).getIndexCarbonFiles.size() + } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala index 2da1ada..54c19c2 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala @@ -23,8 +23,11 @@ import org.apache.spark.sql.Row import 
org.apache.spark.sql.test.util.QueryTest import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} +import org.apache.carbondata.core.datamap.Segment +import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore -import org.apache.carbondata.core.metadata.CarbonMetadata +import org.apache.carbondata.core.metadata.{CarbonMetadata, SegmentFileStore} +import org.apache.carbondata.core.util.path.CarbonTablePath class CompactionSupportGlobalSortParameterTest extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll { val filePath: String = s"$resourcesPath/globalsort" @@ -567,8 +570,12 @@ class CompactionSupportGlobalSortParameterTest extends QueryTest with BeforeAndA private def getIndexFileCount(tableName: String, segmentNo: String = "0"): Int = { val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", tableName) - val store = carbonTable.getAbsoluteTableIdentifier.getTablePath + "/Fact/Part0/Segment_" + - segmentNo - new SegmentIndexFileStore().getIndexFilesFromSegment(store).size() + val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentNo) + if (FileFactory.isFileExist(segmentDir)) { + new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir).size() + } else { + val segment = Segment.getSegment(segmentNo, carbonTable.getTablePath) + new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName).getIndexCarbonFiles.size() + } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala index f3e12d1..c695b05 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala @@ -26,8 +26,10 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.CarbonProperties import org.apache.spark.sql.test.util.QueryTest +import org.apache.carbondata.core.datamap.Segment +import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore -import org.apache.carbondata.core.metadata.CarbonMetadata +import org.apache.carbondata.core.metadata.{CarbonMetadata, SegmentFileStore} import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.util.path.CarbonTablePath @@ -189,12 +191,14 @@ class TestBatchSortDataLoad extends QueryTest with BeforeAndAfterAll { } def getIndexfileCount(tableName: String, segmentNo: String = "0"): Int = { - val carbonTable = CarbonMetadata.getInstance().getCarbonTable( - CarbonCommonConstants.DATABASE_DEFAULT_NAME, - tableName - ) - val segmentDir = carbonTable.getSegmentPath(segmentNo) - new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir).size() + val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", tableName) + val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentNo) + if 
(FileFactory.isFileExist(segmentDir)) { + new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir).size() + } else { + val segment = Segment.getSegment(segmentNo, carbonTable.getTablePath) + new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName).getIndexCarbonFiles.size() + } } override def afterAll { http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala index b9d8e12..39785a3 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala @@ -17,6 +17,8 @@ package org.apache.carbondata.spark.testsuite.dataload +import scala.collection.JavaConverters._ + import java.io.{File, FilenameFilter} import org.apache.carbondata.core.constants.CarbonCommonConstants @@ -26,7 +28,9 @@ import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.spark.sql.test.util.QueryTest import org.scalatest.BeforeAndAfterAll -import org.apache.carbondata.core.metadata.CarbonMetadata +import org.apache.carbondata.core.datamap.Segment +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.metadata.{CarbonMetadata, SegmentFileStore} class TestDataLoadWithFileName extends QueryTest with BeforeAndAfterAll { var originVersion = "" @@ -49,12 +53,20 @@ class TestDataLoadWithFileName extends QueryTest with BeforeAndAfterAll { val indexReader = new CarbonIndexFileReader() val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "test_table_v3") val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, "0") - val carbonIndexPaths = new File(segmentDir) - .listFiles(new FilenameFilter { - override def accept(dir: File, name: String): Boolean = { - name.endsWith(CarbonTablePath.getCarbonIndexExtension) - } - }) + + val carbonIndexPaths = if (FileFactory.isFileExist(segmentDir)) { + new File(segmentDir) + .listFiles(new FilenameFilter { + override def accept(dir: File, name: String): Boolean = { + name.endsWith(CarbonTablePath.getCarbonIndexExtension) + } + }) + } else { + val segment = Segment.getSegment("0", carbonTable.getTablePath) + val store = new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName) + store.readIndexFiles() + store.getIndexCarbonFiles.asScala.map(f => new File(f.getAbsolutePath)).toArray + } for (carbonIndexPath <- carbonIndexPaths) { indexReader.openThriftReader(carbonIndexPath.getCanonicalPath) assert(indexReader.readIndexHeader().getVersion === 3) http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala index bba75ad..d7b1172 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala @@ -17,6 +17,8 @@ package org.apache.carbondata.spark.testsuite.dataload +import scala.collection.JavaConverters._ + import java.io.{File, FileWriter} import org.apache.commons.io.FileUtils @@ -31,8 +33,10 @@ import org.apache.spark.sql.test.util.QueryTest import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException +import org.apache.carbondata.core.datamap.Segment +import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore -import org.apache.carbondata.core.metadata.CarbonMetadata +import org.apache.carbondata.core.metadata.{CarbonMetadata, SegmentFileStore} import org.apache.carbondata.spark.rdd.CarbonScanRDD import org.apache.carbondata.core.util.path.CarbonTablePath @@ -273,7 +277,15 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo sql(s"LOAD DATA LOCAL INPATH '$inputPath' INTO TABLE carbon_globalsort") val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "carbon_globalsort") val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, "0") - assertResult(Math.max(7, defaultParallelism) + 1)(new File(segmentDir).listFiles().length) + if (FileFactory.isFileExist(segmentDir)) { + assertResult(Math.max(7, defaultParallelism) + 1)(new File(segmentDir).listFiles().length) + } else { + val segment = Segment.getSegment("0", carbonTable.getTablePath) + val store = new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName) + store.readIndexFiles() + val size = store.getIndexFilesMap.asScala.map(f => f._2.size()).sum + assertResult(Math.max(7, defaultParallelism) + 1)(size + store.getIndexFilesMap.size()) + } } test("Query with small files") { @@ -379,6 +391,11 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo private def getIndexFileCount(tableName: String, segmentNo: String = "0"): Int = { val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", tableName) val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentNo) - new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir).size() + if (FileFactory.isFileExist(segmentDir)) { + new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir).size() + } else { + val segment = Segment.getSegment(segmentNo, carbonTable.getTablePath) + new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName).getIndexCarbonFiles.size() + } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala index b5c3df1..074c807 100644 --- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala @@ -64,8 +64,9 @@ class CGDataMapFactory( * Get the datamap for segmentid */ override def getDataMaps(segment: Segment): java.util.List[CoarseGrainDataMap] = { - val path = CarbonTablePath.getSegmentPath(identifier.getTablePath, segment.getSegmentNo) - val file = FileFactory.getCarbonFile(path+ "/" +dataMapSchema.getDataMapName) + val path = identifier.getTablePath + val file = FileFactory.getCarbonFile( + path+ "/" +dataMapSchema.getDataMapName + "/" + segment.getSegmentNo) val files = file.listFiles() files.map {f => @@ -100,8 +101,9 @@ class CGDataMapFactory( * @return */ override def toDistributable(segment: Segment): java.util.List[DataMapDistributable] = { - val path = CarbonTablePath.getSegmentPath(identifier.getTablePath, segment.getSegmentNo) - val file = FileFactory.getCarbonFile(path+ "/" +dataMapSchema.getDataMapName) + val path = identifier.getTablePath + val file = FileFactory.getCarbonFile( + path+ "/" +dataMapSchema.getDataMapName + "/" + segment.getSegmentNo) val files = file.listFiles() files.map { f => http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala index 2d666c3..08d8911 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala @@ -89,8 +89,9 @@ class FGDataMapFactory(carbonTable: CarbonTable, * @return */ override def toDistributable(segment: Segment): java.util.List[DataMapDistributable] = { - val path = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segment.getSegmentNo) - val file = FileFactory.getCarbonFile(path+ "/" +dataMapSchema.getDataMapName) + val path = carbonTable.getTablePath + val file = FileFactory.getCarbonFile( + path+ "/" +dataMapSchema.getDataMapName + "/" + segment.getSegmentNo) val files = file.listFiles() files.map { f => @@ -416,7 +417,6 @@ class FGDataMapWriter(carbonTable: CarbonTable, stream.write(bytes) stream.writeInt(bytes.length) stream.close() -// commitFile(fgwritepath) } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala index eaa2ae7..642607c 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala @@ -17,6 +17,8 @@ package 
org.apache.carbondata.spark.testsuite.datamap +import scala.collection.JavaConverters._ + import java.io.{File, FilenameFilter} import org.apache.spark.sql.Row @@ -26,7 +28,9 @@ import org.scalatest.BeforeAndAfterAll import org.apache.carbondata.common.exceptions.MetadataProcessException import org.apache.carbondata.common.exceptions.sql.{MalformedDataMapCommandException, NoSuchDataMapException} import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.metadata.CarbonMetadata +import org.apache.carbondata.core.datamap.Segment +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.metadata.{CarbonMetadata, SegmentFileStore} import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.core.util.path.CarbonTablePath @@ -261,12 +265,21 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll { | group by name """.stripMargin) assertResult(true)(new File(path).exists()) - assertResult(true)(new File(s"${CarbonTablePath.getSegmentPath(path, "0")}") - .list(new FilenameFilter { - override def accept(dir: File, name: String): Boolean = { - name.contains(CarbonCommonConstants.FACT_FILE_EXT) - } - }).length > 0) + if (FileFactory.isFileExist(CarbonTablePath.getSegmentPath(path, "0"))) { + assertResult(true)(new File(s"${CarbonTablePath.getSegmentPath(path, "0")}") + .list(new FilenameFilter { + override def accept(dir: File, name: String): Boolean = { + name.contains(CarbonCommonConstants.FACT_FILE_EXT) + } + }).length > 0) + } else { + val segment = Segment.getSegment("0", path) + val store = new SegmentFileStore(path, segment.getSegmentFileName) + store.readIndexFiles() + val size = store.getIndexFilesMap.asScala.map(f => f._2.size()).sum + assertResult(true)(size > 0) + } + checkAnswer(sql("select name,avg(salary) from main group by name"), Row("amy", 13.0)) checkAnswer(sql("select * from main_preagg"), Row("amy", 26, 2)) sql("drop datamap preagg on table main") http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/flatfolder/FlatFolderTableLoadingTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/flatfolder/FlatFolderTableLoadingTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/flatfolder/FlatFolderTableLoadingTestCase.scala new file mode 100644 index 0000000..d786d10 --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/flatfolder/FlatFolderTableLoadingTestCase.scala @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.spark.testsuite.flatfolder + +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.metadata.CarbonMetadata +import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.core.util.path.CarbonTablePath + +class FlatFolderTableLoadingTestCase extends QueryTest with BeforeAndAfterAll { + override def beforeAll { + dropTable + + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy") + sql( + """ + | CREATE TABLE originTable (empno int, empname String, designation String, doj Timestamp, + | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, + | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, + | utilization int,salary int) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE originTable OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + + } + + def validateDataFiles(tableUniqueName: String, segmentId: String): Unit = { + val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName) + val files = FileFactory.getCarbonFile(carbonTable.getTablePath).listFiles() + assert(files.exists(_.getName.endsWith(CarbonTablePath.CARBON_DATA_EXT))) + } + + test("data loading for flat folder with global sort") { + sql( + """ + | CREATE TABLE flatfolder_gs (empname String, designation String, doj Timestamp, + | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, + | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, + | utilization int,salary int,empno int) + | STORED BY 'org.apache.carbondata.format' tblproperties('sort_scope'='global_sort', 'flat_folder'='true') + """.stripMargin) + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE flatfolder_gs OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + + validateDataFiles("default_flatfolder_gs", "0") + + checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from flatfolder_gs order by empno"), + sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable order by empno")) + + } + + test("data loading for flat folder") { + sql( + """ + | CREATE TABLE flatfolder (empname String, designation String, doj Timestamp, + | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, + | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, + | utilization int,salary int,empno int) + | STORED BY 'org.apache.carbondata.format' tblproperties('flat_folder'='true') + """.stripMargin) + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE flatfolder OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + + validateDataFiles("default_flatfolder", "0") + + checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, 
projectjoindate, projectenddate, attendance, utilization, salary from flatfolder order by empno"), + sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable order by empno")) + + } + + test("data loading for flat folder pre-agg") { + sql( + """ + | CREATE TABLE flatfolder_preagg (empname String, designation String, doj Timestamp, + | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, + | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, + | utilization int,salary int,empno int) + | STORED BY 'org.apache.carbondata.format' tblproperties('flat_folder'='true') + """.stripMargin) + sql("create datamap p2 on table flatfolder_preagg using 'preaggregate' as select empname, designation, min(salary) from flatfolder_preagg group by empname, designation ") + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE flatfolder_preagg OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + + validateDataFiles("default_flatfolder_preagg", "0") + validateDataFiles("default_flatfolder_preagg_p2", "0") + + checkAnswer(sql("select empname, designation, min(salary) from flatfolder_preagg group by empname, designation"), + sql("select empname, designation, min(salary) from originTable group by empname, designation")) + + } + + override def afterAll = { + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, + CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT) + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION , + CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT) + dropTable + } + + def dropTable = { + sql("drop table if exists originTable") + sql("drop table if exists flatfolder") + sql("drop table if exists flatfolder_gs") + sql("drop table if exists flatfolder_preagg") + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala index ec39f66..2432715 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala @@ -26,6 +26,9 @@ import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOp import org.apache.carbondata.core.util.CarbonProperties import org.apache.spark.sql.test.util.QueryTest +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.util.path.CarbonTablePath + class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll { override def beforeAll { @@ -689,7 +692,12 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll { CarbonCommonConstants.FILE_SEPARATOR + "t" + CarbonCommonConstants.FILE_SEPARATOR + "Fact" + CarbonCommonConstants.FILE_SEPARATOR + "Part0") - assert(f.list().length == 2) + if (!FileFactory.isFileExist( + CarbonTablePath.getSegmentFilesLocation( + 
dblocation + CarbonCommonConstants.FILE_SEPARATOR + + CarbonCommonConstants.FILE_SEPARATOR + "t"))) { + assert(f.list().length == 2) + } } test("test sentences func in update statement") { sql("drop table if exists senten") http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala index 0eaaec5..133454a 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala @@ -16,17 +16,22 @@ */ package org.apache.carbondata.spark.testsuite.partition +import scala.collection.JavaConverters._ + import org.apache.spark.sql.Row import org.apache.spark.sql.test.TestQueryExecutor import org.scalatest.BeforeAndAfterAll + import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter} import org.apache.carbondata.core.datastore.impl.FileFactory -import org.apache.carbondata.core.metadata.CarbonMetadata +import org.apache.carbondata.core.metadata.{CarbonMetadata, SegmentFileStore} import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.spark.sql.test.util.QueryTest +import org.apache.carbondata.core.datamap.Segment + class TestDataLoadingForPartitionTable extends QueryTest with BeforeAndAfterAll { override def beforeAll { @@ -62,12 +67,20 @@ class TestDataLoadingForPartitionTable extends QueryTest with BeforeAndAfterAll def validateDataFiles(tableUniqueName: String, segmentId: String, partitions: Seq[Int]): Unit = { val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName) val segmentDir = carbonTable.getSegmentPath(segmentId) - val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir)) - val dataFiles = carbonFile.listFiles(new CarbonFileFilter() { - override def accept(file: CarbonFile): Boolean = { - return file.getName.endsWith(".carbondata") - } - }) + + val dataFiles = if (FileFactory.isFileExist(segmentDir)) { + val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir)) + carbonFile.listFiles(new CarbonFileFilter() { + override def accept(file: CarbonFile): Boolean = { + return file.getName.endsWith(".carbondata") + } + }) + } else { + val segment = Segment.getSegment(segmentId, carbonTable.getTablePath) + val store = new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName) + store.readIndexFiles() + store.getIndexFilesMap.asScala.flatMap(_._2.asScala).map(f => FileFactory.getCarbonFile(f)).toArray + } assert(dataFiles.size == partitions.size) http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala ---------------------------------------------------------------------- diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala index 0422239..f443214 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala @@ -41,7 +41,7 @@ import org.apache.carbondata.processing.sort.sortdata.SortParameters import org.apache.carbondata.processing.store.{CarbonFactHandler, CarbonFactHandlerFactory} import org.apache.carbondata.processing.util.{CarbonBadRecordUtil, CarbonDataProcessorUtil} import org.apache.carbondata.spark.rdd.{NewRddIterator, StringArrayRow} -import org.apache.carbondata.spark.util.Util +import org.apache.carbondata.spark.util.{CarbonScalaUtil, Util} object DataLoadProcessorStepOnSpark { private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala index 2ba6e5e..3aaf0ae 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala @@ -28,6 +28,7 @@ import org.apache.spark.{Partition, SparkContext} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.sql.execution.command.CarbonMergerMapping +import org.apache.carbondata.core.datamap.Segment import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier} import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit} import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat} @@ -74,7 +75,8 @@ class CarbonIUDMergerRDD[K, V]( val carbonInputSplits = splits.asScala.map(_.asInstanceOf[CarbonInputSplit]) // group blocks by segment. 
- val splitsGroupedMySegment = carbonInputSplits.groupBy(_.getSegmentId) + val splitsGroupedMySegment = + carbonInputSplits.groupBy(_.getSegmentId) var i = -1 http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala index d29284f..2fca57e 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala @@ -37,6 +37,7 @@ import org.apache.spark.sql.util.CarbonException import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datamap.Segment import org.apache.carbondata.core.datastore.block._ import org.apache.carbondata.core.indexstore.PartitionSpec import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier} @@ -133,7 +134,7 @@ class CarbonMergerRDD[K, V]( .toList } mergeNumber = if (CompactionType.IUD_UPDDEL_DELTA == carbonMergerMapping.campactionType) { - tableBlockInfoList.get(0).getSegmentId + tableBlockInfoList.get(0).getSegment.toString } else { mergedLoadName.substring( mergedLoadName.lastIndexOf(CarbonCommonConstants.LOAD_FOLDER) + @@ -326,7 +327,9 @@ class CarbonMergerRDD[K, V]( val blockInfo = new TableBlockInfo(entry.getPath.toString, entry.getStart, entry.getSegmentId, entry.getLocations, entry.getLength, entry.getVersion, - updateStatusManager.getDeleteDeltaFilePath(entry.getPath.toString, entry.getSegmentId) + updateStatusManager.getDeleteDeltaFilePath( + entry.getPath.toString, + Segment.toSegment(entry.getSegmentId).getSegmentNo) ) (!updated || (updated && (!CarbonUtil .isInvalidTableBlock(blockInfo.getSegmentId, blockInfo.getFilePath, http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala index 77ff139..3995aa7 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala @@ -530,6 +530,23 @@ object CommonUtil { } /** + * This method will validate the flat folder property specified by the user + * + * @param tableProperties + */ + def validateFlatFolder(tableProperties: Map[String, String]): Unit = { + val tblPropName = CarbonCommonConstants.FLAT_FOLDER + if (tableProperties.get(tblPropName).isDefined) { + val trimStr = tableProperties(tblPropName).trim + if (!trimStr.equalsIgnoreCase("true") && !trimStr.equalsIgnoreCase("false")) { + throw new MalformedCarbonCommandException(s"Invalid $tblPropName value found: " + + s"$trimStr, only true|false is supported.") + } + tableProperties.put(tblPropName, trimStr) + } + } + + /** * This method will validate the compaction level threshold property specified by the user * the 
property is used while doing minor compaction * http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala index 61a5b42..7d28790 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala @@ -301,6 +301,8 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser { CommonUtil.validateTableBlockSize(tableProperties) // validate table level properties for compaction CommonUtil.validateTableLevelCompactionProperties(tableProperties) + // validate flat folder property. + CommonUtil.validateFlatFolder(tableProperties) TableModel( ifNotExistPresent, http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala index 84d9c47..fdbf400 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala @@ -25,18 +25,23 @@ import scala.collection.mutable import scala.collection.mutable.ListBuffer import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.mapreduce.Job import org.apache.spark.sql.execution.command.{AlterPartitionModel, DataMapField, Field, PartitionerField} +import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datamap.Segment import org.apache.carbondata.core.datastore.block.{SegmentProperties, TableBlockInfo} +import org.apache.carbondata.core.datastore.filesystem.CarbonFile +import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier, SegmentFileStore} import org.apache.carbondata.core.metadata.schema.PartitionInfo import org.apache.carbondata.core.metadata.schema.partition.PartitionType import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.mutate.CarbonUpdateUtil import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope +import org.apache.carbondata.core.statusmanager.SegmentStatusManager import org.apache.carbondata.core.util.CarbonUtil import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.hadoop.CarbonInputSplit @@ -192,9 +197,13 @@ object PartitionUtils { val batchNo = CarbonTablePath.DataFileUtil.getBatchNoFromTaskNo(taskNo) val taskId = CarbonTablePath.DataFileUtil.getTaskIdFromTaskNo(taskNo) val bucketNumber = CarbonTablePath.DataFileUtil.getBucketNo(path) - val indexFilePath = CarbonTablePath.getCarbonIndexFilePath( - tablePath, String.valueOf(taskId), segmentId, batchNo, String.valueOf(bucketNumber), - timestamp, version) + val indexFilePath = + new Path(new 
Path(path).getParent, + CarbonTablePath.getCarbonIndexFileName(taskId, + bucketNumber.toInt, + batchNo, + timestamp, + segmentId)).toString // indexFilePath could be duplicated when multiple data file related to one index file if (indexFilePath != null && !pathList.contains(indexFilePath)) { pathList.add(indexFilePath) @@ -209,11 +218,13 @@ object PartitionUtils { CarbonUtil.deleteFiles(files.asScala.toArray) if (!files.isEmpty) { val carbonTable = alterPartitionModel.carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable - val file = SegmentFileStore.writeSegmentFile( - identifier.getTablePath, - alterPartitionModel.segmentId, - alterPartitionModel.carbonLoadModel.getFactTimeStamp.toString) - val segmentFiles = Seq(new Segment(alterPartitionModel.segmentId, file, null)) + val updatedSegFile: String = mergeAndUpdateSegmentFile(alterPartitionModel, + identifier, + segmentId, + carbonTable, + files.asScala) + + val segmentFiles = Seq(new Segment(alterPartitionModel.segmentId, updatedSegFile, null)) .asJava if (!CarbonUpdateUtil.updateTableMetadataStatus( new util.HashSet[Segment](Seq(new Segment(alterPartitionModel.segmentId, @@ -283,4 +294,50 @@ object PartitionUtils { generatePartitionerField(allPartitionColumn.toList, Seq.empty) } + + private def mergeAndUpdateSegmentFile(alterPartitionModel: AlterPartitionModel, + identifier: AbsoluteTableIdentifier, + segmentId: String, + carbonTable: CarbonTable, filesToBeDelete: Seq[File]) = { + val metadataDetails = + SegmentStatusManager.readTableStatusFile( + CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath)) + val segmentFile = + metadataDetails.find(_.getLoadName.equals(segmentId)).get.getSegmentFile + var allSegmentFiles: Seq[CarbonFile] = Seq.empty[CarbonFile] + val file = SegmentFileStore.writeSegmentFile( + carbonTable, + alterPartitionModel.segmentId, + System.currentTimeMillis().toString) + if (segmentFile != null) { + allSegmentFiles ++= FileFactory.getCarbonFile( + SegmentFileStore.getSegmentFilePath(carbonTable.getTablePath, segmentFile)) :: Nil + } + val updatedSegFile = { + val carbonFile = FileFactory.getCarbonFile( + SegmentFileStore.getSegmentFilePath(carbonTable.getTablePath, file)) + allSegmentFiles ++= carbonFile :: Nil + + val mergedSegFileName = SegmentFileStore.genSegmentFileName( + segmentId, + alterPartitionModel.carbonLoadModel.getFactTimeStamp.toString) + val tmpFile = mergedSegFileName + "_tmp" + val segmentStoreFile = SegmentFileStore.mergeSegmentFiles( + tmpFile, + CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath), + allSegmentFiles.toArray) + val indexFiles = segmentStoreFile.getLocationMap.values().asScala.head.getFiles + filesToBeDelete.foreach(f => indexFiles.remove(f.getName)) + SegmentFileStore.writeSegmentFile( + segmentStoreFile, + CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath) + + CarbonCommonConstants.FILE_SEPARATOR + mergedSegFileName + CarbonTablePath.SEGMENT_EXT) + carbonFile.delete() + FileFactory.getCarbonFile( + SegmentFileStore.getSegmentFilePath( + carbonTable.getTablePath, tmpFile + CarbonTablePath.SEGMENT_EXT)).delete() + mergedSegFileName + CarbonTablePath.SEGMENT_EXT + } + updatedSegFile + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala 
b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala index 5902783..f3f2650 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala @@ -84,9 +84,7 @@ object IndexDataMapRebuildRDD { segmentId: String): Unit = { val dataMapStorePath = - CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId) + - File.separator + - dataMapName + CarbonTablePath.getDataMapStorePath(carbonTable.getTablePath, segmentId, dataMapName) if (!FileFactory.isFileExist(dataMapStorePath)) { if (FileFactory.mkdirs(dataMapStorePath, FileFactory.getFileType(dataMapStorePath))) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala index 21a8641..5d53ccc 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala @@ -50,6 +50,8 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datamap.Segment import org.apache.carbondata.core.datamap.status.DataMapStatusManager import org.apache.carbondata.core.datastore.block.{Distributable, TableBlockInfo} +import org.apache.carbondata.core.datastore.filesystem.CarbonFile +import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.dictionary.server.DictionaryServer import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage} import org.apache.carbondata.core.metadata.{CarbonTableIdentifier, ColumnarFormatVersion, SegmentFileStore} @@ -434,13 +436,7 @@ object CarbonDataRDDFactory { segmentDetails.add(new Segment(resultOfBlock._2._1.getLoadName, null)) } } - val segmentFiles = segmentDetails.asScala.map{seg => - val file = SegmentFileStore.writeSegmentFile( - carbonTable.getTablePath, - seg.getSegmentNo, - updateModel.get.updatedTimeStamp.toString) - new Segment(seg.getSegmentNo, file) - }.filter(_.getSegmentFileName != null).asJava + val segmentFiles = updateSegmentFiles(carbonTable, segmentDetails, updateModel.get) // this means that the update doesn't have any records to update, so no need to do table // status file update. @@ -517,9 +513,13 @@ object CarbonDataRDDFactory { writeDictionary(carbonLoadModel, result, writeAll = false) val segmentFileName = - SegmentFileStore.writeSegmentFile(carbonTable.getTablePath, carbonLoadModel.getSegmentId, + SegmentFileStore.writeSegmentFile(carbonTable, carbonLoadModel.getSegmentId, String.valueOf(carbonLoadModel.getFactTimeStamp)) + SegmentFileStore.updateSegmentFile( + carbonTable.getTablePath, + carbonLoadModel.getSegmentId, + segmentFileName) operationContext.setProperty(carbonTable.getTableUniqueName + "_Segment", carbonLoadModel.getSegmentId) val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent = @@ -588,6 +588,58 @@ object CarbonDataRDDFactory { } /** + * Add and update the segment files. In case of update scenario the carbonindex files are written + to the same segment, so we need to update the old segment file. 
So this method writes the latest data + to a new segment file and merges this file with the old file to get the latest updated files. + * @param carbonTable + * @param segmentDetails + * @return + */ + private def updateSegmentFiles( + carbonTable: CarbonTable, + segmentDetails: util.HashSet[Segment], + updateModel: UpdateTableModel) = { + val metadataDetails = + SegmentStatusManager.readTableStatusFile( + CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath)) + val segmentFiles = segmentDetails.asScala.map { seg => + val segmentFile = + metadataDetails.find(_.getLoadName.equals(seg.getSegmentNo)).get.getSegmentFile + var segmentFiles: Seq[CarbonFile] = Seq.empty[CarbonFile] + + val file = SegmentFileStore.writeSegmentFile( + carbonTable, + seg.getSegmentNo, + String.valueOf(System.currentTimeMillis())) + + if (segmentFile != null) { + segmentFiles ++= FileFactory.getCarbonFile( + SegmentFileStore.getSegmentFilePath(carbonTable.getTablePath, segmentFile)) :: Nil + } + val updatedSegFile = if (file != null) { + val carbonFile = FileFactory.getCarbonFile( + SegmentFileStore.getSegmentFilePath(carbonTable.getTablePath, file)) + segmentFiles ++= carbonFile :: Nil + + val mergedSegFileName = SegmentFileStore.genSegmentFileName( + seg.getSegmentNo, + updateModel.updatedTimeStamp.toString) + SegmentFileStore.mergeSegmentFiles( + mergedSegFileName, + CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath), + segmentFiles.toArray) + carbonFile.delete() + mergedSegFileName + CarbonTablePath.SEGMENT_EXT + } else { + null + } + + new Segment(seg.getSegmentNo, updatedSegFile) + }.filter(_.getSegmentFileName != null).asJava + segmentFiles + } + + /** * If data load is triggered by UPDATE query, this func will execute the update * TODO: move it to a separate update command */ @@ -614,10 +666,11 @@ object CarbonDataRDDFactory { carbonTable.getMetadataPath) .filter(lmd => lmd.getSegmentStatus.equals(SegmentStatus.LOAD_PARTIAL_SUCCESS) || lmd.getSegmentStatus.equals(SegmentStatus.SUCCESS)) - val segmentIds = loadMetadataDetails.map(_.getLoadName) - val segmentIdIndex = segmentIds.zipWithIndex.toMap - val segmentId2maxTaskNo = segmentIds.map { segId => - (segId, CarbonUpdateUtil.getLatestTaskIdForSegment(segId, carbonLoadModel.getTablePath)) + val segments = loadMetadataDetails.map(f => new Segment(f.getLoadName, f.getSegmentFile)) + val segmentIdIndex = segments.map(_.getSegmentNo).zipWithIndex.toMap + val segmentId2maxTaskNo = segments.map { seg => + (seg.getSegmentNo, + CarbonUpdateUtil.getLatestTaskIdForSegment(seg, carbonLoadModel.getTablePath)) }.toMap class SegmentPartitioner(segIdIndex: Map[String, Int], parallelism: Int) @@ -639,10 +692,14 @@ object CarbonDataRDDFactory { val partitionId = TaskContext.getPartitionId() val segIdIndex = partitionId / segmentUpdateParallelism val randomPart = partitionId - segIdIndex * segmentUpdateParallelism - val segId = segmentIds(segIdIndex) - val newTaskNo = segmentId2maxTaskNo(segId) + randomPart + 1 - List(triggerDataLoadForSegment(carbonLoadModel, updateModel, segId, newTaskNo, partition) - .toList).toIterator + val segId = segments(segIdIndex) + val newTaskNo = segmentId2maxTaskNo(segId.getSegmentNo) + randomPart + 1 + List(triggerDataLoadForSegment( + carbonLoadModel, + updateModel, + segId.getSegmentNo, + newTaskNo, + partition).toList).toIterator }.collect() } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala 
---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala index 155bdd1..7605b9d 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala @@ -223,7 +223,7 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel, if (compactionType == CompactionType.IUD_UPDDEL_DELTA) { val segmentFilesList = loadsToMerge.asScala.map{seg => val file = SegmentFileStore.writeSegmentFile( - carbonTable.getTablePath, + carbonTable, seg.getLoadName, carbonLoadModel.getFactTimeStamp.toString) new Segment(seg.getLoadName, file) @@ -231,7 +231,7 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel, segmentFilesForIUDCompact = new util.ArrayList[Segment](segmentFilesList) } else { segmentFileName = SegmentFileStore.writeSegmentFile( - carbonTable.getTablePath, + carbonTable, mergedLoadNumber, carbonLoadModel.getFactTimeStamp.toString) } http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala index 93c0b4a..30cb464 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala @@ -99,7 +99,7 @@ class CarbonSession(@transient val sc: SparkContext, trySearchMode(qe, sse) } catch { case e: Exception => - logError(String.format( + log.error(String.format( "Exception when executing search mode: %s", e.getMessage)) throw e; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala index 0c6d2ba..127e1b1 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala @@ -40,7 +40,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, DeleteDeltaBlockDetails, SegmentUpdateDetails, TupleIdEnum} import org.apache.carbondata.core.mutate.data.RowCountDetailsVO -import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentUpdateStatusManager} +import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager, SegmentUpdateStatusManager} import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.core.writer.CarbonDeleteDeltaWriterImpl import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat} @@ -68,12 +68,7 @@ object DeleteExecution { val database = 
CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession) val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession) val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier - val isPartitionTable = carbonTable.isHivePartitionTable - val factPath = if (isPartitionTable) { - absoluteTableIdentifier.getTablePath - } else { - CarbonTablePath.getFactDir(absoluteTableIdentifier.getTablePath) - } + val tablePath = absoluteTableIdentifier.getTablePath var segmentsTobeDeleted = Seq.empty[Segment] val deleteRdd = if (isUpdateOperation) { @@ -114,6 +109,9 @@ object DeleteExecution { CarbonUpdateUtil .createBlockDetailsMap(blockMappingVO, segmentUpdateStatusMngr) + val metadataDetails = SegmentStatusManager.readTableStatusFile( + CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath)) + val rowContRdd = sparkSession.sparkContext.parallelize( blockMappingVO.getCompleteBlockRowDetailVO.asScala.toSeq, @@ -127,12 +125,16 @@ object DeleteExecution { var result = List[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors))]() while (records.hasNext) { val ((key), (rowCountDetailsVO, groupedRows)) = records.next + val segmentId = key.substring(0, key.indexOf(CarbonCommonConstants.FILE_SEPARATOR)) + val segmentFile = + metadataDetails.find(_.getLoadName.equals(segmentId)).get.getSegmentFile result = result ++ deleteDeltaFunc(index, key, groupedRows.toIterator, timestamp, - rowCountDetailsVO) + rowCountDetailsVO, + segmentFile) } result } @@ -219,7 +221,8 @@ object DeleteExecution { key: String, iter: Iterator[Row], timestamp: String, - rowCountDetailsVO: RowCountDetailsVO + rowCountDetailsVO: RowCountDetailsVO, + segmentFile: String ): Iterator[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors))] = { val result = new DeleteDelataResultImpl() @@ -255,7 +258,7 @@ object DeleteExecution { countOfRows = countOfRows + 1 } - val blockPath = CarbonUpdateUtil.getTableBlockPath(TID, factPath, isPartitionTable) + val blockPath = CarbonUpdateUtil.getTableBlockPath(TID, tablePath, segmentFile != null) val completeBlockName = CarbonTablePath .addDataPartPrefix(CarbonUpdateUtil.getRequiredFieldFromTID(TID, TupleIdEnum.BLOCK_ID) + CarbonCommonConstants.FACT_FILE_EXT) http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala index f3b4be7..857cd81 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala @@ -103,6 +103,9 @@ case class PreAggregateTableHelper( .LOAD_SORT_SCOPE_DEFAULT)) tableProperties .put(CarbonCommonConstants.TABLE_BLOCKSIZE, parentTable.getBlockSizeInMB.toString) + tableProperties.put(CarbonCommonConstants.FLAT_FOLDER, + parentTable.getTableInfo.getFactTable.getTableProperties.asScala.getOrElse( + CarbonCommonConstants.FLAT_FOLDER, CarbonCommonConstants.DEFAULT_FLAT_FOLDER)) val tableIdentifier = TableIdentifier(parentTable.getTableName + "_" + dataMapName, Some(parentTable.getDatabaseName)) 
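The PreAggregateTableHelper hunk above makes the pre-aggregate child table inherit the parent's flat_folder setting, so parent and child use the same directory layout. A hedged end-to-end illustration follows; the table and datamap names are invented, and it assumes CarbonCommonConstants.FLAT_FOLDER resolves to the table property 'flat_folder'.

  // Hypothetical session code; names are examples only.
  sql("CREATE TABLE maintable(id INT, name STRING) STORED BY 'carbondata' " +
    "TBLPROPERTIES('flat_folder'='true')")
  sql("CREATE DATAMAP agg ON TABLE maintable USING 'preaggregate' AS " +
    "SELECT name, sum(id) FROM maintable GROUP BY name")
  // Per the helper change, the child table is named <parent>_<datamap>
  // (here maintable_agg) and now carries flat_folder='true' copied from
  // the parent instead of silently falling back to the default.

The CarbonDescribeFormattedCommand hunk that follows then surfaces FLAT_FOLDER in DESCRIBE FORMATTED output for such tables.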
http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala index 7d15cc1..617f5e8 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala @@ -138,6 +138,11 @@ private[sql] case class CarbonDescribeFormattedCommand( tblProps.get(CarbonCommonConstants.TABLE_ALLOWED_COMPACTION_DAYS), CarbonCommonConstants.DEFAULT_DAYS_ALLOWED_TO_COMPACT)) } + if (tblProps.containsKey(CarbonCommonConstants.FLAT_FOLDER)) { + results ++= Seq((CarbonCommonConstants.FLAT_FOLDER.toUpperCase, + tblProps.get(CarbonCommonConstants.FLAT_FOLDER), + CarbonCommonConstants.DEFAULT_FLAT_FOLDER)) + } results ++= Seq(("", "", ""), ("##Detailed Column property", "", "")) if (colPropStr.length() > 0) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala index 0bdef8a..7cee409 100644 --- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala +++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala @@ -20,13 +20,15 @@ package org.apache.carbondata.spark.testsuite.partition import scala.collection.JavaConverters._ import scala.collection.mutable.ListBuffer +import org.apache.hadoop.fs.Path import org.apache.spark.sql.test.util.QueryTest import org.scalatest.BeforeAndAfterAll import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datamap.Segment import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter} import org.apache.carbondata.core.datastore.impl.FileFactory -import org.apache.carbondata.core.metadata.CarbonMetadata +import org.apache.carbondata.core.metadata.{CarbonMetadata, SegmentFileStore} import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.core.util.path.CarbonTablePath @@ -855,15 +857,24 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll { validatePartitionTableFiles(partitions, dataFiles) } - def getDataFiles(carbonTable: CarbonTable, segmentId: String): Array[CarbonFile] = { - val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId) - val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir)) - val dataFiles = carbonFile.listFiles(new CarbonFileFilter() { - override def accept(file: CarbonFile): Boolean = { - return file.getName.endsWith(".carbondata") - } - }) - dataFiles + def 
getDataFiles(carbonTable: CarbonTable, segmentId: String): Array[String] = { + val segment = Segment.getSegment(segmentId, carbonTable.getTablePath) + if (segment.getSegmentFileName != null) { + val sfs = new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName) + sfs.readIndexFiles() + val indexFilesMap = sfs.getIndexFilesMap + val dataFiles = indexFilesMap.asScala.flatMap(_._2.asScala).map(f => new Path(f).getName) + dataFiles.toArray + } else { + val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId) + val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir)) + val dataFiles = carbonFile.listFiles(new CarbonFileFilter() { + override def accept(file: CarbonFile): Boolean = { + return file.getName.endsWith(".carbondata") + } + }) + dataFiles.map(_.getName) + } } /** @@ -871,10 +882,10 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll { * @param partitions * @param dataFiles */ - def validatePartitionTableFiles(partitions: Seq[Int], dataFiles: Array[CarbonFile]): Unit = { + def validatePartitionTableFiles(partitions: Seq[Int], dataFiles: Array[String]): Unit = { val partitionIds: ListBuffer[Int] = new ListBuffer[Int]() dataFiles.foreach { dataFile => - val partitionId = CarbonTablePath.DataFileUtil.getTaskNo(dataFile.getName).split("_")(0).toInt + val partitionId = CarbonTablePath.DataFileUtil.getTaskNo(dataFile).split("_")(0).toInt partitionIds += partitionId assert(partitions.contains(partitionId)) } http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala index 48733dc..1c7cb10 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala @@ -43,9 +43,9 @@ class CarbonGetTableDetailCommandTestCase extends QueryTest with BeforeAndAfterA assertResult(2)(result.length) assertResult("table_info1")(result(0).getString(0)) // 2096 is the size of carbon table - assertResult(2096)(result(0).getLong(1)) + assertResult(2098)(result(0).getLong(1)) assertResult("table_info2")(result(1).getString(0)) - assertResult(2096)(result(1).getLong(1)) + assertResult(2098)(result(1).getLong(1)) } override def afterAll: Unit = { http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java index 3dc34d3..bfa498e 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java +++ b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java @@ -165,6 +165,7 @@ public class DataMapWriterListener { writer.finish(); } } + registry.clear(); } } 
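Returning to the TestAlterPartitionTable change above: the reworked getDataFiles follows the same dual-path lookup used throughout this patch, resolving file names through the segment file when the segment has one and otherwise falling back to listing the legacy Segment_<n> directory. A condensed, commented restatement (illustrative; it mirrors the body of the hunk and assumes carbonTable and segmentId are in scope):

  import scala.collection.JavaConverters._
  import org.apache.hadoop.fs.Path

  val segment = Segment.getSegment(segmentId, carbonTable.getTablePath)
  val dataFiles: Array[String] =
    if (segment.getSegmentFileName != null) {
      // New layout: the segment file knows its index files, and each index
      // file records the data files it covers, so no directory listing.
      val sfs = new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName)
      sfs.readIndexFiles()
      sfs.getIndexFilesMap.asScala.flatMap(_._2.asScala)
        .map(f => new Path(f).getName).toArray
    } else {
      // Legacy layout: list *.carbondata under the Segment_<n> directory.
      val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId)
      FileFactory.getCarbonFile(segmentDir).listFiles(new CarbonFileFilter {
        override def accept(file: CarbonFile): Boolean =
          file.getName.endsWith(".carbondata")
      }).map(_.getName)
    }
  // Both branches yield bare file names, which is why
  // validatePartitionTableFiles now takes Array[String] instead of CarbonFile.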
http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/processing/src/main/java/org/apache/carbondata/processing/loading/AbstractDataLoadProcessorStep.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/AbstractDataLoadProcessorStep.java b/processing/src/main/java/org/apache/carbondata/processing/loading/AbstractDataLoadProcessorStep.java index eb02ede..fcabef5 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/AbstractDataLoadProcessorStep.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/AbstractDataLoadProcessorStep.java @@ -170,7 +170,8 @@ public abstract class AbstractDataLoadProcessorStep { carbonDataFileAttributes.getTaskId(), bucketId, 0, - String.valueOf(carbonDataFileAttributes.getFactTimeStamp()))); + String.valueOf(carbonDataFileAttributes.getFactTimeStamp()), + configuration.getSegmentId())); return listener; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java index bc28ace..5bed8b1 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java @@ -64,7 +64,7 @@ public class TableProcessingOperations { CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() { @Override public boolean accept(CarbonFile path) { String segmentId = - CarbonTablePath.DataFileUtil.getSegmentId(path.getAbsolutePath() + "/dummy"); + CarbonTablePath.DataFileUtil.getSegmentIdFromPath(path.getAbsolutePath() + "/dummy"); boolean found = false; for (int j = 0; j < details.length; j++) { if (details[j].getLoadName().equals(segmentId)) { @@ -76,8 +76,8 @@ public class TableProcessingOperations { } }); for (int k = 0; k < listFiles.length; k++) { - String segmentId = - CarbonTablePath.DataFileUtil.getSegmentId(listFiles[k].getAbsolutePath() + "/dummy"); + String segmentId = CarbonTablePath.DataFileUtil + .getSegmentIdFromPath(listFiles[k].getAbsolutePath() + "/dummy"); if (isCompactionFlow) { if (segmentId.contains(".")) { CarbonLoaderUtil.deleteStorePath(listFiles[k].getAbsolutePath()); http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java index 90c297e..4d3f3fc 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java @@ -24,6 +24,7 @@ import java.util.HashMap; import java.util.List; import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datamap.Segment; import org.apache.carbondata.core.dictionary.service.DictionaryServiceProvider; import 
org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; @@ -83,7 +84,7 @@ public class CarbonLoadModel implements Serializable { /** * load Id */ - private String segmentId; + private Segment segment; private String allDictPath; @@ -424,7 +425,7 @@ public class CarbonLoadModel implements Serializable { copy.blocksID = blocksID; copy.taskNo = taskNo; copy.factTimeStamp = factTimeStamp; - copy.segmentId = segmentId; + copy.segment = segment; copy.serializationNullFormat = serializationNullFormat; copy.badRecordsLoggerEnable = badRecordsLoggerEnable; copy.badRecordsAction = badRecordsAction; @@ -479,7 +480,7 @@ public class CarbonLoadModel implements Serializable { copyObj.blocksID = blocksID; copyObj.taskNo = taskNo; copyObj.factTimeStamp = factTimeStamp; - copyObj.segmentId = segmentId; + copyObj.segment = segment; copyObj.serializationNullFormat = serializationNullFormat; copyObj.badRecordsLoggerEnable = badRecordsLoggerEnable; copyObj.badRecordsAction = badRecordsAction; @@ -609,14 +610,24 @@ public class CarbonLoadModel implements Serializable { * @return load Id */ public String getSegmentId() { - return segmentId; + if (segment != null) { + return segment.getSegmentNo(); + } else { + return null; + } } /** * @param segmentId */ public void setSegmentId(String segmentId) { - this.segmentId = segmentId; + if (segmentId != null) { + this.segment = Segment.toSegment(segmentId); + } + } + + public Segment getSegment() { + return segment; } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java index 7a11c8b..b22599d 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java @@ -17,6 +17,7 @@ package org.apache.carbondata.processing.merger; +import java.io.IOException; import java.util.List; import org.apache.carbondata.core.mutate.CarbonUpdateUtil; @@ -46,10 +47,11 @@ public abstract class AbstractResultProcessor { public abstract void close(); protected void setDataFileAttributesInModel(CarbonLoadModel loadModel, - CompactionType compactionType, CarbonFactDataHandlerModel carbonFactDataHandlerModel) { + CompactionType compactionType, CarbonFactDataHandlerModel carbonFactDataHandlerModel) + throws IOException { CarbonDataFileAttributes carbonDataFileAttributes; if (compactionType == CompactionType.IUD_UPDDEL_DELTA) { - long taskNo = CarbonUpdateUtil.getLatestTaskIdForSegment(loadModel.getSegmentId(), + long taskNo = CarbonUpdateUtil.getLatestTaskIdForSegment(loadModel.getSegment(), loadModel.getTablePath()); // Increase the Task Index as in IUD_UPDDEL_DELTA_COMPACTION the new file will // be written in same segment. So the TaskNo should be incremented by 1 from max val. 
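The CarbonLoadModel change above swaps the raw segmentId string for a Segment object while keeping the old accessors as a compatibility shim; AbstractResultProcessor can then hand the whole Segment to CarbonUpdateUtil.getLatestTaskIdForSegment. A small hedged illustration of why existing callers keep working (CarbonLoadModel is constructed directly here only for demonstration):

  // Illustrative only.
  val model = new CarbonLoadModel()
  model.setSegmentId("2")            // wrapped into a Segment via Segment.toSegment
  assert(model.getSegmentId == "2")  // old accessor still yields the plain load id
  val seg = model.getSegment         // new accessor exposes the Segment object
  // getLatestTaskIdForSegment now takes the Segment rather than the id, so it
  // can reach the index files through the segment file when the segment has one:
  // CarbonUpdateUtil.getLatestTaskIdForSegment(seg, model.getTablePath)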
http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java index fef8ab9..dde18a9 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java @@ -175,7 +175,6 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor { partitionSpec.getLocation().toString(), carbonLoadModel.getFactTimeStamp() + "", partitionSpec.getPartitions()); } catch (IOException e) { - isCompactionSuccess = false; throw e; } } @@ -428,6 +427,7 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor { CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel .getCarbonFactDataHandlerModel(carbonLoadModel, carbonTable, segmentProperties, tableName, tempStoreLocation, carbonStoreLocation); + carbonFactDataHandlerModel.setSegmentId(carbonLoadModel.getSegmentId()); setDataFileAttributesInModel(carbonLoadModel, compactionType, carbonFactDataHandlerModel); dataHandler = CarbonFactHandlerFactory.createCarbonFactHandler(carbonFactDataHandlerModel, CarbonFactHandlerFactory.FactHandlerType.COLUMNAR); http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java index 9a3258e..b877d52 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java @@ -63,7 +63,8 @@ public class RowResultMergerProcessor extends AbstractResultProcessor { public RowResultMergerProcessor(String databaseName, String tableName, SegmentProperties segProp, String[] tempStoreLocation, - CarbonLoadModel loadModel, CompactionType compactionType, PartitionSpec partitionSpec) { + CarbonLoadModel loadModel, CompactionType compactionType, PartitionSpec partitionSpec) + throws IOException { this.segprop = segProp; this.partitionSpec = partitionSpec; this.loadModel = loadModel; @@ -84,6 +85,7 @@ public class RowResultMergerProcessor extends AbstractResultProcessor { tempStoreLocation, carbonStoreLocation); setDataFileAttributesInModel(loadModel, compactionType, carbonFactDataHandlerModel); carbonFactDataHandlerModel.setCompactionFlow(true); + carbonFactDataHandlerModel.setSegmentId(loadModel.getSegmentId()); dataHandler = new CarbonFactDataHandlerColumnar(carbonFactDataHandlerModel); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java 
b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java index 221697f..9b09269 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java @@ -59,6 +59,7 @@ public class RowResultProcessor { carbonFactDataHandlerModel.setBucketId(bucketId); //Note: set compaction flow just to convert decimal type carbonFactDataHandlerModel.setCompactionFlow(true); + carbonFactDataHandlerModel.setSegmentId(loadModel.getSegmentId()); dataHandler = new CarbonFactDataHandlerColumnar(carbonFactDataHandlerModel); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java index 87a6de0..27249ab 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java @@ -267,7 +267,8 @@ public class CarbonFactDataHandlerModel { carbonDataFileAttributes.getTaskId(), bucketId, 0, - String.valueOf(carbonDataFileAttributes.getFactTimeStamp()))); + String.valueOf(carbonDataFileAttributes.getFactTimeStamp()), + configuration.getSegmentId())); } carbonFactDataHandlerModel.dataMapWriterlistener = listener; carbonFactDataHandlerModel.writingCoresCount = configuration.getWritingCoresCount(); @@ -337,7 +338,8 @@ public class CarbonFactDataHandlerModel { CarbonTablePath.DataFileUtil.getTaskIdFromTaskNo(loadModel.getTaskNo()), carbonFactDataHandlerModel.getBucketId(), carbonFactDataHandlerModel.getTaskExtension(), - String.valueOf(loadModel.getFactTimeStamp()))); + String.valueOf(loadModel.getFactTimeStamp()), + loadModel.getSegmentId())); carbonFactDataHandlerModel.dataMapWriterlistener = listener; return carbonFactDataHandlerModel;
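Finally, the CarbonFactDataHandlerModel hunks above thread configuration.getSegmentId() (and loadModel.getSegmentId()) into the data file name. A closing, hedged note on why: with the flat_folder layout touched by this patch there is no Segment_<n> directory from which to infer a file's segment, so the id has to be recoverable from the file name itself; that is also what the renamed helper in TableProcessingOperations relies on. Usage, mirroring the call in that hunk (the '/dummy' suffix makes a directory path parse like a file path):

  // Illustrative only; assumes carbonFile points at a segment-level directory.
  val segId = CarbonTablePath.DataFileUtil.getSegmentIdFromPath(
    carbonFile.getAbsolutePath + "/dummy")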