http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala deleted file mode 100644 index b2ab977..0000000 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala +++ /dev/null @@ -1,282 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.spark.testsuite.datamap - -import java.io.{File, FilenameFilter} - -import org.apache.spark.sql.Row -import org.apache.spark.sql.test.util.QueryTest -import org.scalatest.BeforeAndAfterAll - -import org.apache.carbondata.common.exceptions.MetadataProcessException -import org.apache.carbondata.common.exceptions.sql.{MalformedDataMapCommandException, NoSuchDataMapException} -import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.metadata.CarbonMetadata -import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.core.util.path.CarbonTablePath - -class TestDataMapCommand extends QueryTest with BeforeAndAfterAll { - - val testData = s"$resourcesPath/sample.csv" - - override def beforeAll { - sql("drop table if exists datamaptest") - sql("drop table if exists datamapshowtest") - sql("drop table if exists uniqdata") - sql("create table datamaptest (a string, b string, c string) stored by 'carbondata'") - } - - val newClass = "org.apache.spark.sql.CarbonSource" - - test("test datamap create: don't support using non-exist class") { - intercept[MetadataProcessException] { - sql(s"CREATE DATAMAP datamap1 ON TABLE datamaptest USING '$newClass'") - } - } - - test("test datamap create with dmproperties: don't support using non-exist class") { - intercept[MetadataProcessException] { - sql(s"CREATE DATAMAP datamap2 ON TABLE datamaptest USING '$newClass' DMPROPERTIES('key'='value')") - } - } - - test("test datamap create with existing name: don't support using non-exist class") { - intercept[MetadataProcessException] { - sql( - s"CREATE DATAMAP datamap2 ON TABLE datamaptest USING '$newClass' DMPROPERTIES('key'='value')") - } - } - - test("test datamap create with preagg") { - sql("drop datamap if exists datamap3 on table datamaptest") - sql( - "create datamap datamap3 on table datamaptest using 'preaggregate' dmproperties('key'='value') as select count(a) from datamaptest") - val table = 
CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest") - assert(table != null) - val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList - assert(dataMapSchemaList.size() == 1) - assert(dataMapSchemaList.get(0).getDataMapName.equals("datamap3")) - assert(dataMapSchemaList.get(0).getProperties.get("key").equals("value")) - assert(dataMapSchemaList.get(0).getChildSchema.getTableName.equals("datamaptest_datamap3")) - } - - test("check hivemetastore after drop datamap") { - try { - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE, - "true") - sql("drop table if exists hiveMetaStoreTable") - sql("create table hiveMetaStoreTable (a string, b string, c string) stored by 'carbondata'") - - sql( - "create datamap datamap_hiveMetaStoreTable on table hiveMetaStoreTable using 'preaggregate' dmproperties('key'='value') as select count(a) from hiveMetaStoreTable") - checkExistence(sql("show datamap on table hiveMetaStoreTable"), true, "datamap_hiveMetaStoreTable") - - sql("drop datamap datamap_hiveMetaStoreTable on table hiveMetaStoreTable") - checkExistence(sql("show datamap on table hiveMetaStoreTable"), false, "datamap_hiveMetaStoreTable") - - } finally { - sql("drop table hiveMetaStoreTable") - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE, - CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT) - } - } - - test("drop the table having pre-aggregate") { - try { - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE, - "true") - sql("drop table if exists hiveMetaStoreTable_1") - sql("create table hiveMetaStoreTable_1 (a string, b string, c string) stored by 'carbondata'") - - sql( - "create datamap datamap_hiveMetaStoreTable_1 on table hiveMetaStoreTable_1 using 'preaggregate' dmproperties('key'='value') as select count(a) from hiveMetaStoreTable_1") - - checkExistence(sql("show datamap on table hiveMetaStoreTable_1"), - true, - "datamap_hiveMetaStoreTable_1") - - sql("drop table hiveMetaStoreTable_1") - - checkExistence(sql("show tables"), false, "datamap_hiveMetaStoreTable_1") - } finally { - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE, - CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT) - } - } - - test("test datamap create with preagg with duplicate name") { - sql( - s""" - | CREATE DATAMAP datamap10 ON TABLE datamaptest - | USING 'preaggregate' - | DMPROPERTIES('key'='value') - | AS SELECT COUNT(a) FROM datamaptest - """.stripMargin) - intercept[MalformedDataMapCommandException] { - sql( - s""" - | CREATE DATAMAP datamap10 ON TABLE datamaptest - | USING 'preaggregate' - | DMPROPERTIES('key'='value') - | AS SELECT COUNT(a) FROM datamaptest - """.stripMargin) - } - val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest") - assert(table != null) - val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList - assert(dataMapSchemaList.size() == 2) - } - - test("test drop non-exist datamap") { - intercept[NoSuchDataMapException] { - sql("drop datamap nonexist on table datamaptest") - } - val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest") - assert(table != null) - val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList - assert(dataMapSchemaList.size() == 2) - } - - test("test show datamap without preaggregate: don't support using non-exist class") { - intercept[MetadataProcessException] { - 
sql("drop table if exists datamapshowtest") - sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'") - sql(s"CREATE DATAMAP datamap1 ON TABLE datamapshowtest USING '$newClass' DMPROPERTIES('key'='value')") - sql(s"CREATE DATAMAP datamap2 ON TABLE datamapshowtest USING '$newClass' DMPROPERTIES('key'='value')") - checkExistence(sql("SHOW DATAMAP ON TABLE datamapshowtest"), true, "datamap1", "datamap2", "(NA)", newClass) - } - } - - test("test show datamap with preaggregate: don't support using non-exist class") { - intercept[MetadataProcessException] { - sql("drop table if exists datamapshowtest") - sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'") - sql("create datamap datamap1 on table datamapshowtest using 'preaggregate' as select count(a) from datamapshowtest") - sql(s"CREATE DATAMAP datamap2 ON TABLE datamapshowtest USING '$newClass' DMPROPERTIES('key'='value')") - val frame = sql("show datamap on table datamapshowtest") - assert(frame.collect().length == 2) - checkExistence(frame, true, "datamap1", "datamap2", "(NA)", newClass, "default.datamapshowtest_datamap1") - } - } - - test("test show datamap with no datamap") { - sql("drop table if exists datamapshowtest") - sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'") - assert(sql("show datamap on table datamapshowtest").collect().length == 0) - } - - test("test show datamap after dropping datamap: don't support using non-exist class") { - intercept[MetadataProcessException] { - sql("drop table if exists datamapshowtest") - sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'") - sql("create datamap datamap1 on table datamapshowtest using 'preaggregate' as select count(a) from datamapshowtest") - sql(s"CREATE DATAMAP datamap2 ON TABLE datamapshowtest USING '$newClass' DMPROPERTIES('key'='value')") - sql("drop datamap datamap1 on table datamapshowtest") - val frame = sql("show datamap on table datamapshowtest") - assert(frame.collect().length == 1) - checkExistence(frame, true, "datamap2", "(NA)", newClass) - } - } - - test("test if preaggregate load is successfull for hivemetastore") { - try { - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE, "true") - sql("DROP TABLE IF EXISTS maintable") - sql( - """ - | CREATE TABLE maintable(id int, name string, city string, age int) - | STORED BY 'org.apache.carbondata.format' - """.stripMargin) - sql( - s"""create datamap preagg_sum on table maintable using 'preaggregate' as select id,sum(age) from maintable group by id""" - - .stripMargin) - sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable") - checkAnswer(sql(s"select * from maintable_preagg_sum"), - Seq(Row(1, 31), Row(2, 27), Row(3, 70), Row(4, 55))) - } finally { - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE, - CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT) - } - } - - test("test preaggregate load for decimal column for hivemetastore") { - CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE, "true") - sql("CREATE TABLE uniqdata(CUST_ID int,CUST_NAME String,ACTIVE_EMUI_VERSION string,DOB timestamp,DOJ timestamp, BIGINT_COLUMN1 bigint,BIGINT_COLUMN2 bigint,DECIMAL_COLUMN1 decimal(30,10),DECIMAL_COLUMN2 decimal(36,10),Double_COLUMN1 double, Double_COLUMN2 double,INTEGER_COLUMN1 int) STORED BY 'org.apache.carbondata.format'") - 
sql("insert into uniqdata select 9000,'CUST_NAME_00000','ACTIVE_EMUI_VERSION_00000','1970-01-01 01:00:03','1970-01-01 02:00:03',123372036854,-223372036854,12345678901.1234000000,22345678901.1234000000,11234567489.7976000000,-11234567489.7976000000,1") - sql("create datamap uniqdata_agg on table uniqdata using 'preaggregate' as select min(DECIMAL_COLUMN1) from uniqdata group by DECIMAL_COLUMN1") - checkAnswer(sql("select * from uniqdata_uniqdata_agg"), Seq(Row(12345678901.1234000000, 12345678901.1234000000))) - sql("drop datamap if exists uniqdata_agg on table uniqdata") - } - - test("create pre-agg table with path") { - sql("drop table if exists main_preagg") - sql("drop table if exists main ") - val warehouse = s"$metastoredb/warehouse" - val path = warehouse + "/" + System.nanoTime + "_preAggTestPath" - sql( - s""" - | create table main( - | year int, - | month int, - | name string, - | salary int) - | stored by 'carbondata' - | tblproperties('sort_columns'='month,year,name') - """.stripMargin) - sql("insert into main select 10,11,'amy',12") - sql("insert into main select 10,11,'amy',14") - sql( - s""" - | create datamap preagg - | on table main - | using 'preaggregate' - | dmproperties ('path'='$path') - | as select name,avg(salary) - | from main - | group by name - """.stripMargin) - assertResult(true)(new File(path).exists()) - assertResult(true)(new File(s"${CarbonTablePath.getSegmentPath(path, "0")}") - .list(new FilenameFilter { - override def accept(dir: File, name: String): Boolean = { - name.contains(CarbonCommonConstants.FACT_FILE_EXT) - } - }).length > 0) - checkAnswer(sql("select name,avg(salary) from main group by name"), Row("amy", 13.0)) - checkAnswer(sql("select * from main_preagg"), Row("amy", 26, 2)) - sql("drop datamap preagg on table main") - assertResult(false)(new File(path).exists()) - sql("drop table main") - } - - override def afterAll { - sql("DROP TABLE IF EXISTS maintable") - sql("drop table if exists uniqdata") - CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE, - CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT) - sql("drop table if exists datamaptest") - sql("drop table if exists datamapshowtest") - } -}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestIndexDataMapCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestIndexDataMapCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestIndexDataMapCommand.scala new file mode 100644 index 0000000..a6ffa9a --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestIndexDataMapCommand.scala @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.testsuite.datamap + +import java.io.{File, FilenameFilter} + +import org.apache.spark.sql.Row +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.common.exceptions.MetadataProcessException +import org.apache.carbondata.common.exceptions.sql.{MalformedDataMapCommandException, NoSuchDataMapException} +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.metadata.CarbonMetadata +import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.core.util.path.CarbonTablePath + +class TestIndexDataMapCommand extends QueryTest with BeforeAndAfterAll { + + val testData = s"$resourcesPath/sample.csv" + + override def beforeAll { + sql("drop table if exists datamaptest") + sql("drop table if exists datamapshowtest") + sql("drop table if exists uniqdata") + sql("create table datamaptest (a string, b string, c string) stored by 'carbondata'") + } + + val newClass = "org.apache.spark.sql.CarbonSource" + + test("test datamap create: don't support using non-exist class") { + intercept[MetadataProcessException] { + sql(s"CREATE DATAMAP datamap1 ON TABLE datamaptest USING '$newClass'") + } + } + + test("test datamap create with dmproperties: don't support using non-exist class") { + intercept[MetadataProcessException] { + sql(s"CREATE DATAMAP datamap2 ON TABLE datamaptest USING '$newClass' DMPROPERTIES('key'='value')") + } + } + + test("test datamap create with existing name: don't support using non-exist class") { + intercept[MetadataProcessException] { + sql( + s"CREATE DATAMAP datamap2 ON TABLE datamaptest USING '$newClass' DMPROPERTIES('key'='value')") + } + } + + test("test datamap create with preagg") { + sql("drop datamap if exists datamap3 on table datamaptest") + sql( + "create datamap datamap3 on table datamaptest using 'preaggregate' as select count(a) from datamaptest") + val table = 
CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest") + assert(table != null) + val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList + assert(dataMapSchemaList.size() == 1) + assert(dataMapSchemaList.get(0).getDataMapName.equals("datamap3")) + assert(dataMapSchemaList.get(0).getChildSchema.getTableName.equals("datamaptest_datamap3")) + } + + test("check hivemetastore after drop datamap") { + try { + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE, + "true") + sql("drop table if exists hiveMetaStoreTable") + sql("create table hiveMetaStoreTable (a string, b string, c string) stored by 'carbondata'") + + sql( + "create datamap datamap_hiveMetaStoreTable on table hiveMetaStoreTable using 'preaggregate' as select count(a) from hiveMetaStoreTable") + checkExistence(sql("show datamap on table hiveMetaStoreTable"), true, "datamap_hiveMetaStoreTable") + + sql("drop datamap datamap_hiveMetaStoreTable on table hiveMetaStoreTable") + checkExistence(sql("show datamap on table hiveMetaStoreTable"), false, "datamap_hiveMetaStoreTable") + + } finally { + sql("drop table hiveMetaStoreTable") + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE, + CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT) + } + } + + test("drop the table having pre-aggregate") { + try { + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE, + "true") + sql("drop table if exists hiveMetaStoreTable_1") + sql("create table hiveMetaStoreTable_1 (a string, b string, c string) stored by 'carbondata'") + + sql( + "create datamap datamap_hiveMetaStoreTable_1 on table hiveMetaStoreTable_1 using 'preaggregate' as select count(a) from hiveMetaStoreTable_1") + + checkExistence(sql("show datamap on table hiveMetaStoreTable_1"), + true, + "datamap_hiveMetaStoreTable_1") + + sql("drop table hiveMetaStoreTable_1") + + checkExistence(sql("show tables"), false, "datamap_hiveMetaStoreTable_1") + } finally { + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE, + CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT) + } + } + + test("test datamap create with preagg with duplicate name") { + sql( + s""" + | CREATE DATAMAP datamap10 ON TABLE datamaptest + | USING 'preaggregate' + | AS SELECT COUNT(a) FROM datamaptest + """.stripMargin) + intercept[MalformedDataMapCommandException] { + sql( + s""" + | CREATE DATAMAP datamap10 ON TABLE datamaptest + | USING 'preaggregate' + | AS SELECT COUNT(a) FROM datamaptest + """.stripMargin) + } + val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest") + assert(table != null) + val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList + assert(dataMapSchemaList.size() == 2) + } + + test("test drop non-exist datamap") { + intercept[NoSuchDataMapException] { + sql("drop datamap nonexist on table datamaptest") + } + val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest") + assert(table != null) + val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList + assert(dataMapSchemaList.size() == 2) + } + + test("test show datamap without preaggregate: don't support using non-exist class") { + intercept[MetadataProcessException] { + sql("drop table if exists datamapshowtest") + sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'") + sql(s"CREATE DATAMAP datamap1 ON TABLE datamapshowtest USING 
'$newClass' ") + sql(s"CREATE DATAMAP datamap2 ON TABLE datamapshowtest USING '$newClass' ") + checkExistence(sql("SHOW DATAMAP ON TABLE datamapshowtest"), true, "datamap1", "datamap2", "(NA)", newClass) + } + } + + test("test show datamap with preaggregate: don't support using non-exist class") { + intercept[MetadataProcessException] { + sql("drop table if exists datamapshowtest") + sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'") + sql("create datamap datamap1 on table datamapshowtest using 'preaggregate' as select count(a) from datamapshowtest") + sql(s"CREATE DATAMAP datamap2 ON TABLE datamapshowtest USING '$newClass' ") + val frame = sql("show datamap on table datamapshowtest") + assert(frame.collect().length == 2) + checkExistence(frame, true, "datamap1", "datamap2", "(NA)", newClass, "default.datamapshowtest_datamap1") + } + } + + test("test show datamap with no datamap") { + sql("drop table if exists datamapshowtest") + sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'") + assert(sql("show datamap on table datamapshowtest").collect().length == 0) + } + + test("test show datamap after dropping datamap: don't support using non-exist class") { + intercept[MetadataProcessException] { + sql("drop table if exists datamapshowtest") + sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'") + sql("create datamap datamap1 on table datamapshowtest using 'preaggregate' as select count(a) from datamapshowtest") + sql(s"CREATE DATAMAP datamap2 ON TABLE datamapshowtest USING '$newClass' ") + sql("drop datamap datamap1 on table datamapshowtest") + val frame = sql("show datamap on table datamapshowtest") + assert(frame.collect().length == 1) + checkExistence(frame, true, "datamap2", "(NA)", newClass) + } + } + + test("test if preaggregate load is successfull for hivemetastore") { + try { + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE, "true") + sql("DROP TABLE IF EXISTS maintable") + sql( + """ + | CREATE TABLE maintable(id int, name string, city string, age int) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + sql( + s"""create datamap preagg_sum on table maintable using 'preaggregate' as select id,sum(age) from maintable group by id""" + + .stripMargin) + sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable") + checkAnswer(sql(s"select * from maintable_preagg_sum"), + Seq(Row(1, 31), Row(2, 27), Row(3, 70), Row(4, 55))) + } finally { + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE, + CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT) + } + } + + test("test preaggregate load for decimal column for hivemetastore") { + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE, "true") + sql("CREATE TABLE uniqdata(CUST_ID int,CUST_NAME String,ACTIVE_EMUI_VERSION string,DOB timestamp,DOJ timestamp, BIGINT_COLUMN1 bigint,BIGINT_COLUMN2 bigint,DECIMAL_COLUMN1 decimal(30,10),DECIMAL_COLUMN2 decimal(36,10),Double_COLUMN1 double, Double_COLUMN2 double,INTEGER_COLUMN1 int) STORED BY 'org.apache.carbondata.format'") + sql("insert into uniqdata select 9000,'CUST_NAME_00000','ACTIVE_EMUI_VERSION_00000','1970-01-01 01:00:03','1970-01-01 02:00:03',123372036854,-223372036854,12345678901.1234000000,22345678901.1234000000,11234567489.7976000000,-11234567489.7976000000,1") + sql("create datamap uniqdata_agg on table uniqdata using 
'preaggregate' as select min(DECIMAL_COLUMN1) from uniqdata group by DECIMAL_COLUMN1") + checkAnswer(sql("select * from uniqdata_uniqdata_agg"), Seq(Row(12345678901.1234000000, 12345678901.1234000000))) + sql("drop datamap if exists uniqdata_agg on table uniqdata") + } + + test("create pre-agg table with path") { + sql("drop table if exists main_preagg") + sql("drop table if exists main ") + val warehouse = s"$metastoredb/warehouse" + val path = warehouse + "/" + System.nanoTime + "_preAggTestPath" + sql( + s""" + | create table main( + | year int, + | month int, + | name string, + | salary int) + | stored by 'carbondata' + | tblproperties('sort_columns'='month,year,name') + """.stripMargin) + sql("insert into main select 10,11,'amy',12") + sql("insert into main select 10,11,'amy',14") + sql( + s""" + | create datamap preagg + | on table main + | using 'preaggregate' + | dmproperties ('path'='$path') + | as select name,avg(salary) + | from main + | group by name + """.stripMargin) + assertResult(true)(new File(path).exists()) + assertResult(true)(new File(s"${CarbonTablePath.getSegmentPath(path, "0")}") + .list(new FilenameFilter { + override def accept(dir: File, name: String): Boolean = { + name.contains(CarbonCommonConstants.FACT_FILE_EXT) + } + }).length > 0) + checkAnswer(sql("select name,avg(salary) from main group by name"), Row("amy", 13.0)) + checkAnswer(sql("select * from main_preagg"), Row("amy", 26, 2)) + sql("drop datamap preagg on table main") + assertResult(false)(new File(path).exists()) + sql("drop table main") + } + + override def afterAll { + sql("DROP TABLE IF EXISTS maintable") + sql("drop table if exists uniqdata") + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE, + CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT) + sql("drop table if exists datamaptest") + sql("drop table if exists datamapshowtest") + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/InsertOverwriteConcurrentTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/InsertOverwriteConcurrentTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/InsertOverwriteConcurrentTest.scala index 84b59e6..248441f 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/InsertOverwriteConcurrentTest.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/InsertOverwriteConcurrentTest.scala @@ -28,8 +28,8 @@ import org.apache.spark.sql.{DataFrame, SaveMode} import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.datamap.dev.cgdatamap.{AbstractCoarseGrainDataMap, AbstractCoarseGrainDataMapFactory} -import org.apache.carbondata.core.datamap.dev.{AbstractDataMapWriter, DataMap} +import org.apache.carbondata.core.datamap.dev.cgdatamap.{AbstractCoarseGrainIndexDataMapFactory, AbstractCoarseGrainIndexDataMap} +import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager} import org.apache.carbondata.core.datastore.page.ColumnPage import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier @@ 
-47,7 +47,12 @@ class InsertOverwriteConcurrentTest extends QueryTest with BeforeAndAfterAll wit buildTestData() // register hook to the table to sleep, thus the other command will be executed - sql(s"create datamap test on table orders using '${classOf[WaitingDataMap].getName}' as select count(a) from hiveMetaStoreTable_1") + sql( + s""" + | create datamap test on table orders + | using '${classOf[WaitingIndexDataMap].getName}' + | as select count(a) from hiveMetaStoreTable_1") + """.stripMargin) } private def buildTestData(): Unit = { @@ -101,7 +106,7 @@ class InsertOverwriteConcurrentTest extends QueryTest with BeforeAndAfterAll wit ) while (!Global.overwriteRunning && count < 1000) { Thread.sleep(10) - // to avoid dead loop in case WaitingDataMap is not invoked + // to avoid dead loop in case WaitingIndexDataMap is not invoked count += 1 } future @@ -162,7 +167,7 @@ object Global { var overwriteRunning = false } -class WaitingDataMap() extends AbstractCoarseGrainDataMapFactory { +class WaitingIndexDataMap() extends AbstractCoarseGrainIndexDataMapFactory { override def init(identifier: AbsoluteTableIdentifier, dataMapSchema: DataMapSchema): Unit = { } @@ -172,9 +177,9 @@ class WaitingDataMap() extends AbstractCoarseGrainDataMapFactory { override def clear(): Unit = {} - override def getDataMaps(distributable: DataMapDistributable): java.util.List[AbstractCoarseGrainDataMap] = ??? + override def getDataMaps(distributable: DataMapDistributable): java.util.List[AbstractCoarseGrainIndexDataMap] = ??? - override def getDataMaps(segmentId: String): util.List[AbstractCoarseGrainDataMap] = ??? + override def getDataMaps(segmentId: String): util.List[AbstractCoarseGrainIndexDataMap] = ??? override def createWriter(segmentId: String, writerPath: String): AbstractDataMapWriter = { new AbstractDataMapWriter(null, segmentId, writerPath) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapManager.java ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapManager.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapManager.java new file mode 100644 index 0000000..2b3a306 --- /dev/null +++ b/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapManager.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.datamap; + +import org.apache.carbondata.core.metadata.schema.table.DataMapSchema; + +import static org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider.PREAGGREGATE; +import static org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider.TIMESERIES; + +public class DataMapManager { + + private static DataMapManager INSTANCE; + + private DataMapManager() { } + + public static synchronized DataMapManager get() { + if (INSTANCE == null) { + INSTANCE = new DataMapManager(); + } + return INSTANCE; + } + + /** + * Return a DataMapProvider instance for specified dataMapSchema. + */ + public DataMapProvider getDataMapProvider(DataMapSchema dataMapSchema) { + DataMapProvider provider; + if (dataMapSchema.getClassName().equalsIgnoreCase(PREAGGREGATE.toString())) { + provider = new PreAggregateDataMapProvider(); + } else if (dataMapSchema.getClassName().equalsIgnoreCase(TIMESERIES.toString())) { + provider = new TimeseriesDataMapProvider(); + } else { + provider = new IndexDataMapProvider(); + } + return provider; + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProperty.java ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProperty.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProperty.java new file mode 100644 index 0000000..0cf0d04 --- /dev/null +++ b/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProperty.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.datamap; + +import org.apache.carbondata.common.annotations.InterfaceAudience; + +/** + * Property that can be specified when creating DataMap + */ +@InterfaceAudience.Internal +public class DataMapProperty { + + /** + * Used to specify the store location of the datamap + */ + public static final String PATH = "path"; +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProvider.java ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProvider.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProvider.java new file mode 100644 index 0000000..a71e0d8 --- /dev/null +++ b/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProvider.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.datamap; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.annotations.InterfaceStability; +import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.DataMapSchema; +import org.apache.carbondata.processing.exception.DataLoadingException; + +import org.apache.spark.sql.SparkSession; + +/** + * DataMap is an accelerator for certain types of queries. Developers can add new DataMap + * implementations to improve query performance. + * + * Currently two types of DataMap are supported: + * <ol> + * <li> MVDataMap: materialized view type of DataMap to accelerate OLAP-style queries, + * like SPJG queries (select, predicate, join, groupby) </li> + * <li> IndexDataMap: index type of DataMap to accelerate filter queries </li> + * </ol> + * + * <p> + * In the following command <br> + * {@code CREATE DATAMAP dm ON TABLE main USING 'provider'}, <br> + * the <b>provider</b> string can be a short name or the class name of the DataMap implementation. + * + * <br>Currently CarbonData supports the following providers: + * <ol> + * <li> preaggregate: a type of MVDataMap that pre-aggregates a single table </li> + * <li> timeseries: a type of MVDataMap that pre-aggregates based on the time dimension + * of the table </li> + * <li> class name of an {@link org.apache.carbondata.core.datamap.dev.IndexDataMapFactory} + * implementation: developers can implement a new type of IndexDataMap by extending + * {@link org.apache.carbondata.core.datamap.dev.IndexDataMapFactory} </li> + * </ol> + * + * @since 1.4.0 + */ +@InterfaceAudience.Developer("DataMap") +@InterfaceStability.Unstable +public interface DataMapProvider { + + /** + * Initialize a datamap's metadata. + * This is called when the user creates a datamap, for example "CREATE DATAMAP dm ON TABLE mainTable". + * Implementations should initialize the metadata for the datamap, such as creating a table + */ + void initMeta(CarbonTable mainTable, DataMapSchema dataMapSchema, String ctasSqlStatement, + SparkSession sparkSession) throws MalformedDataMapCommandException; + + /** + * Initialize a datamap's data. + * This is called when the user creates a datamap, for example "CREATE DATAMAP dm ON TABLE mainTable". + * Implementations should initialize the data for the datamap, such as creating data folders + */ + void initData(CarbonTable mainTable, SparkSession sparkSession); + + /** + * Opposite operation of {@link #initMeta(CarbonTable, DataMapSchema, String, SparkSession)}. 
+ * This is called when the user drops a datamap, for example "DROP DATAMAP dm ON TABLE mainTable". + * Implementations should clean up all metadata for the datamap + */ + void freeMeta(CarbonTable mainTable, DataMapSchema dataMapSchema, SparkSession sparkSession); + + /** + * Opposite operation of {@link #initData(CarbonTable, SparkSession)}. + * This is called when the user drops a datamap, for example "DROP DATAMAP dm ON TABLE mainTable". + * Implementations should clean up all data for the datamap + */ + void freeData(CarbonTable mainTable, DataMapSchema dataMapSchema, SparkSession sparkSession); + + /** + * Rebuild the datamap by loading all existing data from mainTable. + * This is called to refresh the datamap: + * 1. after datamap creation, if `autoRefreshDataMap` is set to true + * 2. when the user manually triggers a refresh datamap command + */ + void rebuild(CarbonTable mainTable, SparkSession sparkSession) throws DataLoadingException; + + /** + * Build the datamap incrementally by loading the specified segments' data. + * This is called when the user manually triggers a datamap refresh + */ + void incrementalBuild(CarbonTable mainTable, String[] segmentIds, SparkSession sparkSession) + throws DataLoadingException; + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java new file mode 100644 index 0000000..e11e522 --- /dev/null +++ b/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.datamap; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.exceptions.MetadataProcessException; +import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException; +import org.apache.carbondata.core.datamap.DataMapRegistry; +import org.apache.carbondata.core.datamap.DataMapStoreManager; +import org.apache.carbondata.core.datamap.dev.IndexDataMapFactory; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.DataMapSchema; +import org.apache.carbondata.format.TableInfo; + +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil; + +@InterfaceAudience.Internal +public class IndexDataMapProvider implements DataMapProvider { + + private TableInfo originalTableInfo; + + @Override + public void initMeta(CarbonTable mainTable, DataMapSchema dataMapSchema, String ctasSqlStatement, + SparkSession sparkSession) throws MalformedDataMapCommandException { + IndexDataMapFactory dataMapFactory = createIndexDataMapFactory(dataMapSchema); + DataMapStoreManager.getInstance().registerDataMap( + mainTable.getAbsoluteTableIdentifier(), dataMapSchema, dataMapFactory); + originalTableInfo = PreAggregateUtil.updateMainTable(mainTable, dataMapSchema, sparkSession); + } + + @Override + public void initData(CarbonTable mainTable, SparkSession sparkSession) { + // Nothing is needed to do by default + } + + @Override + public void freeMeta(CarbonTable mainTable, DataMapSchema dataMapSchema, + SparkSession sparkSession) { + PreAggregateUtil.updateSchemaInfo(mainTable, originalTableInfo, sparkSession); + } + + @Override + public void freeData(CarbonTable mainTable, DataMapSchema dataMapSchema, + SparkSession sparkSession) { + DataMapStoreManager.getInstance().clearDataMap( + mainTable.getAbsoluteTableIdentifier(), dataMapSchema.getDataMapName()); + } + + @Override + public void rebuild(CarbonTable mainTable, SparkSession sparkSession) { + // Nothing is needed to do by default + } + + @Override + public void incrementalBuild(CarbonTable mainTable, String[] segmentIds, + SparkSession sparkSession) { + throw new UnsupportedOperationException(); + } + + private IndexDataMapFactory createIndexDataMapFactory(DataMapSchema dataMapSchema) + throws MalformedDataMapCommandException { + IndexDataMapFactory dataMapFactory; + try { + // try to create DataMapProvider instance by taking providerName as class name + Class<? extends IndexDataMapFactory> providerClass = + (Class<? extends IndexDataMapFactory>) Class.forName(dataMapSchema.getClassName()); + dataMapFactory = providerClass.newInstance(); + } catch (ClassNotFoundException e) { + // try to create DataMapProvider instance by taking providerName as short name + dataMapFactory = getDataMapFactoryByShortName(dataMapSchema.getClassName()); + } catch (Throwable e) { + throw new MetadataProcessException( + "failed to create DataMapProvider '" + dataMapSchema.getClassName() + "'", e); + } + return dataMapFactory; + } + + private IndexDataMapFactory getDataMapFactoryByShortName(String providerName) + throws MalformedDataMapCommandException { + IndexDataMapFactory dataMapFactory; + String className = DataMapRegistry.getDataMapClassName(providerName); + if (className != null) { + try { + Class<? extends IndexDataMapFactory> datamapClass = + (Class<? 
extends IndexDataMapFactory>) Class.forName(providerName); + dataMapFactory = datamapClass.newInstance(); + } catch (ClassNotFoundException ex) { + throw new MalformedDataMapCommandException( + "DataMap '" + providerName + "' not found", ex); + } catch (Throwable ex) { + throw new MetadataProcessException( + "failed to create DataMap '" + providerName + "'", ex); + } + } else { + throw new MalformedDataMapCommandException( + "DataMap '" + providerName + "' not found"); + } + return dataMapFactory; + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java new file mode 100644 index 0000000..ac38347 --- /dev/null +++ b/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.datamap; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.DataMapSchema; + +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateTableHelper; +import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand; +import scala.Some; + +@InterfaceAudience.Internal +public class PreAggregateDataMapProvider implements DataMapProvider { + protected PreAggregateTableHelper helper; + protected CarbonDropTableCommand dropTableCommand; + + @Override + public void initMeta(CarbonTable mainTable, DataMapSchema dataMapSchema, String ctasSqlStatement, + SparkSession sparkSession) throws MalformedDataMapCommandException { + validateDmProperty(dataMapSchema); + helper = new PreAggregateTableHelper( + mainTable, dataMapSchema.getDataMapName(), dataMapSchema.getClassName(), + dataMapSchema.getProperties(), ctasSqlStatement, null); + helper.initMeta(sparkSession); + } + + private void validateDmProperty(DataMapSchema dataMapSchema) + throws MalformedDataMapCommandException { + if (!dataMapSchema.getProperties().isEmpty()) { + if (dataMapSchema.getProperties().size() > 1 || + !dataMapSchema.getProperties().containsKey(DataMapProperty.PATH)) { + throw new MalformedDataMapCommandException( + "Only 'path' dmproperty is allowed for this datamap"); + } + } + } + + @Override + public void initData(CarbonTable mainTable, SparkSession sparkSession) { + // Nothing is needed to do by default + } + + @Override + public void freeMeta(CarbonTable mainTable, DataMapSchema dataMapSchema, + SparkSession sparkSession) { + dropTableCommand = new CarbonDropTableCommand( + true, + new Some<>(dataMapSchema.getRelationIdentifier().getDatabaseName()), + dataMapSchema.getRelationIdentifier().getTableName(), + true); + dropTableCommand.processMetadata(sparkSession); + } + + @Override + public void freeData(CarbonTable mainTable, DataMapSchema dataMapSchema, + SparkSession sparkSession) { + if (dropTableCommand != null) { + dropTableCommand.processData(sparkSession); + } + } + + @Override + public void rebuild(CarbonTable mainTable, SparkSession sparkSession) { + if (helper != null) { + helper.initData(sparkSession); + } + } + + @Override + public void incrementalBuild(CarbonTable mainTable, String[] segmentIds, + SparkSession sparkSession) { + throw new UnsupportedOperationException(); + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark2/src/main/java/org/apache/carbondata/datamap/TimeseriesDataMapProvider.java ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/TimeseriesDataMapProvider.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/TimeseriesDataMapProvider.java new file mode 100644 index 0000000..510839d --- /dev/null +++ b/integration/spark2/src/main/java/org/apache/carbondata/datamap/TimeseriesDataMapProvider.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.datamap; + +import java.util.Map; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.DataMapSchema; + +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateTableHelper; +import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil; +import scala.Tuple2; + +@InterfaceAudience.Internal +public class TimeseriesDataMapProvider extends PreAggregateDataMapProvider { + + @Override + public void initMeta(CarbonTable mainTable, DataMapSchema dataMapSchema, String ctasSqlStatement, + SparkSession sparkSession) { + Map<String, String> dmProperties = dataMapSchema.getProperties(); + String dmProviderName = dataMapSchema.getClassName(); + TimeSeriesUtil.validateTimeSeriesGranularity(dmProperties, dmProviderName); + Tuple2<String, String> details = + TimeSeriesUtil.getTimeSeriesGranularityDetails(dmProperties, dmProviderName); + dmProperties.remove(details._1()); + helper = new PreAggregateTableHelper( + mainTable, dataMapSchema.getDataMapName(), dataMapSchema.getClassName(), + dmProperties, ctasSqlStatement, details._1()); + helper.initMeta(sparkSession); + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala index 870b1f3..5fb3d56 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala @@ -93,7 +93,7 @@ class CarbonEnv { properties.addProperty(CarbonCommonConstants.STORE_LOCATION, storePath) } LOGGER.info(s"carbon env initial: $storePath") - // trigger event for CarbonEnv init + // trigger event for CarbonEnv create val operationContext = new OperationContext val carbonEnvInitPreEvent: CarbonEnvInitPreEvent = CarbonEnvInitPreEvent(sparkSession, storePath) http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala index 3d3f83b..3f22955 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala +++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala @@ -21,16 +21,11 @@ import scala.collection.JavaConverters._ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.command._ -import org.apache.spark.sql.execution.command.preaaggregate.{CreatePreAggregateTableCommand, PreAggregateUtil} -import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil -import org.apache.spark.sql.hive.CarbonRelation -import org.apache.carbondata.common.exceptions.MetadataProcessException import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, MalformedDataMapCommandException} import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.core.datamap.DataMapStoreManager -import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider._ import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema} +import org.apache.carbondata.datamap.{DataMapManager, DataMapProvider} /** * Below command class will be used to create datamap on table @@ -44,64 +39,30 @@ case class CarbonCreateDataMapCommand( queryString: Option[String]) extends AtomicRunnableCommand { - var createPreAggregateTableCommands: CreatePreAggregateTableCommand = _ + private var dataMapProvider: DataMapProvider = _ + private var mainTable: CarbonTable = _ + private var dataMapSchema: DataMapSchema = _ override def processMetadata(sparkSession: SparkSession): Seq[Row] = { // since streaming segment does not support building index and pre-aggregate yet, // so streaming table does not support create datamap - val carbonTable = - CarbonEnv.getCarbonTable(tableIdentifier.database, tableIdentifier.table)(sparkSession) - if (carbonTable.isStreamingTable) { + mainTable = + CarbonEnv.getCarbonTable(tableIdentifier.database, tableIdentifier.table)(sparkSession) + if (mainTable.isStreamingTable) { throw new MalformedCarbonCommandException("Streaming table does not support creating datamap") } - validateDataMapName(carbonTable) + validateDataMapName(mainTable) + dataMapSchema = new DataMapSchema(dataMapName, dmClassName) + dataMapSchema.setProperties(new java.util.HashMap[String, String](dmproperties.asJava)) + dataMapProvider = DataMapManager.get().getDataMapProvider(dataMapSchema) + dataMapProvider.initMeta(mainTable, dataMapSchema, queryString.orNull, sparkSession) - if (dmClassName.equalsIgnoreCase(PREAGGREGATE.toString) || - dmClassName.equalsIgnoreCase(TIMESERIES.toString)) { - TimeSeriesUtil.validateTimeSeriesGranularity(dmproperties, dmClassName) - createPreAggregateTableCommands = if (dmClassName.equalsIgnoreCase(TIMESERIES.toString)) { - val details = TimeSeriesUtil - .getTimeSeriesGranularityDetails(dmproperties, dmClassName) - val updatedDmProperties = dmproperties - details._1 - CreatePreAggregateTableCommand(dataMapName, - tableIdentifier, - dmClassName, - updatedDmProperties, - queryString.get, - Some(details._1)) - } else { - CreatePreAggregateTableCommand( - dataMapName, - tableIdentifier, - dmClassName, - dmproperties, - queryString.get - ) - } - try { - createPreAggregateTableCommands.processMetadata(sparkSession) - } catch { - case e: Throwable => throw new MetadataProcessException(s"Failed to create datamap " + - s"'$dataMapName'", e) - } - } else { - val dataMapSchema = new DataMapSchema(dataMapName, dmClassName) - dataMapSchema.setProperties(new java.util.HashMap[String, String](dmproperties.asJava)) - val dbName 
= CarbonEnv.getDatabaseName(tableIdentifier.database)(sparkSession) - val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.lookupRelation( - Some(dbName), - tableIdentifier.table)(sparkSession).asInstanceOf[CarbonRelation].carbonTable - DataMapStoreManager.getInstance().createAndRegisterDataMap( - carbonTable.getAbsoluteTableIdentifier, dataMapSchema) - // Save DataMapSchema in the schema file of main table - PreAggregateUtil.updateMainTable(carbonTable, dataMapSchema, sparkSession) - } val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) LOGGER.audit(s"DataMap $dataMapName successfully added to Table ${tableIdentifier.table}") Seq.empty } - private def validateDataMapName(carbonTable: CarbonTable) = { + private def validateDataMapName(carbonTable: CarbonTable): Unit = { val existingDataMaps = carbonTable.getTableInfo.getDataMapSchemaList existingDataMaps.asScala.foreach { dataMapSchema => if (dataMapSchema.getDataMapName.equalsIgnoreCase(dataMapName)) { @@ -111,18 +72,19 @@ case class CarbonCreateDataMapCommand( } override def processData(sparkSession: SparkSession): Seq[Row] = { - if (dmClassName.equalsIgnoreCase(PREAGGREGATE.toString) || - dmClassName.equalsIgnoreCase(TIMESERIES.toString)) { - createPreAggregateTableCommands.processData(sparkSession) - } else { - Seq.empty + if (dataMapProvider != null) { + dataMapProvider.initData(mainTable, sparkSession) + if (mainTable.isAutoRefreshDataMap) { + dataMapProvider.rebuild(mainTable, sparkSession) + } } + Seq.empty } override def undoMetadata(sparkSession: SparkSession, exception: Exception): Seq[Row] = { - if (dmClassName.equalsIgnoreCase(PREAGGREGATE.toString) || - dmClassName.equalsIgnoreCase(TIMESERIES.toString)) { - createPreAggregateTableCommands.undoMetadata(sparkSession, exception) + if (dataMapProvider != null) { + dataMapProvider.freeMeta(mainTable, dataMapSchema, sparkSession) + Seq.empty } else { throw new MalformedDataMapCommandException("Unknown data map type " + dmClassName) } http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala index f410f52..4cfc6b4 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala @@ -25,16 +25,15 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.NoSuchTableException import org.apache.spark.sql.execution.command.AtomicRunnableCommand import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil -import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand import org.apache.carbondata.common.exceptions.MetadataProcessException import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, NoSuchDataMapException} import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} -import org.apache.carbondata.core.datamap.DataMapStoreManager import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage} import 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
index f410f52..4cfc6b4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
@@ -25,16 +25,15 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.execution.command.AtomicRunnableCommand
 import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
-import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand
 
 import org.apache.carbondata.common.exceptions.MetadataProcessException
 import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, NoSuchDataMapException}
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
-import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
+import org.apache.carbondata.datamap.{DataMapManager, DataMapProvider}
 import org.apache.carbondata.events._
 
 /**
@@ -51,7 +50,9 @@ case class CarbonDropDataMapCommand(
     tableName: String)
   extends AtomicRunnableCommand {
 
-  var commandToRun: CarbonDropTableCommand = _
+  private var dataMapProvider: DataMapProvider = _
+  private var mainTable: CarbonTable = _
+  private var dataMapSchema: DataMapSchema = _
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
     val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
@@ -76,44 +77,42 @@ case class CarbonDropDataMapCommand(
         throw new MetadataProcessException(s"Dropping datamap $dataMapName failed", ex)
     }
     if (carbonTable.isDefined && carbonTable.get.getTableInfo.getDataMapSchemaList.size() > 0) {
-      val dataMapSchema = carbonTable.get.getTableInfo.getDataMapSchemaList.asScala.zipWithIndex.
+      mainTable = carbonTable.get
+      val dataMapSchemaOp = mainTable.getTableInfo.getDataMapSchemaList.asScala.zipWithIndex.
         find(_._1.getDataMapName.equalsIgnoreCase(dataMapName))
-      if (dataMapSchema.isDefined) {
+      if (dataMapSchemaOp.isDefined) {
+        dataMapSchema = dataMapSchemaOp.get._1
        val operationContext = new OperationContext
        val dropDataMapPreEvent = DropDataMapPreEvent(
-          Some(dataMapSchema.get._1),
+          Some(dataMapSchema),
           ifExistsSet,
           sparkSession)
         OperationListenerBus.getInstance.fireEvent(dropDataMapPreEvent, operationContext)
-        carbonTable.get.getTableInfo.getDataMapSchemaList.remove(dataMapSchema.get._2)
+        mainTable.getTableInfo.getDataMapSchemaList.remove(dataMapSchemaOp.get._2)
         val schemaConverter = new ThriftWrapperSchemaConverterImpl
         PreAggregateUtil.updateSchemaInfo(
-          carbonTable.get,
+          mainTable,
           schemaConverter.fromWrapperToExternalTableInfo(
-            carbonTable.get.getTableInfo,
+            mainTable.getTableInfo,
             dbName,
             tableName))(sparkSession)
-        commandToRun = CarbonDropTableCommand(
-          ifExistsSet = true,
-          Some(dataMapSchema.get._1.getRelationIdentifier.getDatabaseName),
-          dataMapSchema.get._1.getRelationIdentifier.getTableName,
-          dropChildTable = true
-        )
-        commandToRun.processMetadata(sparkSession)
+        dataMapProvider = DataMapManager.get.getDataMapProvider(dataMapSchema)
+        dataMapProvider.freeMeta(mainTable, dataMapSchema, sparkSession)
+        // fires the event after dropping datamap from main table schema
         val dropDataMapPostEvent = DropDataMapPostEvent(
-          Some(dataMapSchema.get._1),
+          Some(dataMapSchema),
           ifExistsSet,
           sparkSession)
         OperationListenerBus.getInstance.fireEvent(dropDataMapPostEvent, operationContext)
       } else if (!ifExistsSet) {
         throw new NoSuchDataMapException(dataMapName, tableName)
       }
-    } else if ((carbonTable.isDefined &&
-                carbonTable.get.getTableInfo.getDataMapSchemaList.size() == 0)) {
+    } else if (carbonTable.isDefined &&
+               carbonTable.get.getTableInfo.getDataMapSchemaList.size() == 0) {
       if (!ifExistsSet) {
         throw new NoSuchDataMapException(dataMapName, tableName)
       }
@@ -140,10 +139,8 @@ case class CarbonDropDataMapCommand(
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     // delete the table folder
-    if (commandToRun != null) {
-      DataMapStoreManager.getInstance().clearDataMap(
-        commandToRun.carbonTable.getAbsoluteTableIdentifier, dataMapName)
-      commandToRun.processData(sparkSession)
+    if (dataMapProvider != null) {
+      dataMapProvider.freeData(mainTable, dataMapSchema, sparkSession)
     }
     Seq.empty
   }
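CarbonDropDataMapCommand gets the mirror-image treatment: instead of composing a CarbonDropTableCommand for the child table, it asks the provider to tear down metadata and data in the same two-phase shape. A condensed sketch, assuming the table and schema have already been resolved and that the locking, pre/post events and schema rewrite shown in the hunk above have run:

    // Drop-side lifecycle only; the rest of processMetadata is unchanged.
    def dropDataMap(
        mainTable: CarbonTable,
        dataMapSchema: DataMapSchema,
        sparkSession: SparkSession): Unit = {
      val provider = DataMapManager.get.getDataMapProvider(dataMapSchema)
      provider.freeMeta(mainTable, dataMapSchema, sparkSession) // processMetadata
      provider.freeData(mainTable, dataMapSchema, sparkSession) // processData
    }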
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
index b53c609..ecf6f99 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
@@ -97,7 +97,7 @@ case class CarbonAlterTableDropPartitionCommand(
       partitionInfo.dropPartition(partitionIndex)
       // read TableInfo
-      val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTable)(sparkSession)
+      val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTable)
       val schemaConverter = new ThriftWrapperSchemaConverterImpl()
       val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(tableInfo,
         dbName, tableName, tablePath)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
index 84779cc..732178c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
@@ -89,7 +89,7 @@ case class CarbonAlterTableSplitPartitionCommand(
       updatePartitionInfo(partitionInfo, partitionIds)
       // read TableInfo
-      val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTable)(sparkSession)
+      val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTable)
       val schemaConverter = new ThriftWrapperSchemaConverterImpl()
       val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(tableInfo,
         dbName, tableName, tablePath)
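Both partition commands change for the same small reason: getThriftTableInfo evidently no longer takes the session in a second parameter list, so the call sites drop the currying while the Thrift-to-wrapper conversion stays as it was. The resulting call-site shape, with carbonMetaStore, carbonTable, dbName, tableName and tablePath assumed in scope as in the hunks above:

    // read the Thrift-level TableInfo and lift it back into the wrapper model
    val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTable)
    val schemaConverter = new ThriftWrapperSchemaConverterImpl()
    val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
      tableInfo, dbName, tableName, tablePath)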
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
deleted file mode 100644
index 6d4822b..0000000
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.command.preaaggregate
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.command.datamap.CarbonDropDataMapCommand
-import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
-import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
-import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil
-import org.apache.spark.sql.parser.CarbonSpark2SqlParser
-
-import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.metadata.schema.table.AggregationDataMapSchema
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
-
-/**
- * Below command class will be used to create pre-aggregate table
- * and updating the parent table about the child table information
- * It will be either success or nothing happen in case of failure:
- * 1. failed to create pre aggregate table.
- * 2. failed to update main table
- *
- */
-case class CreatePreAggregateTableCommand(
-    dataMapName: String,
-    parentTableIdentifier: TableIdentifier,
-    dmClassName: String,
-    dmProperties: Map[String, String],
-    queryString: String,
-    timeSeriesFunction: Option[String] = None)
-  extends AtomicRunnableCommand {
-
-  var parentTable: CarbonTable = _
-  var loadCommand: CarbonLoadDataCommand = _
-
-  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
-    val updatedQuery = new CarbonSpark2SqlParser().addPreAggFunction(queryString)
-    val df = sparkSession.sql(updatedQuery)
-    val fieldRelationMap = PreAggregateUtil.validateActualSelectPlanAndGetAttributes(
-      df.logicalPlan, queryString)
-    val fields = fieldRelationMap.keySet.toSeq
-    val tableProperties = mutable.Map[String, String]()
-    dmProperties.foreach(t => tableProperties.put(t._1, t._2))
-
-    parentTable = PreAggregateUtil.getParentCarbonTable(df.logicalPlan)
-    if (!parentTable.getTableName.equalsIgnoreCase(parentTableIdentifier.table)) {
-      throw new MalformedDataMapCommandException(
-        "Parent table name is different in select and create")
-    }
-    var neworder = Seq[String]()
-    val parentOrder = parentTable.getSortColumns(parentTable.getTableName).asScala
-    parentOrder.foreach(parentcol =>
-      fields.filter(col => (fieldRelationMap.get(col).get.aggregateFunction.isEmpty) &&
-                           (parentcol.equals(fieldRelationMap.get(col).get.
-                             columnTableRelationList.get(0).parentColumnName)))
-        .map(cols => neworder :+= cols.column)
-    )
-    tableProperties.put(CarbonCommonConstants.SORT_COLUMNS, neworder.mkString(","))
-    tableProperties.put("sort_scope", parentTable.getTableInfo.getFactTable.
-      getTableProperties.getOrDefault("sort_scope", CarbonCommonConstants
-      .LOAD_SORT_SCOPE_DEFAULT))
-    tableProperties
-      .put(CarbonCommonConstants.TABLE_BLOCKSIZE, parentTable.getBlockSizeInMB.toString)
-    val tableIdentifier =
-      TableIdentifier(parentTableIdentifier.table + "_" + dataMapName,
-        parentTableIdentifier.database)
-    // prepare table model of the collected tokens
-    val tableModel: TableModel = new CarbonSpark2SqlParser().prepareTableModel(
-      ifNotExistPresent = false,
-      new CarbonSpark2SqlParser().convertDbNameToLowerCase(tableIdentifier.database),
-      tableIdentifier.table.toLowerCase,
-      fields,
-      Seq(),
-      tableProperties,
-      None,
-      isAlterFlow = false,
-      None)
-
-
-    // updating the relation identifier, this will be stored in child table
-    // which can be used during dropping of pre-aggreate table as parent table will
-    // also get updated
-    if(timeSeriesFunction.isDefined) {
-      TimeSeriesUtil.validateTimeSeriesEventTime(dmProperties, parentTable)
-      TimeSeriesUtil.validateEventTimeColumnExitsInSelect(
-        fieldRelationMap,
-        dmProperties.get(TimeSeriesUtil.TIMESERIES_EVENTTIME).get)
-      TimeSeriesUtil.updateTimeColumnSelect(fieldRelationMap,
-        dmProperties.get(TimeSeriesUtil.TIMESERIES_EVENTTIME).get,
-        timeSeriesFunction.get)
-    }
-    tableModel.parentTable = Some(parentTable)
-    tableModel.dataMapRelation = Some(fieldRelationMap)
-    val tablePath = if (dmProperties.contains("path")) {
-      dmProperties("path")
-    } else {
-      CarbonEnv.getTablePath(tableModel.databaseNameOp, tableModel.tableName)(sparkSession)
-    }
-    CarbonCreateTableCommand(TableNewProcessor(tableModel),
-      tableModel.ifNotExistsSet, Some(tablePath)).run(sparkSession)
-
-    val table = CarbonEnv.getCarbonTable(tableIdentifier)(sparkSession)
-    val tableInfo = table.getTableInfo
-    // child schema object which will be updated on parent table about the
-    val childSchema = tableInfo.getFactTable.buildChildSchema(
-      dataMapName,
-      CarbonCommonConstants.AGGREGATIONDATAMAPSCHEMA,
-      tableInfo.getDatabaseName,
-      queryString,
-      "AGGREGATION")
-    dmProperties.foreach(f => childSchema.getProperties.put(f._1, f._2))
-
-    // updating the parent table about child table
-    PreAggregateUtil.updateMainTable(
-      parentTable,
-      childSchema,
-      sparkSession)
-    // After updating the parent carbon table with data map entry extract the latest table object
-    // to be used in further create process.
-    parentTable = CarbonEnv.getCarbonTable(parentTableIdentifier.database,
-      parentTableIdentifier.table)(sparkSession)
-    val updatedLoadQuery = if (timeSeriesFunction.isDefined) {
-      val dataMap = parentTable.getTableInfo.getDataMapSchemaList.asScala
-        .filter(p => p.getDataMapName
-          .equalsIgnoreCase(dataMapName)).head
-        .asInstanceOf[AggregationDataMapSchema]
-      PreAggregateUtil.createTimeSeriesSelectQueryFromMain(dataMap.getChildSchema,
-        parentTable.getTableName,
-        parentTable.getDatabaseName)
-    } else {
-      queryString
-    }
-    val dataFrame = sparkSession.sql(new CarbonSpark2SqlParser().addPreAggLoadFunction(
-      updatedLoadQuery)).drop("preAggLoad")
-    val dataMap = parentTable.getTableInfo.getDataMapSchemaList.asScala
-      .filter(dataMap => dataMap.getDataMapName.equalsIgnoreCase(dataMapName)).head
-      .asInstanceOf[AggregationDataMapSchema]
-    loadCommand = PreAggregateUtil.createLoadCommandForChild(
-      dataMap.getChildSchema.getListOfColumns,
-      tableIdentifier,
-      dataFrame,
-      false,
-      sparkSession = sparkSession)
-    loadCommand.processMetadata(sparkSession)
-    Seq.empty
-  }
-
-  override def undoMetadata(sparkSession: SparkSession, exception: Exception): Seq[Row] = {
-    // drop child table and undo the change in table info of main table
-    CarbonDropDataMapCommand(
-      dataMapName,
-      ifExistsSet = true,
-      parentTableIdentifier.database,
-      parentTableIdentifier.table).run(sparkSession)
-    Seq.empty
-  }
-
-  override def processData(sparkSession: SparkSession): Seq[Row] = {
-    // load child table if parent table has existing segments
-    // This will be used to check if the parent table has any segments or not. If not then no
-    // need to fire load for pre-aggregate table. Therefore reading the load details for PARENT
-    // table.
-    SegmentStatusManager.deleteLoadsAndUpdateMetadata(parentTable, false)
-    val loadAvailable = SegmentStatusManager.readLoadMetadata(parentTable.getMetadataPath)
-    if (loadAvailable.exists(load => load.getSegmentStatus == SegmentStatus.INSERT_IN_PROGRESS ||
-        load.getSegmentStatus == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS)) {
-      throw new UnsupportedOperationException(
-        "Cannot create pre-aggregate table when insert is in progress on main table")
-    } else if (loadAvailable.nonEmpty) {
-      val updatedQuery = if (timeSeriesFunction.isDefined) {
-        val dataMap = parentTable.getTableInfo.getDataMapSchemaList.asScala
-          .filter(p => p.getDataMapName
-            .equalsIgnoreCase(dataMapName)).head
-          .asInstanceOf[AggregationDataMapSchema]
-        PreAggregateUtil.createTimeSeriesSelectQueryFromMain(dataMap.getChildSchema,
-          parentTable.getTableName,
-          parentTable.getDatabaseName)
-      } else {
-        queryString
-      }
-      // Passing segmentToLoad as * because we want to load all the segments into the
-      // pre-aggregate table even if the user has set some segments on the parent table.
-      loadCommand.dataFrame = Some(PreAggregateUtil
-        .getDataFrame(sparkSession, loadCommand.logicalPlan.get))
-      PreAggregateUtil.startDataLoadForDataMap(
-        parentTable,
-        segmentToLoad = "*",
-        validateSegments = true,
-        sparkSession,
-        loadCommand)
-    }
-    Seq.empty
-  }
-}
-
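One behaviour from the deleted CreatePreAggregateTableCommand worth keeping in view while reviewing the provider implementations: its processData only fired the child-table load when the parent already had segments, and refused to proceed while an insert was running. Condensed from the deleted code above, with parentTable assumed in scope:

    // guard from the old processData: skip the initial child load when the
    // parent table is empty, and fail fast while an insert is still in flight
    SegmentStatusManager.deleteLoadsAndUpdateMetadata(parentTable, false)
    val loadAvailable = SegmentStatusManager.readLoadMetadata(parentTable.getMetadataPath)
    if (loadAvailable.exists(load =>
        load.getSegmentStatus == SegmentStatus.INSERT_IN_PROGRESS ||
        load.getSegmentStatus == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS)) {
      throw new UnsupportedOperationException(
        "Cannot create pre-aggregate table when insert is in progress on main table")
    } else if (loadAvailable.nonEmpty) {
      // parent already has data: load every segment ("*") into the child table
    }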