This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new c726af2  [CARBONDATA-3817]Fix table creation with all columns as partition columns
c726af2 is described below

commit c726af2efadc7108886c8bd0ef61bfaeb2a0fc44
Author: akashrn5 <akashnilu...@gmail.com>
AuthorDate: Mon May 11 18:44:13 2020 +0530

    [CARBONDATA-3817]Fix table creation with all columns as partition columns
    
    Why is this PR needed?
    When all the columns are given as partition columns during create table,
    the create table should fail, because at least one column must remain a
    non-partition column. After #3574 the create data source table flow was
    improved and now calls Spark's CreateDataSourceTableCommand. Since the
    table is created as a Hive table, if storing it in the Hive-compatible
    way fails, Spark falls back to saving its metadata in the Spark SQL
    specific way. So when the partition validation fails on the
    Hive-compatible attempt, the retry passes, which is wrong behavior for a
    Hive-compatible table.
    In addition, in the Hive integration the table location does not have a
    file system URI scheme prepended to it.
    
    What changes were proposed in this PR?
    For a partition table, if all the columns are given as partition columns,
    validate them with the same API that Spark uses. Also append the file
    system URI scheme to the location property while inferring the schema in
    the Hive integration.
    
    This closes #3762
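
A minimal standalone sketch (not part of this commit) of the validation the
command now performs before delegating to Spark's CreateDataSourceTableCommand.
The object name and the example schema are illustrative only (the schema mirrors
the new test case below), and the sketch is placed in the same Spark package as
the patched file so the internal PartitioningUtils API is accessible:

    // Illustrative sketch only; not part of the patch.
    package org.apache.spark.sql.execution.command.table

    import org.apache.spark.sql.AnalysisException
    import org.apache.spark.sql.execution.datasources.PartitioningUtils
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

    object AllColumnsPartitionCheckSketch {
      def main(args: Array[String]): Unit = {
        // A table whose every column is also declared as a partition column.
        val schema = StructType(Seq(
          StructField("empno", IntegerType),
          StructField("empname", StringType),
          StructField("designation", StringType)))
        try {
          // Same validation Spark applies in HiveExternalCatalog#createDataSourceTable:
          // it rejects a table whose partition columns cover the whole schema.
          PartitioningUtils.validatePartitionColumn(
            schema, Seq("empno", "empname", "designation"), false)
          println("validation passed")
        } catch {
          case e: AnalysisException =>
            // Expected here: "Cannot use all columns for partition columns"
            println(e.getMessage)
        }
      }
    }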
---
 .../org/apache/carbondata/hive/CarbonHiveSerDe.java     |  4 +++-
 .../table/CarbonCreateDataSourceTableCommand.scala      | 10 ++++++++++
 .../StandardPartitionTableLoadingTestCase.scala         | 17 +++++++++++++++--
 3 files changed, 28 insertions(+), 3 deletions(-)

diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java
index 99b59b3..b955655 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java
@@ -32,6 +32,7 @@ import org.apache.carbondata.core.metadata.schema.SchemaReader;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 import org.apache.hadoop.conf.Configuration;
@@ -115,7 +116,8 @@ public class CarbonHiveSerDe extends AbstractSerDe {
   private void inferSchema(Properties tbl, List<String> columnNames, List<TypeInfo> columnTypes) {
     if (columnNames.size() == 0 && columnTypes.size() == 0) {
       String external = tbl.getProperty("EXTERNAL");
-      String location = tbl.getProperty(hive_metastoreConstants.META_TABLE_LOCATION);
+      String location = CarbonUtil.checkAndAppendFileSystemURIScheme(
+          tbl.getProperty(hive_metastoreConstants.META_TABLE_LOCATION));
       if (external != null && "TRUE".equals(external) && location != null) {
         String[] names =
             tbl.getProperty(hive_metastoreConstants.META_TABLE_NAME).split("\\.");
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateDataSourceTableCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateDataSourceTableCommand.scala
index c94835f..4628978 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateDataSourceTableCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateDataSourceTableCommand.scala
@@ -22,6 +22,7 @@ import org.apache.spark.sql.{AnalysisException, CarbonEnv, CarbonSource, Row, Sp
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
 import org.apache.spark.sql.execution.command.{CreateDataSourceTableCommand, DropTableCommand, MetadataCommand}
+import org.apache.spark.sql.execution.datasources.PartitioningUtils
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 
@@ -66,7 +67,16 @@ case class CarbonCreateDataSourceTableCommand(
 
     val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
     val (tableInfo, catalogTable) = CarbonSource.createTableMeta(sparkSession, table, metaStore)
+    // Since the table is created as a Hive table, if storing it in the Hive-compatible way
+    // fails, Spark falls back to saving its metadata in the Spark SQL specific way. So when
+    // the partition validation fails on the Hive-compatible attempt, the retry might pass,
+    // which is wrong. Hence the partition validation is done here.
+    // Refer: org.apache.spark.sql.hive.HiveExternalCatalog.scala#createDataSourceTable
 
+    val caseSensitiveAnalysis = sparkSession.sessionState.conf.caseSensitiveAnalysis
+    PartitioningUtils.validatePartitionColumn(catalogTable.schema,
+      catalogTable.partitionColumnNames,
+      caseSensitiveAnalysis)
     val rows = try {
       CreateDataSourceTableCommand(
         catalogTable,
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
index 796dd50..e23d375 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
@@ -18,7 +18,7 @@ package org.apache.carbondata.spark.testsuite.standardpartition
 
 import java.io.{File, FileWriter, IOException}
 import java.util
-import java.util.concurrent.{Callable, Executors, ExecutorService}
+import java.util.concurrent.{Callable, ExecutorService, Executors}
 
 import org.apache.commons.io.FileUtils
 import org.apache.hadoop.conf.Configuration
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.execution.strategy.CarbonDataSourceScan
 import org.apache.spark.sql.optimizer.CarbonFilters
 import org.apache.spark.sql.test.util.QueryTest
-import org.apache.spark.sql.{CarbonEnv, Row}
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row}
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.common.Strings
@@ -553,6 +553,19 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
       Row("empno=99/empname=ravi/designation=xx")))
   }
 
+  test("test create partition table with all the columns as partition 
columns") {
+    sql("drop table if exists partitionall_columns")
+    val ex = intercept[AnalysisException] {
+      sql(
+        """
+          | CREATE TABLE partitionall_columns
+          | PARTITIONED BY (empno int,empname String, designation String)
+          | STORED AS carbondata
+      """.stripMargin)
+    }
+    assert(ex.getMessage().equalsIgnoreCase("Cannot use all columns for partition columns;"))
+  }
+
   def verifyInsertForPartitionTable(tableName: String, sort_scope: String): Unit = {
     sql(s"drop table if exists $tableName")
     sql(
