http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala index 5eb274d..86fd240 100644 --- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala +++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala @@ -18,18 +18,6 @@ package org.apache.carbondata.cluster.sdv.generated -import org.apache.spark.sql.CarbonEnv -import org.apache.spark.sql.common.util._ -import org.scalatest.BeforeAndAfterAll - -import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter} -import org.apache.carbondata.core.datastore.impl.FileFactory -import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore -import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata} -import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.core.util.path.CarbonTablePath - /** * Test Class for AlterTableTestCase to verify all scenerios */ @@ -52,7 +40,7 @@ class MergeIndexTestCase extends QueryTest with BeforeAndAfterAll { sql(s"""create table carbon_automation_nonmerge (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active_phoneP ADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY int, Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string,gamePointId double,contractNumber double) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES ('DICTIONARY_INCLUDE'='deviceInformationId,Latest_YEAR,Latest_MONTH,Latest_DAY')""").collect sql(s"""LOAD DATA INPATH '$resourcesPath/Data/VmaLL100' INTO TABLE carbon_automation_nonmerge 
OPTIONS('DELIMITER'=',','QUOTECHAR'='"','FILEHEADER'='imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSy sVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription')""").collect - assert(getIndexFileCount("default", "carbon_automation_nonmerge", "0") == 2) + assert(getIndexFileCount("default", "carbon_automation_nonmerge", "0") <= 2) CarbonProperties.getInstance() .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true") sql("DROP TABLE IF EXISTS carbon_automation_merge") @@ -75,8 +63,8 @@ class MergeIndexTestCase extends QueryTest with BeforeAndAfterAll { sql(s"""LOAD DATA INPATH '$resourcesPath/Data/VmaLL100' INTO TABLE carbon_automation_nonmerge OPTIONS('DELIMITER'=',','QUOTECHAR'='"','FILEHEADER'='imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSy sVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription')""").collect sql(s"""LOAD DATA INPATH '$resourcesPath/Data/VmaLL100' INTO TABLE carbon_automation_nonmerge 
OPTIONS('DELIMITER'=',','QUOTECHAR'='"','FILEHEADER'='imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSy sVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription')""").collect val rows = sql("""Select count(*) from carbon_automation_nonmerge""").collect() - assert(getIndexFileCount("default", "carbon_automation_nonmerge", "0") == 2) - assert(getIndexFileCount("default", "carbon_automation_nonmerge", "1") == 2) + assert(getIndexFileCount("default", "carbon_automation_nonmerge", "0") <= 2) + assert(getIndexFileCount("default", "carbon_automation_nonmerge", "1") <= 2) CarbonProperties.getInstance() .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true") sql("ALTER TABLE carbon_automation_nonmerge COMPACT 'SEGMENT_INDEX'").collect()
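
Note on the relaxed assertions above: once CARBON_MERGE_INDEX_IN_SEGMENT is in play, a segment may hold a single merged index file instead of one .carbonindex file per load task, so the counts were loosened from == 2 to <= 2. As a hedged illustration only — the actual getIndexFileCount helper in MergeIndexTestCase is not shown in this hunk, and both the path resolution and the ".carbonindex" suffix are assumptions — a helper of that kind could be sketched in Scala as:

import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.util.path.CarbonTablePath

// Count the loose .carbonindex files inside one segment directory.
// tablePath/segmentId resolution and the ".carbonindex" suffix are assumptions here.
def countIndexFiles(tablePath: String, segmentId: String): Int = {
  val segmentDir = FileFactory.getCarbonFile(CarbonTablePath.getSegmentPath(tablePath, segmentId))
  segmentDir.listFiles(new CarbonFileFilter {
    override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".carbonindex")
  }).length
}

After ALTER TABLE ... COMPACT 'SEGMENT_INDEX', such a count can legitimately fall below the number of loads, which is presumably why the equality checks no longer hold.
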
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala index b3bf93d..0d33797 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.hive.CarbonRelation import org.apache.spark.sql.test.util.QueryTest import org.scalatest.BeforeAndAfterAll -import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException +import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, MalformedDataMapCommandException} import org.apache.carbondata.core.metadata.encoder.Encoding import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider.TIMESERIES @@ -272,7 +272,7 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll { test("test pre agg create table 22: using invalid datamap provider") { sql("DROP DATAMAP IF EXISTS agg0 ON TABLE maintable") - val e: Exception = intercept[Exception] { + val e = intercept[MalformedDataMapCommandException] { sql( """ | CREATE DATAMAP agg0 ON TABLE mainTable @@ -282,8 +282,7 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll { | GROUP BY column3,column5,column2 """.stripMargin) } - assert(e.getMessage.contains( - s"Unknown data map type abc")) + assert(e.getMessage.contains("DataMap class 'abc' not found")) sql("DROP DATAMAP IF EXISTS agg0 ON TABLE maintable") } http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala index 97aa056..49cabea 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala @@ -200,7 +200,7 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll { | GROUP BY dataTime """.stripMargin) } - assert(e.getMessage.equals("Unknown data map type abc")) + assert(e.getMessage.equals("DataMap class 'abc' not found")) } test("test timeseries create table: USING and catch MalformedCarbonCommandException") { @@ -215,10 +215,11 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll { | GROUP BY dataTime """.stripMargin) } - assert(e.getMessage.equals("Unknown 
data map type abc")) + assert(e.getMessage.equals("DataMap class 'abc' not found")) } test("test timeseries create table: Only one granularity level can be defined 1") { + sql("drop datamap if exists agg0_second on table mainTable") val e: Exception = intercept[MalformedCarbonCommandException] { sql( s""" @@ -235,6 +236,7 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll { | GROUP BY dataTime """.stripMargin) } + e.printStackTrace() assert(e.getMessage.equals("Only one granularity level can be defined")) } http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortBigFileTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortBigFileTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortBigFileTest.scala index c522c1e..e31896f 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortBigFileTest.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortBigFileTest.scala @@ -113,7 +113,7 @@ object CompactionSupportGlobalSortBigFileTest { try { val write = new PrintWriter(fileName); for (i <- start until (start + line)) { - write.println(i + "," + "n" + i + "," + "c" + Random.nextInt(line) + "," + Random.nextInt(80)) + write.println(i + "," + "n" + i + "," + "c" + (i % 10000) + "," + Random.nextInt(80)) } write.close() } catch { http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala index 6f03493..47ef192 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala @@ -86,18 +86,18 @@ class TestLoadDataFrame extends QueryTest with BeforeAndAfterAll { buildTestData } -test("test the boolean data type"){ - booldf.write - .format("carbondata") - .option("tableName", "carbon10") - .option("tempCSV", "true") - .option("compress", "true") - .mode(SaveMode.Overwrite) - .save() - checkAnswer( - sql("SELECT * FROM CARBON10"), - Seq(Row("anubhav", true), Row("prince", false))) -} + test("test the boolean data type"){ + booldf.write + .format("carbondata") + .option("tableName", "carbon0") + .option("tempCSV", "true") + .option("compress", "true") + .mode(SaveMode.Overwrite) + .save() + checkAnswer( + sql("SELECT * FROM CARBON0"), + Seq(Row("anubhav", true), Row("prince", false))) + } test("test load dataframe with saving compressed csv files") { // save dataframe to carbon file 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala index 4b6f231..d4c49d2 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala @@ -36,6 +36,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.datastore.page.ColumnPage import org.apache.carbondata.core.indexstore.Blocklet import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable +import org.apache.carbondata.core.metadata.schema.table.DataMapSchema import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata} import org.apache.carbondata.core.scan.expression.Expression import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression @@ -48,30 +49,28 @@ import org.apache.carbondata.spark.testsuite.datacompaction.CompactionSupportGlo class CGDataMapFactory extends AbstractCoarseGrainDataMapFactory { var identifier: AbsoluteTableIdentifier = _ - var dataMapName: String = _ + var dataMapSchema: DataMapSchema = _ /** * Initialization of Datamap factory with the identifier and datamap name */ - override def init(identifier: AbsoluteTableIdentifier, - dataMapName: String): Unit = { + override def init(identifier: AbsoluteTableIdentifier, dataMapSchema: DataMapSchema): Unit = { this.identifier = identifier - this.dataMapName = dataMapName + this.dataMapSchema = dataMapSchema } /** * Return a new write for this datamap */ override def createWriter(segmentId: String, dataWritePath: String): AbstractDataMapWriter = { - new CGDataMapWriter(identifier, segmentId, dataWritePath, dataMapName) + new CGDataMapWriter(identifier, segmentId, dataWritePath, dataMapSchema) } /** * Get the datamap for segmentid */ - override def getDataMaps(segmentId: String): java.util.List[AbstractCoarseGrainDataMap] = { - val file = FileFactory.getCarbonFile( - CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId)) + override def getDataMaps(segmentId: String) = { + val file = FileFactory.getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId)) val files = file.listFiles(new CarbonFileFilter { override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap") @@ -108,9 +107,8 @@ class CGDataMapFactory extends AbstractCoarseGrainDataMapFactory { * * @return */ - override def toDistributable(segmentId: String): java.util.List[DataMapDistributable] = { - val file = FileFactory.getCarbonFile( - CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId)) + override def toDistributable(segmentId: String) = { + val file = FileFactory.getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId)) val files = file.listFiles(new CarbonFileFilter { override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap") @@ -140,7 +138,8 @@ class CGDataMapFactory extends AbstractCoarseGrainDataMapFactory { * Return metadata of this datamap */ override def 
getMeta: DataMapMeta = { - new DataMapMeta(Seq("name").toList.asJava, new ArrayBuffer[ExpressionType]().toList.asJava) + new DataMapMeta(dataMapSchema.getProperties.get("indexcolumns").split(",").toList.asJava, + List(ExpressionType.EQUALS, ExpressionType.IN).asJava) } } @@ -198,12 +197,16 @@ class CGDataMap extends AbstractCoarseGrainDataMap { } private def getEqualToExpression(expression: Expression, buffer: ArrayBuffer[Expression]): Unit = { - if (expression.getChildren != null) { - expression.getChildren.asScala.map { f => - if (f.isInstanceOf[EqualToExpression]) { - buffer += f + if (expression.isInstanceOf[EqualToExpression]) { + buffer += expression + } else { + if (expression.getChildren != null) { + expression.getChildren.asScala.map { f => + if (f.isInstanceOf[EqualToExpression]) { + buffer += f + } + getEqualToExpression(f, buffer) } - getEqualToExpression(f, buffer) } } } @@ -221,12 +224,12 @@ class CGDataMap extends AbstractCoarseGrainDataMap { class CGDataMapWriter(identifier: AbsoluteTableIdentifier, segmentId: String, dataWritePath: String, - dataMapName: String) + dataMapSchema: DataMapSchema) extends AbstractDataMapWriter(identifier, segmentId, dataWritePath) { var currentBlockId: String = null val cgwritepath = dataWritePath + "/" + - dataMapName + System.nanoTime() + ".datamap" + dataMapSchema.getDataMapName + System.nanoTime() + ".datamap" lazy val stream: DataOutputStream = FileFactory .getDataOutputStream(cgwritepath, FileFactory.getFileType(cgwritepath)) val blockletList = new ArrayBuffer[Array[Byte]]() @@ -345,14 +348,29 @@ class CGDataMapTestCase extends QueryTest with BeforeAndAfterAll { """.stripMargin) val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test_cg") // register datamap writer - DataMapStoreManager.getInstance().createAndRegisterDataMap( - table.getAbsoluteTableIdentifier, - classOf[CGDataMapFactory].getName, "cgdatamap") + sql(s"create datamap cgdatamap on table datamap_test_cg using '${classOf[CGDataMapFactory].getName}' DMPROPERTIES('indexcolumns'='name')") sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test_cg OPTIONS('header'='false')") checkAnswer(sql("select * from datamap_test_cg where name='n502670'"), sql("select * from normal_test where name='n502670'")) } + test("test cg datamap with 2 datamaps ") { + sql("DROP TABLE IF EXISTS datamap_test") + sql( + """ + | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT) + | STORED BY 'org.apache.carbondata.format' + | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT') + """.stripMargin) + val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test") + // register datamap writer + sql(s"create datamap ggdatamap1 on table datamap_test using '${classOf[CGDataMapFactory].getName}' DMPROPERTIES('indexcolumns'='name')") + sql(s"create datamap ggdatamap2 on table datamap_test using '${classOf[CGDataMapFactory].getName}' DMPROPERTIES('indexcolumns'='city')") + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')") + checkAnswer(sql("select * from datamap_test where name='n502670' and city='c2670'"), + sql("select * from normal_test where name='n502670' and city='c2670'")) + } + override protected def afterAll(): Unit = { CompactionSupportGlobalSortBigFileTest.deleteFile(file2) sql("DROP TABLE IF EXISTS normal_test") 
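
The CG datamap changes above replace the programmatic DataMapStoreManager.createAndRegisterDataMap registration with the CREATE DATAMAP ... USING '<factory class>' DMPROPERTIES('indexcolumns'='...') DDL, and the factory now receives a DataMapSchema instead of a plain name. A hedged sketch (the class below is illustrative, not the test's CGDataMapFactory) of how a factory written against this API might pick up the indexed columns:

import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.schema.table.DataMapSchema

class IndexColumnAwareFactoryExample {
  private var identifier: AbsoluteTableIdentifier = _
  private var indexedColumns: Seq[String] = Seq.empty

  // Mirrors the new init(identifier, dataMapSchema) signature introduced by the patch.
  def init(identifier: AbsoluteTableIdentifier, dataMapSchema: DataMapSchema): Unit = {
    this.identifier = identifier
    // DMPROPERTIES('indexcolumns'='name') arrives through the schema's property map.
    indexedColumns = dataMapSchema.getProperties.get("indexcolumns").split(",").toSeq
  }
}

This is also why getMeta in the patch builds DataMapMeta from dataMapSchema.getProperties.get("indexcolumns") rather than the previously hard-coded Seq("name").
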
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala index f694a6b..903610a 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala @@ -31,6 +31,7 @@ import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, Da import org.apache.carbondata.core.datastore.page.ColumnPage import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.core.metadata.datatype.DataTypes +import org.apache.carbondata.core.metadata.schema.table.DataMapSchema import org.apache.carbondata.core.scan.filter.intf.ExpressionType import org.apache.carbondata.core.metadata.datatype.DataTypes import org.apache.carbondata.core.util.CarbonProperties @@ -41,7 +42,7 @@ class C2DataMapFactory() extends AbstractCoarseGrainDataMapFactory { var identifier: AbsoluteTableIdentifier = _ override def init(identifier: AbsoluteTableIdentifier, - dataMapName: String): Unit = { + dataMapSchema: DataMapSchema): Unit = { this.identifier = identifier } @@ -89,12 +90,9 @@ class DataMapWriterSuite extends QueryTest with BeforeAndAfterAll { } test("test write datamap 2 pages") { + sql(s"CREATE TABLE carbon1(c1 STRING, c2 STRING, c3 INT) STORED BY 'org.apache.carbondata.format'") // register datamap writer - DataMapStoreManager.getInstance().createAndRegisterDataMap( - AbsoluteTableIdentifier.from(storeLocation + "/carbon1", "default", "carbon1"), - classOf[C2DataMapFactory].getName, - "test") - + sql(s"CREATE DATAMAP test ON TABLE carbon1 USING '${classOf[C2DataMapFactory].getName}'") val df = buildTestData(33000) // save dataframe to carbon file @@ -119,11 +117,8 @@ class DataMapWriterSuite extends QueryTest with BeforeAndAfterAll { } test("test write datamap 2 blocklet") { - // register datamap writer - DataMapStoreManager.getInstance().createAndRegisterDataMap( - AbsoluteTableIdentifier.from(storeLocation + "/carbon2", "default", "carbon2"), - classOf[C2DataMapFactory].getName, - "test") + sql(s"CREATE TABLE carbon2(c1 STRING, c2 STRING, c3 INT) STORED BY 'org.apache.carbondata.format'") + sql(s"CREATE DATAMAP test ON TABLE carbon2 USING '${classOf[C2DataMapFactory].getName}'") CarbonProperties.getInstance() .addProperty("carbon.blockletgroup.size.in.mb", "1") http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala index d1bb65f..8031dc2 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala +++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala @@ -36,6 +36,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.datastore.page.ColumnPage import org.apache.carbondata.core.indexstore.FineGrainBlocklet import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable +import org.apache.carbondata.core.metadata.schema.table.DataMapSchema import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata} import org.apache.carbondata.core.scan.expression.Expression import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression @@ -48,22 +49,21 @@ import org.apache.carbondata.spark.testsuite.datacompaction.CompactionSupportGlo class FGDataMapFactory extends AbstractFineGrainDataMapFactory { var identifier: AbsoluteTableIdentifier = _ - var dataMapName: String = _ + var dataMapSchema: DataMapSchema = _ /** * Initialization of Datamap factory with the identifier and datamap name */ - override def init(identifier: AbsoluteTableIdentifier, - dataMapName: String): Unit = { + override def init(identifier: AbsoluteTableIdentifier, dataMapSchema: DataMapSchema): Unit = { this.identifier = identifier - this.dataMapName = dataMapName + this.dataMapSchema = dataMapSchema } /** * Return a new write for this datamap */ override def createWriter(segmentId: String, dataWritePath: String): AbstractDataMapWriter = { - new FGDataMapWriter(identifier, segmentId, dataWritePath, dataMapName) + new FGDataMapWriter(identifier, segmentId, dataWritePath, dataMapSchema) } /** @@ -137,7 +137,8 @@ class FGDataMapFactory extends AbstractFineGrainDataMapFactory { * Return metadata of this datamap */ override def getMeta: DataMapMeta = { - new DataMapMeta(Seq("name").toList.asJava, new ArrayBuffer[ExpressionType]().toList.asJava) + new DataMapMeta(dataMapSchema.getProperties.get("indexcolumns").split(",").toList.asJava, + List(ExpressionType.EQUALS, ExpressionType.IN).asJava) } } @@ -228,12 +229,16 @@ class FGDataMap extends AbstractFineGrainDataMap { } def getEqualToExpression(expression: Expression, buffer: ArrayBuffer[Expression]): Unit = { - if (expression.getChildren != null) { - expression.getChildren.asScala.map { f => - if (f.isInstanceOf[EqualToExpression]) { - buffer += f + if (expression.isInstanceOf[EqualToExpression]) { + buffer += expression + } else { + if (expression.getChildren != null) { + expression.getChildren.asScala.map { f => + if (f.isInstanceOf[EqualToExpression]) { + buffer += f + } + getEqualToExpression(f, buffer) } - getEqualToExpression(f, buffer) } } } @@ -249,11 +254,12 @@ class FGDataMap extends AbstractFineGrainDataMap { } class FGDataMapWriter(identifier: AbsoluteTableIdentifier, - segmentId: String, dataWriterPath: String, dataMapName: String) + segmentId: String, dataWriterPath: String, dataMapSchema: DataMapSchema) extends AbstractDataMapWriter(identifier, segmentId, dataWriterPath) { var currentBlockId: String = null - val fgwritepath = dataWriterPath + "/" + System.nanoTime() + ".datamap" + val fgwritepath = dataWriterPath + "/" + dataMapSchema.getDataMapName + System.nanoTime() + + ".datamap" val stream: DataOutputStream = FileFactory .getDataOutputStream(fgwritepath, FileFactory.getFileType(fgwritepath)) val blockletList = new ArrayBuffer[(Array[Byte], Seq[Int], Seq[Int])]() @@ -424,14 +430,44 @@ class FGDataMapTestCase extends QueryTest with BeforeAndAfterAll { """.stripMargin) val table = 
CarbonMetadata.getInstance().getCarbonTable("default_datamap_test") // register datamap writer - DataMapStoreManager.getInstance().createAndRegisterDataMap( - table.getAbsoluteTableIdentifier, - classOf[FGDataMapFactory].getName, "fgdatamap") + sql( + s""" + | CREATE DATAMAP ggdatamap ON TABLE datamap_test + | USING '${classOf[FGDataMapFactory].getName}' + | DMPROPERTIES('indexcolumns'='name') + """.stripMargin) sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')") checkAnswer(sql("select * from datamap_test where name='n502670'"), sql("select * from normal_test where name='n502670'")) } + test("test fg datamap with 2 datamaps ") { + sql("DROP TABLE IF EXISTS datamap_test") + sql( + """ + | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT) + | STORED BY 'org.apache.carbondata.format' + | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT') + """.stripMargin) + val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test") + // register datamap writer + sql( + s""" + | CREATE DATAMAP ggdatamap1 ON TABLE datamap_test + | USING '${classOf[FGDataMapFactory].getName}' + | DMPROPERTIES('indexcolumns'='name') + """.stripMargin) + sql( + s""" + | CREATE DATAMAP ggdatamap2 ON TABLE datamap_test + | USING '${classOf[FGDataMapFactory].getName}' + | DMPROPERTIES('indexcolumns'='city') + """.stripMargin) + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')") + checkAnswer(sql("select * from datamap_test where name='n502670' and city='c2670'"), + sql("select * from normal_test where name='n502670' and city='c2670'")) + } + override protected def afterAll(): Unit = { CompactionSupportGlobalSortBigFileTest.deleteFile(file2) sql("DROP TABLE IF EXISTS normal_test") http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala index 37007ed..b2ab977 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala @@ -23,7 +23,8 @@ import org.apache.spark.sql.Row import org.apache.spark.sql.test.util.QueryTest import org.scalatest.BeforeAndAfterAll -import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException +import org.apache.carbondata.common.exceptions.MetadataProcessException +import org.apache.carbondata.common.exceptions.sql.{MalformedDataMapCommandException, NoSuchDataMapException} import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.metadata.CarbonMetadata import org.apache.carbondata.core.util.CarbonProperties @@ -42,39 +43,22 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll { val newClass = "org.apache.spark.sql.CarbonSource" - test("test datamap create: don't support using class, only support short name") { - intercept[MalformedDataMapCommandException] { + test("test datamap create: don't support using non-exist class") { + intercept[MetadataProcessException] { sql(s"CREATE 
DATAMAP datamap1 ON TABLE datamaptest USING '$newClass'") - val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest") - assert(table != null) - val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList - assert(dataMapSchemaList.size() == 1) - assert(dataMapSchemaList.get(0).getDataMapName.equals("datamap1")) - assert(dataMapSchemaList.get(0).getClassName.equals(newClass)) } } - test("test datamap create with dmproperties: don't support using class") { - intercept[MalformedDataMapCommandException] { + test("test datamap create with dmproperties: don't support using non-exist class") { + intercept[MetadataProcessException] { sql(s"CREATE DATAMAP datamap2 ON TABLE datamaptest USING '$newClass' DMPROPERTIES('key'='value')") - val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest") - assert(table != null) - val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList - assert(dataMapSchemaList.size() == 2) - assert(dataMapSchemaList.get(1).getDataMapName.equals("datamap2")) - assert(dataMapSchemaList.get(1).getClassName.equals(newClass)) - assert(dataMapSchemaList.get(1).getProperties.get("key").equals("value")) } } - test("test datamap create with existing name: don't support using class") { - intercept[MalformedDataMapCommandException] { + test("test datamap create with existing name: don't support using non-exist class") { + intercept[MetadataProcessException] { sql( s"CREATE DATAMAP datamap2 ON TABLE datamaptest USING '$newClass' DMPROPERTIES('key'='value')") - val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest") - assert(table != null) - val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList - assert(dataMapSchemaList.size() == 2) } } @@ -106,8 +90,7 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll { sql("drop datamap datamap_hiveMetaStoreTable on table hiveMetaStoreTable") checkExistence(sql("show datamap on table hiveMetaStoreTable"), false, "datamap_hiveMetaStoreTable") - } - finally { + } finally { sql("drop table hiveMetaStoreTable") CarbonProperties.getInstance() .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE, @@ -133,8 +116,7 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll { sql("drop table hiveMetaStoreTable_1") checkExistence(sql("show tables"), false, "datamap_hiveMetaStoreTable_1") - } - finally { + } finally { CarbonProperties.getInstance() .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE, CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT) @@ -142,17 +124,17 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll { } test("test datamap create with preagg with duplicate name") { - intercept[Exception] { - sql( - s""" - | CREATE DATAMAP datamap2 ON TABLE datamaptest - | USING 'preaggregate' - | DMPROPERTIES('key'='value') - | AS SELECT COUNT(a) FROM datamaptest + sql( + s""" + | CREATE DATAMAP datamap10 ON TABLE datamaptest + | USING 'preaggregate' + | DMPROPERTIES('key'='value') + | AS SELECT COUNT(a) FROM datamaptest """.stripMargin) + intercept[MalformedDataMapCommandException] { sql( s""" - | CREATE DATAMAP datamap2 ON TABLE datamaptest + | CREATE DATAMAP datamap10 ON TABLE datamaptest | USING 'preaggregate' | DMPROPERTIES('key'='value') | AS SELECT COUNT(a) FROM datamaptest @@ -164,10 +146,9 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll { assert(dataMapSchemaList.size() == 2) } - test("test datamap drop with preagg") { - intercept[Exception] { - sql("drop 
table datamap3") - + test("test drop non-exist datamap") { + intercept[NoSuchDataMapException] { + sql("drop datamap nonexist on table datamaptest") } val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest") assert(table != null) @@ -175,8 +156,8 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll { assert(dataMapSchemaList.size() == 2) } - test("test show datamap without preaggregate: don't support using class") { - intercept[MalformedDataMapCommandException] { + test("test show datamap without preaggregate: don't support using non-exist class") { + intercept[MetadataProcessException] { sql("drop table if exists datamapshowtest") sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'") sql(s"CREATE DATAMAP datamap1 ON TABLE datamapshowtest USING '$newClass' DMPROPERTIES('key'='value')") @@ -185,8 +166,8 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll { } } - test("test show datamap with preaggregate: don't support using class") { - intercept[MalformedDataMapCommandException] { + test("test show datamap with preaggregate: don't support using non-exist class") { + intercept[MetadataProcessException] { sql("drop table if exists datamapshowtest") sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'") sql("create datamap datamap1 on table datamapshowtest using 'preaggregate' as select count(a) from datamapshowtest") @@ -203,8 +184,8 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll { assert(sql("show datamap on table datamapshowtest").collect().length == 0) } - test("test show datamap after dropping datamap: don't support using class") { - intercept[MalformedDataMapCommandException] { + test("test show datamap after dropping datamap: don't support using non-exist class") { + intercept[MetadataProcessException] { sql("drop table if exists datamapshowtest") sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'") sql("create datamap datamap1 on table datamapshowtest using 'preaggregate' as select count(a) from datamapshowtest") http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/InsertOverwriteConcurrentTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/InsertOverwriteConcurrentTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/InsertOverwriteConcurrentTest.scala index c200b1b..84b59e6 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/InsertOverwriteConcurrentTest.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/InsertOverwriteConcurrentTest.scala @@ -33,6 +33,7 @@ import org.apache.carbondata.core.datamap.dev.{AbstractDataMapWriter, DataMap} import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager} import org.apache.carbondata.core.datastore.page.ColumnPage import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier +import org.apache.carbondata.core.metadata.schema.table.DataMapSchema import org.apache.carbondata.core.scan.filter.intf.ExpressionType import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.events.Event @@ -46,10 +47,7 @@ class 
InsertOverwriteConcurrentTest extends QueryTest with BeforeAndAfterAll wit buildTestData() // register hook to the table to sleep, thus the other command will be executed - DataMapStoreManager.getInstance().createAndRegisterDataMap( - AbsoluteTableIdentifier.from(storeLocation + "/orders", "default", "orders"), - classOf[WaitingDataMap].getName, - "test") + sql(s"create datamap test on table orders using '${classOf[WaitingDataMap].getName}' as select count(a) from hiveMetaStoreTable_1") } private def buildTestData(): Unit = { @@ -166,7 +164,7 @@ object Global { class WaitingDataMap() extends AbstractCoarseGrainDataMapFactory { - override def init(identifier: AbsoluteTableIdentifier, dataMapName: String): Unit = { } + override def init(identifier: AbsoluteTableIdentifier, dataMapSchema: DataMapSchema): Unit = { } override def fireEvent(event: Event): Unit = ??? http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala index 60052f0..8d2f9ee 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala @@ -37,8 +37,8 @@ import org.apache.carbondata.hadoop.api.{DataMapJob, DistributableDataMapFormat} class SparkDataMapJob extends DataMapJob { override def execute(dataMapFormat: DistributableDataMapFormat, - resolverIntf: FilterResolverIntf): util.List[ExtendedBlocklet] = { - new DataMapPruneRDD(SparkContext.getOrCreate(), dataMapFormat, resolverIntf).collect().toList + filter: FilterResolverIntf): util.List[ExtendedBlocklet] = { + new DataMapPruneRDD(SparkContext.getOrCreate(), dataMapFormat, filter).collect().toList .asJava } } @@ -53,7 +53,6 @@ class DataMapRDDPartition(rddId: Int, idx: Int, val inputSplit: InputSplit) exte * RDD to prune the datamaps across spark cluster * @param sc * @param dataMapFormat - * @param resolverIntf */ class DataMapPruneRDD(sc: SparkContext, dataMapFormat: DistributableDataMapFormat, @@ -70,7 +69,6 @@ class DataMapPruneRDD(sc: SparkContext, val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0) val attemptContext = new TaskAttemptContextImpl(new Configuration(), attemptId) val inputSplit = split.asInstanceOf[DataMapRDDPartition].inputSplit - DistributableDataMapFormat.setFilterExp(attemptContext.getConfiguration, resolverIntf) val reader = dataMapFormat.createRecordReader(inputSplit, attemptContext) reader.initialize(inputSplit, attemptContext) val iter = new Iterator[ExtendedBlocklet] { http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala index 5dcca6d..ec26c34 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala @@ -95,8 +95,12 @@ class CarbonSource extends CreatableRelationProvider with 
RelationProvider CarbonException.analysisException(s"table path already exists.") case (SaveMode.Overwrite, true) => val dbName = CarbonEnv.getDatabaseName(options.dbName)(sqlContext.sparkSession) - sqlContext.sparkSession - .sql(s"DROP TABLE IF EXISTS $dbName.${options.tableName}") + // In order to overwrite, delete all segments in the table + sqlContext.sparkSession.sql( + s""" + | DELETE FROM TABLE $dbName.${options.tableName} + | WHERE SEGMENT.STARTTIME BEFORE '2099-06-01 01:00:00' + """.stripMargin) (true, false) case (SaveMode.Overwrite, false) | (SaveMode.ErrorIfExists, false) => (true, false) http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala index d536746..1de66c1 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala @@ -32,7 +32,9 @@ import org.apache.carbondata.core.scan.filter.intf.{ExpressionType, RowIntf} import org.apache.carbondata.spark.util.CarbonScalaUtil -class SparkUnknownExpression(var sparkExp: SparkExpression) +class SparkUnknownExpression( + var sparkExp: SparkExpression, + expressionType: ExpressionType = ExpressionType.UNKNOWN) extends UnknownExpression with ConditionalExpression { private var evaluateExpression: (InternalRow) => Any = sparkExp.eval @@ -64,7 +66,7 @@ class SparkUnknownExpression(var sparkExp: SparkExpression) } override def getFilterExpressionType: ExpressionType = { - ExpressionType.UNKNOWN + expressionType } override def getString: String = { http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala index e93ab25..3d3f83b 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala @@ -16,15 +16,21 @@ */ package org.apache.spark.sql.execution.command.datamap +import scala.collection.JavaConverters._ + import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.command._ -import org.apache.spark.sql.execution.command.preaaggregate.CreatePreAggregateTableCommand +import org.apache.spark.sql.execution.command.preaaggregate.{CreatePreAggregateTableCommand, PreAggregateUtil} import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil +import org.apache.spark.sql.hive.CarbonRelation +import org.apache.carbondata.common.exceptions.MetadataProcessException import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, MalformedDataMapCommandException} import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.datamap.DataMapStoreManager 
import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider._ +import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema} /** * Below command class will be used to create datamap on table @@ -48,7 +54,7 @@ case class CarbonCreateDataMapCommand( if (carbonTable.isStreamingTable) { throw new MalformedCarbonCommandException("Streaming table does not support creating datamap") } - val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + validateDataMapName(carbonTable) if (dmClassName.equalsIgnoreCase(PREAGGREGATE.toString) || dmClassName.equalsIgnoreCase(TIMESERIES.toString)) { @@ -72,20 +78,44 @@ case class CarbonCreateDataMapCommand( queryString.get ) } - createPreAggregateTableCommands.processMetadata(sparkSession) + try { + createPreAggregateTableCommands.processMetadata(sparkSession) + } catch { + case e: Throwable => throw new MetadataProcessException(s"Failed to create datamap " + + s"'$dataMapName'", e) + } } else { - throw new MalformedDataMapCommandException("Unknown data map type " + dmClassName) + val dataMapSchema = new DataMapSchema(dataMapName, dmClassName) + dataMapSchema.setProperties(new java.util.HashMap[String, String](dmproperties.asJava)) + val dbName = CarbonEnv.getDatabaseName(tableIdentifier.database)(sparkSession) + val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.lookupRelation( + Some(dbName), + tableIdentifier.table)(sparkSession).asInstanceOf[CarbonRelation].carbonTable + DataMapStoreManager.getInstance().createAndRegisterDataMap( + carbonTable.getAbsoluteTableIdentifier, dataMapSchema) + // Save DataMapSchema in the schema file of main table + PreAggregateUtil.updateMainTable(carbonTable, dataMapSchema, sparkSession) } + val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) LOGGER.audit(s"DataMap $dataMapName successfully added to Table ${tableIdentifier.table}") Seq.empty } + private def validateDataMapName(carbonTable: CarbonTable) = { + val existingDataMaps = carbonTable.getTableInfo.getDataMapSchemaList + existingDataMaps.asScala.foreach { dataMapSchema => + if (dataMapSchema.getDataMapName.equalsIgnoreCase(dataMapName)) { + throw new MalformedDataMapCommandException(s"DataMap name '$dataMapName' already exist") + } + } + } + override def processData(sparkSession: SparkSession): Seq[Row] = { if (dmClassName.equalsIgnoreCase(PREAGGREGATE.toString) || dmClassName.equalsIgnoreCase(TIMESERIES.toString)) { createPreAggregateTableCommands.processData(sparkSession) } else { - throw new MalformedDataMapCommandException("Unknown data map type " + dmClassName) + Seq.empty } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala index e5db286..f410f52 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.execution.command.AtomicRunnableCommand import 
org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand +import org.apache.carbondata.common.exceptions.MetadataProcessException import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, NoSuchDataMapException} import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} import org.apache.carbondata.core.datamap.DataMapStoreManager @@ -72,7 +73,7 @@ case class CarbonDropDataMapCommand( Some(CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)) } catch { case ex: NoSuchTableException => - throw ex + throw new MetadataProcessException(s"Dropping datamap $dataMapName failed", ex) } if (carbonTable.isDefined && carbonTable.get.getTableInfo.getDataMapSchemaList.size() > 0) { val dataMapSchema = carbonTable.get.getTableInfo.getDataMapSchemaList.asScala.zipWithIndex. http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala index bf72325..6d4822b 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil import org.apache.spark.sql.parser.CarbonSpark2SqlParser +import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.metadata.schema.table.AggregationDataMapSchema import org.apache.carbondata.core.metadata.schema.table.CarbonTable @@ -64,8 +65,10 @@ case class CreatePreAggregateTableCommand( dmProperties.foreach(t => tableProperties.put(t._1, t._2)) parentTable = PreAggregateUtil.getParentCarbonTable(df.logicalPlan) - assert(parentTable.getTableName.equalsIgnoreCase(parentTableIdentifier.table), - "Parent table name is different in select and create") + if (!parentTable.getTableName.equalsIgnoreCase(parentTableIdentifier.table)) { + throw new MalformedDataMapCommandException( + "Parent table name is different in select and create") + } var neworder = Seq[String]() val parentOrder = parentTable.getSortColumns(parentTable.getTableName).asScala parentOrder.foreach(parentcol => @@ -131,8 +134,7 @@ case class CreatePreAggregateTableCommand( // updating the parent table about child table PreAggregateUtil.updateMainTable( - CarbonEnv.getDatabaseName(parentTableIdentifier.database)(sparkSession), - parentTableIdentifier.table, + parentTable, childSchema, sparkSession) // After updating the parent carbon table with data map entry extract the latest table object @@ -147,8 +149,7 @@ case class CreatePreAggregateTableCommand( PreAggregateUtil.createTimeSeriesSelectQueryFromMain(dataMap.getChildSchema, parentTable.getTableName, parentTable.getDatabaseName) - } - else { + } else { queryString } val dataFrame = 
sparkSession.sql(new CarbonSpark2SqlParser().addPreAggLoadFunction( http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala index 7e3b80e..1073f63 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala @@ -34,6 +34,7 @@ import org.apache.spark.sql.hive.CarbonRelation import org.apache.spark.sql.parser.CarbonSpark2SqlParser import org.apache.spark.sql.types.DataType +import org.apache.carbondata.common.exceptions.MetadataProcessException import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} import org.apache.carbondata.core.constants.CarbonCommonConstants @@ -404,22 +405,20 @@ object PreAggregateUtil { * Below method will be used to update the main table about the pre aggregate table information * in case of any exception it will throw error so pre aggregate table creation will fail * - * @param dbName - * @param tableName * @param childSchema * @param sparkSession */ - def updateMainTable(dbName: String, tableName: String, + def updateMainTable(carbonTable: CarbonTable, childSchema: DataMapSchema, sparkSession: SparkSession): Unit = { val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName) val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.DROP_TABLE_LOCK) var locks = List.empty[ICarbonLock] - var carbonTable: CarbonTable = null var numberOfCurrentChild: Int = 0 + val dbName = carbonTable.getDatabaseName + val tableName = carbonTable.getTableName try { val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore - carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession) locks = acquireLock(dbName, tableName, locksToBeAcquired, carbonTable) // get the latest carbon table and check for column existence // read the latest schema file @@ -433,7 +432,7 @@ object PreAggregateUtil { numberOfCurrentChild = wrapperTableInfo.getDataMapSchemaList.size if (wrapperTableInfo.getDataMapSchemaList.asScala. 
exists(f => f.getDataMapName.equalsIgnoreCase(childSchema.getDataMapName))) { - throw new Exception("Duplicate datamap") + throw new MetadataProcessException("DataMap name already exist") } wrapperTableInfo.getDataMapSchemaList.add(childSchema) val thriftTable = schemaConverter http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala index f38304e..79ff15e 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala @@ -22,7 +22,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, _} import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY -import org.apache.spark.sql.execution.command.{Field, MetadataCommand, TableModel, TableNewProcessor} +import org.apache.spark.sql.execution.command.MetadataCommand import org.apache.spark.sql.util.CarbonException import org.apache.carbondata.common.logging.LogServiceFactory http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala index 46e24dd..4b2bfa9 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala @@ -610,7 +610,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy { CastExpressionOptimization.checkIfCastCanBeRemove(c) case c@LessThanOrEqual(Literal(v, t), Cast(a: Attribute, _)) => CastExpressionOptimization.checkIfCastCanBeRemove(c) - case StartsWith(a: Attribute, Literal(v, t)) => + case s@StartsWith(a: Attribute, Literal(v, t)) => Some(sources.StringStartsWith(a.name, v.toString)) case c@EndsWith(a: Attribute, Literal(v, t)) => Some(CarbonEndsWith(c)) http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala index 4d91375..c062cfb 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala @@ -36,8 +36,10 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.scan.expression.{ColumnExpression => 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
index 4d91375..c062cfb 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
@@ -36,8 +36,10 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.scan.expression.{ColumnExpression => CarbonColumnExpression, Expression => CarbonExpression, LiteralExpression => CarbonLiteralExpression}
 import org.apache.carbondata.core.scan.expression.conditional._
 import org.apache.carbondata.core.scan.expression.logical.{AndExpression, FalseExpression, OrExpression}
-import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
+import org.apache.carbondata.core.scan.filter.intf.ExpressionType
+import org.apache.carbondata.core.util.ThreadLocalSessionInfo
 import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.spark.CarbonAliasDecoderRelation
 import org.apache.carbondata.spark.util.CarbonScalaUtil
@@ -47,6 +49,7 @@ import org.apache.carbondata.spark.util.CarbonScalaUtil
  */
 object CarbonFilters {
+  val carbonProperties = CarbonProperties.getInstance()
   /**
    * Converts data sources filters to carbon filter predicates.
    */
@@ -114,25 +117,20 @@ object CarbonFilters {
             new OrExpression(lhsFilter, rhsFilter)
           }
         case sources.StringStartsWith(name, value) if value.length > 0 =>
-          val l = new GreaterThanEqualToExpression(getCarbonExpression(name),
-            getCarbonLiteralExpression(name, value))
-          val maxValueLimit = value.substring(0, value.length - 1) +
-            (value.charAt(value.length - 1).toInt + 1).toChar
-          val r = new LessThanExpression(
-            getCarbonExpression(name), getCarbonLiteralExpression(name, maxValueLimit))
-          Some(new AndExpression(l, r))
+          Some(new StartsWithExpression(getCarbonExpression(name),
+            getCarbonLiteralExpression(name, value)))
         case CarbonEndsWith(expr: Expression) =>
           Some(new SparkUnknownExpression(expr.transform {
             case AttributeReference(name, dataType, _, _) =>
               CarbonBoundReference(new CarbonColumnExpression(name.toString,
                 CarbonScalaUtil.convertSparkToCarbonDataType(dataType)), dataType, expr.nullable)
-          }))
+          }, ExpressionType.ENDSWITH))
         case CarbonContainsWith(expr: Expression) =>
           Some(new SparkUnknownExpression(expr.transform {
             case AttributeReference(name, dataType, _, _) =>
               CarbonBoundReference(new CarbonColumnExpression(name.toString,
                 CarbonScalaUtil.convertSparkToCarbonDataType(dataType)), dataType, expr.nullable)
-          }))
+          }, ExpressionType.CONTAINSWITH))
         case CastExpr(expr: Expression) =>
           Some(transformExpression(expr))
         case _ => None
@@ -261,7 +259,7 @@ object CarbonFilters {
         CastExpressionOptimization.checkIfCastCanBeRemove(c)
       case c@LessThanOrEqual(Literal(v, t), Cast(a: Attribute, _)) =>
         CastExpressionOptimization.checkIfCastCanBeRemove(c)
-      case StartsWith(a: Attribute, Literal(v, t)) =>
+      case s@StartsWith(a: Attribute, Literal(v, t)) =>
         Some(sources.StringStartsWith(a.name, v.toString))
       case c@EndsWith(a: Attribute, Literal(v, t)) =>
         Some(CarbonEndsWith(c))
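The removed StringStartsWith branch above emulated a prefix match with a range predicate, i.e. col >= "abc" AND col < "abd", by bumping the last character of the prefix; the new StartsWithExpression pushes the prefix down as-is instead. A self-contained sketch of the bound the old code computed (illustrative only, plain Scala, no CarbonData dependencies; prefixUpperBound is not part of the patch):

// Exclusive upper bound for a prefix scan: increment the last character, so that
// (lexicographically) strings starting with `prefix` fall in [prefix, upperBound).
def prefixUpperBound(prefix: String): String = {
  require(prefix.nonEmpty)
  prefix.substring(0, prefix.length - 1) + (prefix.charAt(prefix.length - 1) + 1).toChar
}

// e.g. prefixUpperBound("abc") == "abd"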
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
index 31a6701..e817590 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
@@ -32,7 +32,7 @@ import org.apache.carbondata.core.datamap.TableDataMap;
 import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter;
 import org.apache.carbondata.core.datamap.dev.DataMapFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.processing.store.TablePage;
 
 /**
@@ -49,9 +49,9 @@ public class DataMapWriterListener {
   /**
    * register all datamap writer for specified table and segment
    */
-  public void registerAllWriter(AbsoluteTableIdentifier identifier, String segmentId,
+  public void registerAllWriter(CarbonTable carbonTable, String segmentId,
       String dataWritePath) {
-    List<TableDataMap> tableDataMaps = DataMapStoreManager.getInstance().getAllDataMap(identifier);
+    List<TableDataMap> tableDataMaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable);
     if (tableDataMaps != null) {
       for (TableDataMap tableDataMap : tableDataMaps) {
         DataMapFactory factory = tableDataMap.getDataMapFactory();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
index b8e9062..4410e2a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -233,7 +233,7 @@ public final class DataLoadProcessBuilder {
     if (carbonTable.isHivePartitionTable()) {
       configuration.setWritingCoresCount((short) 1);
     }
-    TableSpec tableSpec = new TableSpec(dimensions, measures);
+    TableSpec tableSpec = new TableSpec(carbonTable);
     configuration.setTableSpec(tableSpec);
     return configuration;
   }
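With the DataMapWriterListener change above, datamap writers are resolved from the CarbonTable itself rather than from an AbsoluteTableIdentifier. A hypothetical call site after this patch might look like the following Scala sketch, written against the signatures shown in the diff; the helper and variable names are illustrative only:

import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.processing.datamap.DataMapWriterListener

// Register every datamap writer for one segment of the given table.
def registerWriters(table: CarbonTable, segmentId: String, writePath: String): DataMapWriterListener = {
  val listener = new DataMapWriterListener()
  listener.registerAllWriter(table, segmentId, writePath)
  listener
}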
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index 0e9cbc5..dd59ed3 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -260,8 +260,8 @@ public class CarbonFactDataHandlerModel {
     carbonFactDataHandlerModel.sortScope = CarbonDataProcessorUtil.getSortScope(configuration);
 
     DataMapWriterListener listener = new DataMapWriterListener();
-    listener.registerAllWriter(configuration.getTableIdentifier(), configuration.getSegmentId(),
-        storeLocation[new Random().nextInt(storeLocation.length)]);
+    listener.registerAllWriter(configuration.getTableSpec().getCarbonTable(),
+        configuration.getSegmentId(), storeLocation[new Random().nextInt(storeLocation.length)]);
     carbonFactDataHandlerModel.dataMapWriterlistener = listener;
     carbonFactDataHandlerModel.writingCoresCount = configuration.getWritingCoresCount();
@@ -321,13 +321,11 @@ public class CarbonFactDataHandlerModel {
     carbonFactDataHandlerModel.setPrimitiveDimLens(segmentProperties.getDimColumnsCardinality());
     carbonFactDataHandlerModel.setBlockSizeInMB(carbonTable.getBlockSizeInMB());
 
-    carbonFactDataHandlerModel.tableSpec = new TableSpec(
-        segmentProperties.getDimensions(),
-        segmentProperties.getMeasures());
-
+    carbonFactDataHandlerModel.tableSpec =
+        new TableSpec(loadModel.getCarbonDataLoadSchema().getCarbonTable());
     DataMapWriterListener listener = new DataMapWriterListener();
     listener.registerAllWriter(
-        loadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier(),
+        loadModel.getCarbonDataLoadSchema().getCarbonTable(),
         loadModel.getSegmentId(),
         tempStoreLocation[new Random().nextInt(tempStoreLocation.length)]);
     carbonFactDataHandlerModel.dataMapWriterlistener = listener;
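Both call sites above now build the TableSpec from the CarbonTable instead of from pre-resolved dimension and measure lists. A hypothetical helper showing the new construction (Scala sketch; the TableSpec package is assumed to be org.apache.carbondata.core.datastore, and buildTableSpec is illustrative only, not part of the patch):

import org.apache.carbondata.core.datastore.TableSpec  // assumed location of TableSpec
import org.apache.carbondata.core.metadata.schema.table.CarbonTable

// The spec is derived from the table schema itself, so callers no longer pass
// dimension/measure lists resolved from SegmentProperties.
def buildTableSpec(table: CarbonTable): TableSpec = new TableSpec(table)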