This is an automated email from the ASF dual-hosted git repository. kunalkapoor pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push: new c726af2 [CARBONDATA-3817]Fix table creation with all columns as partition columns c726af2 is described below commit c726af2efadc7108886c8bd0ef61bfaeb2a0fc44 Author: akashrn5 <akashnilu...@gmail.com> AuthorDate: Mon May 11 18:44:13 2020 +0530 [CARBONDATA-3817]Fix table creation with all columns as partition columns Why is this PR needed? When all the columns are given as partition columns during create table, the create table should fail, because at least one column must be present as a non-partition column. Currently it does not fail. This is because after #3574, we improved the create data source table flow and now call Spark's CreateDataSourceTableCommand. Since we are creating the table as a Hive table, if saving the metadata in the Hive-compatible way fails, Spark falls back to saving the metadata in the Spark SQL specific way. So the partition validation fails when we try to store in the Hive-compatible way, but the retry passes, which is wrong behavior for a Hive-compatible table. Also, in the Hive integration, the location does not have the file system URI scheme prepended to it. What changes were proposed in this PR? For a partition table, if all the columns are present as partition columns, validate using the same API that Spark uses. Append the file system URI scheme to the location parameter while inferring the schema. 
This closes #3762 --- .../org/apache/carbondata/hive/CarbonHiveSerDe.java | 4 +++- .../table/CarbonCreateDataSourceTableCommand.scala | 10 ++++++++++ .../StandardPartitionTableLoadingTestCase.scala | 17 +++++++++++++++-- 3 files changed, 28 insertions(+), 3 deletions(-) diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java index 99b59b3..b955655 100644 --- a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java +++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java @@ -32,6 +32,7 @@ import org.apache.carbondata.core.metadata.schema.SchemaReader; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.metadata.schema.table.TableInfo; import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; +import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.hadoop.conf.Configuration; @@ -115,7 +116,8 @@ public class CarbonHiveSerDe extends AbstractSerDe { private void inferSchema(Properties tbl, List<String> columnNames, List<TypeInfo> columnTypes) { if (columnNames.size() == 0 && columnTypes.size() == 0) { String external = tbl.getProperty("EXTERNAL"); - String location = tbl.getProperty(hive_metastoreConstants.META_TABLE_LOCATION); + String location = CarbonUtil.checkAndAppendFileSystemURIScheme( + tbl.getProperty(hive_metastoreConstants.META_TABLE_LOCATION)); if (external != null && "TRUE".equals(external) && location != null) { String[] names = tbl.getProperty(hive_metastoreConstants.META_TABLE_NAME).split("\\."); diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateDataSourceTableCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateDataSourceTableCommand.scala 
index c94835f..4628978 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateDataSourceTableCommand.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateDataSourceTableCommand.scala @@ -22,6 +22,7 @@ import org.apache.spark.sql.{AnalysisException, CarbonEnv, CarbonSource, Row, Sp import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType} import org.apache.spark.sql.execution.command.{CreateDataSourceTableCommand, DropTableCommand, MetadataCommand} +import org.apache.spark.sql.execution.datasources.PartitioningUtils import org.apache.carbondata.common.logging.LogServiceFactory @@ -66,7 +67,16 @@ case class CarbonCreateDataSourceTableCommand( val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore val (tableInfo, catalogTable) = CarbonSource.createTableMeta(sparkSession, table, metaStore) + // Since we are creating as Hive table, if while creating hive compatible way, if it fails,then + // it will fall back to save its metadata in the Spark SQL specific way, so partition validation + // fails when we try to store in hive compatible way, so in retry it might pass, so doing the + // partition validation here only. 
+ // Refer: org.apache.spark.sql.hive.HiveExternalCatalog.scala#createDataSourceTable + val caseSensitiveAnalysis = sparkSession.sessionState.conf.caseSensitiveAnalysis + PartitioningUtils.validatePartitionColumn(catalogTable.schema, + catalogTable.partitionColumnNames, + caseSensitiveAnalysis) val rows = try { CreateDataSourceTableCommand( catalogTable, diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala index 796dd50..e23d375 100644 --- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala +++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala @@ -18,7 +18,7 @@ package org.apache.carbondata.spark.testsuite.standardpartition import java.io.{File, FileWriter, IOException} import java.util -import java.util.concurrent.{Callable, Executors, ExecutorService} +import java.util.concurrent.{Callable, ExecutorService, Executors} import org.apache.commons.io.FileUtils import org.apache.hadoop.conf.Configuration @@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import org.apache.spark.sql.execution.strategy.CarbonDataSourceScan import org.apache.spark.sql.optimizer.CarbonFilters import org.apache.spark.sql.test.util.QueryTest -import org.apache.spark.sql.{CarbonEnv, Row} +import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row} import org.scalatest.BeforeAndAfterAll import org.apache.carbondata.common.Strings @@ -553,6 +553,19 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte Row("empno=99/empname=ravi/designation=xx"))) } + test("test create partition table with all the columns as partition columns") { + 
sql("drop table if exists partitionall_columns") + val ex = intercept[AnalysisException] { + sql( + """ + | CREATE TABLE partitionall_columns + | PARTITIONED BY (empno int,empname String, designation String) + | STORED AS carbondata + """.stripMargin) + } + assert(ex.getMessage().equalsIgnoreCase("Cannot use all columns for partition columns;")) + } + def verifyInsertForPartitionTable(tableName: String, sort_scope: String): Unit = { sql(s"drop table if exists $tableName") sql(