[CARBONDATA-2094] Filter DataMap Tables in Show Table Command. Currently, the SHOW TABLES command lists datamap tables (aggregate tables), but it should not show aggregate tables. Solution: handle the SHOW TABLES command on the Carbon side, filter out the datamap tables, and return the rest of the tables.
This closes #1089 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/ee1c4d42 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/ee1c4d42 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/ee1c4d42 Branch: refs/heads/branch-1.3 Commit: ee1c4d42fc0837e515ac222c676bd46fe93795d5 Parents: 19fdd4d Author: BJangir <babulaljangir...@gmail.com> Authored: Mon Jan 29 23:46:56 2018 +0530 Committer: kumarvishal <kumarvishal.1...@gmail.com> Committed: Thu Feb 1 18:42:05 2018 +0530 ---------------------------------------------------------------------- .../preaggregate/TestPreAggCreateCommand.scala | 36 +++++++++ .../preaggregate/TestPreAggregateDrop.scala | 9 ++- .../command/table/CarbonShowTablesCommand.scala | 82 ++++++++++++++++++++ .../spark/sql/hive/CarbonSessionState.scala | 11 ++- .../spark/sql/hive/CarbonSessionState.scala | 11 ++- 5 files changed, 142 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/ee1c4d42/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala index 23132de..f1d7396 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala @@ -233,6 +233,20 @@ class TestPreAggCreateCommand extends 
QueryTest with BeforeAndAfterAll { } val timeSeries = TIMESERIES.toString + test("remove agg tables from show table command") { + sql("DROP TABLE IF EXISTS tbl_1") + sql("DROP TABLE IF EXISTS sparktable") + sql("create table if not exists tbl_1(imei string,age int,mac string ,prodate timestamp,update timestamp,gamepoint double,contrid double) stored by 'carbondata' ") + sql("create table if not exists sparktable(a int,b string)") + sql( + s"""create datamap preagg_sum on table tbl_1 using 'preaggregate' as select mac,avg(age) from tbl_1 group by mac""" + .stripMargin) + sql( + "create datamap agg2 on table tbl_1 using 'preaggregate' DMPROPERTIES ('timeseries" + + ".eventTime'='prodate', 'timeseries.hierarchy'='hour=1,day=1,month=1,year=1') as select prodate," + + "mac from tbl_1 group by prodate,mac") + checkExistence(sql("show tables"), false, "tbl_1_preagg_sum","tbl_1_agg2_day","tbl_1_agg2_hour","tbl_1_agg2_month","tbl_1_agg2_year") + } test("test pre agg create table 21: create with preaggregate and hierarchy") { sql("DROP TABLE IF EXISTS maintabletime") @@ -287,6 +301,28 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll { sql("DROP DATAMAP IF EXISTS agg0 ON TABLE maintable") } + test("remove agg tables from show table command") { + sql("DROP TABLE IF EXISTS tbl_1") + sql("create table if not exists tbl_1(imei string,age int,mac string ,prodate timestamp,update timestamp,gamepoint double,contrid double) stored by 'carbondata' ") + sql("create datamap agg1 on table tbl_1 using 'preaggregate' as select mac, sum(age) from tbl_1 group by mac") + sql("create table if not exists sparktable(imei string,age int,mac string ,prodate timestamp,update timestamp,gamepoint double,contrid double) ") + checkExistence(sql("show tables"), false, "tbl_1_agg1") + checkExistence(sql("show tables"), true, "sparktable","tbl_1") + } + + + test("remove TimeSeries agg tables from show table command") { + sql("DROP TABLE IF EXISTS tbl_1") + sql("create table if not 
exists tbl_1(imei string,age int,mac string ,prodate timestamp,update timestamp,gamepoint double,contrid double) stored by 'carbondata' ") + sql( + "create datamap agg2 on table tbl_1 using 'preaggregate' DMPROPERTIES ('timeseries" + + ".eventTime'='prodate', 'timeseries.hierarchy'='hour=1,day=1,month=1,year=1') as select prodate," + + "mac from tbl_1 group by prodate,mac") + checkExistence(sql("show tables"), false, "tbl_1_agg2_day","tbl_1_agg2_hour","tbl_1_agg2_month","tbl_1_agg2_year") + } + + + def getCarbontable(plan: LogicalPlan) : CarbonTable ={ var carbonTable : CarbonTable = null plan.transform { http://git-wip-us.apache.org/repos/asf/carbondata/blob/ee1c4d42/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala index 1138adf..911a725 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala @@ -46,8 +46,9 @@ class TestPreAggregateDrop extends QueryTest with BeforeAndAfterAll { " a,sum(c) from maintable group by a") sql("drop datamap if exists preagg2 on table maintable") val showTables = sql("show tables") + val showdatamaps =sql("show datamap on table maintable") checkExistence(showTables, false, "maintable_preagg2") - checkExistence(showTables, true, "maintable_preagg1") + checkExistence(showdatamaps, true, "maintable_preagg1") } test("drop datamap which is not existed") { @@ -66,8 +67,9 @@ class 
TestPreAggregateDrop extends QueryTest with BeforeAndAfterAll { sql("drop datamap preagg_same on table maintable") var showTables = sql("show tables") + val showdatamaps =sql("show datamap on table maintable1") checkExistence(showTables, false, "maintable_preagg_same") - checkExistence(showTables, true, "maintable1_preagg_same") + checkExistence(showdatamaps, true, "maintable1_preagg_same") sql("drop datamap preagg_same on table maintable1") showTables = sql("show tables") checkExistence(showTables, false, "maintable1_preagg_same") @@ -84,7 +86,8 @@ class TestPreAggregateDrop extends QueryTest with BeforeAndAfterAll { sql("create datamap preagg_same1 on table maintable using 'preaggregate' as select" + " a,sum(c) from maintable group by a") showTables = sql("show tables") - checkExistence(showTables, true, "maintable_preagg_same1") + val showdatamaps =sql("show datamap on table maintable") + checkExistence(showdatamaps, true, "maintable_preagg_same1") sql("drop datamap preagg_same1 on table maintable") } http://git-wip-us.apache.org/repos/asf/carbondata/blob/ee1c4d42/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonShowTablesCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonShowTablesCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonShowTablesCommand.scala new file mode 100644 index 0000000..c2a91d8 --- /dev/null +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonShowTablesCommand.scala @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command.table + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} +import org.apache.spark.sql.execution.command.MetadataCommand +import org.apache.spark.sql.types.{BooleanType, StringType} + + +private[sql] case class CarbonShowTablesCommand ( databaseName: Option[String], + tableIdentifierPattern: Option[String]) extends MetadataCommand{ + + // The result of SHOW TABLES has three columns: database, tableName and isTemporary. + override val output: Seq[Attribute] = { + AttributeReference("database", StringType, nullable = false)() :: + AttributeReference("tableName", StringType, nullable = false)() :: + AttributeReference("isTemporary", BooleanType, nullable = false)() :: Nil + } + + override def processMetadata(sparkSession: SparkSession): Seq[Row] = { + // Since we need to return a Seq of rows, we will call getTables directly + // instead of calling tables in sparkSession. + // filterDataMaps Method is to Filter the Table. 
+ val catalog = sparkSession.sessionState.catalog + val db = databaseName.getOrElse(catalog.getCurrentDatabase) + var tables = + tableIdentifierPattern.map(catalog.listTables(db, _)).getOrElse(catalog.listTables(db)) + tables = filterDataMaps(tables, sparkSession) + tables.map { tableIdent => + val isTemp = catalog.isTemporaryTable(tableIdent) + Row(tableIdent.database.getOrElse("default"), tableIdent.table, isTemp) + } + } + + /** + * + * @param tables tableIdnetifers + * @param sparkSession sparksession + * @return Tables after filter datamap tables + */ + private def filterDataMaps(tables: Seq[TableIdentifier], + sparkSession: SparkSession): Seq[TableIdentifier] = { + // Filter carbon Tables then get CarbonTable and getDataMap List and filter the same + // as of now 2 times lookup is happening(filter carbon table ,getDataMapList) + // TODO : add another PR (CARBONDATA-2103) to improve with 1 lookup + val allDatamapTable = tables.filter { table => + CarbonEnv.getInstance(sparkSession).carbonMetastore + .tableExists(table)(sparkSession) + }.map { table => + val ctable = CarbonEnv.getCarbonTable(table.database, table.table)(sparkSession) + ctable.getTableInfo.getDataMapSchemaList.asScala + } + val alldamrelation = allDatamapTable + .flatMap { table => + table.map(eachtable => eachtable.getRelationIdentifier.toString) + } + tables + .filter { table => + !alldamrelation + .contains(table.database.getOrElse("default") + "." 
+ table.identifier) + } + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/ee1c4d42/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala index 0fe0f96..0b62e10 100644 --- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala +++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala @@ -22,11 +22,12 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, FunctionRes import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, BoundReference, Expression, InterpretedPredicate, PredicateSubquery, ScalarSubquery} import org.apache.spark.sql.catalyst.optimizer.Optimizer import org.apache.spark.sql.catalyst.parser.ParserInterface -import org.apache.spark.sql.catalyst.parser.ParserUtils._ -import org.apache.spark.sql.catalyst.parser.SqlBaseParser.CreateTableContext +import org.apache.spark.sql.catalyst.parser.ParserUtils.{string, _} +import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{CreateTableContext, ShowTablesContext} import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, SubqueryAlias} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.{CatalystConf, TableIdentifier} +import org.apache.spark.sql.execution.command.table.CarbonShowTablesCommand import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy} import org.apache.spark.sql.execution.{SparkOptimizer, SparkSqlAstBuilder} @@ -336,4 +337,10 @@ class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSes super.visitCreateTable(ctx) } } 
+ + override def visitShowTables(ctx: ShowTablesContext): LogicalPlan = withOrigin(ctx) { + CarbonShowTablesCommand( + Option(ctx.db).map(_.getText), + Option(ctx.pattern).map(string)) + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/ee1c4d42/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala index 3c151f0..baadd04 100644 --- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala +++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala @@ -27,12 +27,13 @@ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, BoundReference, Exists, Expression, In, InterpretedPredicate, ListQuery, ScalarSubquery} import org.apache.spark.sql.catalyst.optimizer.Optimizer import org.apache.spark.sql.catalyst.parser.ParserInterface -import org.apache.spark.sql.catalyst.parser.ParserUtils.string -import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{AddTableColumnsContext, ChangeColumnContext, CreateHiveTableContext, CreateTableContext} +import org.apache.spark.sql.catalyst.parser.ParserUtils.{string, withOrigin} +import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{AddTableColumnsContext, ChangeColumnContext, CreateHiveTableContext, CreateTableContext, ShowTablesContext} import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, SubqueryAlias} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableDataTypeChangeCommand} +import 
org.apache.spark.sql.execution.command.table.CarbonShowTablesCommand import org.apache.spark.sql.execution.datasources.{FindDataSourceTable, LogicalRelation, PreWriteCheck, ResolveSQLOnFile, _} import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy} import org.apache.spark.sql.execution.{SparkOptimizer, SparkSqlAstBuilder} @@ -395,4 +396,10 @@ class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSes override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = { super.visitCreateTable(ctx) } + + override def visitShowTables(ctx: ShowTablesContext): LogicalPlan = withOrigin(ctx) { + CarbonShowTablesCommand( + Option(ctx.db).map(_.getText), + Option(ctx.pattern).map(string)) + } }