This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 3630b12  [CARBONDATA-3566] Support add segment for partition table
3630b12 is described below

commit 3630b12665bd06b1e6f8ae91a7f23bec7bad47d9
Author: Jacky Li <jacky.li...@qq.com>
AuthorDate: Mon Nov 4 15:28:30 2019 +0800

    [CARBONDATA-3566] Support add segment for partition table
    
    CarbonData already supports ADD SEGMENT for non-partition tables,
    so it should also support Hive partition tables.
    
    This closes #3431
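    
    A minimal usage sketch, following the test case added in this commit (the
    sql(...) helper comes from the test suite context and the parquet path is
    illustrative):
    
        // root folder of an existing parquet table partitioned by (name, age)
        val parquetRootPath = "/path/to/warehouse/parquet_table"
        sql("create table carbon_table(value int) partitioned by (name string, age int) stored as carbondata")
        // register the external parquet folder as a new segment; the 'partition'
        // option must list the partition columns as column_name:data_type
        sql(s"alter table carbon_table add segment options " +
          s"('path'='$parquetRootPath', 'format'='parquet', 'partition'='name:string,age:int')")
        // the added partitions are then queryable (and compactable) through the carbon table
        sql("select count(*) from carbon_table where name = 'amy'").show()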
---
 .../carbondata/core/metadata/SegmentFileStore.java |  44 +++--
 .../core/writer/CarbonIndexFileMergeWriter.java    |   2 +-
 .../hadoop/api/CarbonOutputCommitter.java          |   2 +-
 .../testsuite/addsegment/AddSegmentTestCase.scala  | 154 ++++++++++++++++-
 .../spark/rdd/CarbonDataRDDFactory.scala           |   2 +-
 .../command/management/CarbonAddLoadCommand.scala  | 184 +++++++++++++++++----
 .../execution/strategy/MixedFormatHandler.scala    | 132 +++++++++++----
 .../spark/sql/parser/CarbonSpark2SqlParser.scala   |   6 +-
 8 files changed, 446 insertions(+), 80 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index 57eb46d..e7feb3f 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -49,6 +49,7 @@ import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 import com.google.gson.Gson;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.Logger;
 
@@ -242,24 +243,36 @@ public class SegmentFileStore {
     return false;
   }
 
-  public static boolean writeSegmentFileForOthers(CarbonTable carbonTable, Segment segment)
+  public static boolean writeSegmentFileForOthers(
+      CarbonTable carbonTable,
+      Segment segment,
+      PartitionSpec partitionSpec,
+      List<FileStatus> partitionDataFiles)
       throws IOException {
     String tablePath = carbonTable.getTablePath();
-    CarbonFile segmentFolder = FileFactory.getCarbonFile(segment.getSegmentPath());
-    CarbonFile[] otherFiles = segmentFolder.listFiles(new CarbonFileFilter() {
-      @Override
-      public boolean accept(CarbonFile file) {
-        return (!file.getName().equals("_SUCCESS") && !file.getName().endsWith(".crc"));
-      }
-    });
-    if (otherFiles != null && otherFiles.length > 0) {
+    CarbonFile[] dataFiles = null;
+    if (partitionDataFiles.isEmpty()) {
+      CarbonFile segmentFolder = FileFactory.getCarbonFile(segment.getSegmentPath());
+      dataFiles = segmentFolder.listFiles(
+          file -> (!file.getName().equals("_SUCCESS") && !file.getName().endsWith(".crc")));
+    } else {
+      dataFiles = partitionDataFiles.stream().map(
+          fileStatus -> FileFactory.getCarbonFile(
+              fileStatus.getPath().toString())).toArray(CarbonFile[]::new);
+    }
+    if (dataFiles != null && dataFiles.length > 0) {
       SegmentFile segmentFile = new SegmentFile();
       segmentFile.setOptions(segment.getOptions());
       FolderDetails folderDetails = new FolderDetails();
       folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
       folderDetails.setRelative(false);
-      segmentFile.addPath(segment.getSegmentPath(), folderDetails);
-      for (CarbonFile file : otherFiles) {
+      if (!partitionDataFiles.isEmpty()) {
+        folderDetails.setPartitions(partitionSpec.getPartitions());
+        segmentFile.addPath(partitionSpec.getLocation().toString(), folderDetails);
+      } else {
+        segmentFile.addPath(segment.getSegmentPath(), folderDetails);
+      }
+      for (CarbonFile file : dataFiles) {
         folderDetails.getFiles().add(file.getName());
       }
      String segmentFileFolder = CarbonTablePath.getSegmentFilesLocation(tablePath);
@@ -437,18 +450,19 @@ public class SegmentFileStore {
    * @return boolean which determines whether status update is done or not.
    * @throws IOException
    */
-  public static boolean updateSegmentFile(CarbonTable carbonTable, String segmentId,
+  public static boolean updateTableStatusFile(CarbonTable carbonTable, String segmentId,
      String segmentFile, String tableId, SegmentFileStore segmentFileStore) throws IOException {
-    return updateSegmentFile(carbonTable, segmentId, segmentFile, tableId, segmentFileStore, null);
+    return updateTableStatusFile(carbonTable, segmentId, segmentFile, tableId, segmentFileStore,
+        null);
   }
 
   /**
-   * This API will update the segmentFile of a passed segment.
+   * This API will update the table status file with specified segment.
    *
    * @return boolean which determines whether status update is done or not.
    * @throws IOException
    */
-  public static boolean updateSegmentFile(CarbonTable carbonTable, String segmentId,
+  public static boolean updateTableStatusFile(CarbonTable carbonTable, String segmentId,
       String segmentFile, String tableId, SegmentFileStore segmentFileStore,
       SegmentStatus segmentStatus) throws IOException {
     boolean status = false;
diff --git a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
index c9d4c26..4760bdc 100644
--- a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
+++ b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
@@ -193,7 +193,7 @@ public class CarbonIndexFileMergeWriter {
         + CarbonCommonConstants.FILE_SEPARATOR + newSegmentFileName;
     if (!table.isHivePartitionTable()) {
      SegmentFileStore.writeSegmentFile(segmentFileStore.getSegmentFile(), path);
-      SegmentFileStore.updateSegmentFile(table, segmentId, newSegmentFileName,
+      SegmentFileStore.updateTableStatusFile(table, segmentId, newSegmentFileName,
           table.getCarbonTableIdentifier().getTableId(), segmentFileStore);
     }
 
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
index 21861d9..549ca7c 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
@@ -144,7 +144,7 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
       uuid = operationContext.getProperty("uuid").toString();
     }
 
-    SegmentFileStore.updateSegmentFile(carbonTable, loadModel.getSegmentId(),
+    SegmentFileStore.updateTableStatusFile(carbonTable, loadModel.getSegmentId(),
         segmentFileName + CarbonTablePath.SEGMENT_EXT,
         carbonTable.getCarbonTableIdentifier().getTableId(),
         new SegmentFileStore(carbonTable.getTablePath(),
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
index 129c0f0..c9b5bf6 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
@@ -19,10 +19,10 @@ package org.apache.carbondata.spark.testsuite.addsegment
 import java.io.File
 import java.nio.file.{Files, Paths}
 
-import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.test.util.QueryTest
 import org.apache.spark.sql.util.SparkSQLUtil
-import org.apache.spark.sql.{CarbonEnv, DataFrame, Row}
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row}
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -32,6 +32,7 @@ import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.hadoop.readsupport.impl.CarbonRowReadSupport
 import org.apache.carbondata.sdk.file.{CarbonReader, CarbonWriter, Field, Schema}
+import org.apache.carbondata.sdk.file.{CarbonReader, CarbonWriter}
 import org.junit.Assert
 import scala.io.Source
 
@@ -595,6 +596,155 @@ class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
     sql("drop table if exists addSegParmore")
   }
 
+  test("test add segment partition table") {
+    sql("drop table if exists parquet_table")
+    sql("drop table if exists carbon_table")
+    sql("drop table if exists orc_table")
+
+    sql("create table parquet_table(value int, name string, age int) using 
parquet partitioned by (name, age)")
+    sql("create table carbon_table(value int) partitioned by (name string, age 
int) stored as carbondata")
+    sql("insert into parquet_table values (30, 'amy', 12), (40, 'bob', 13)")
+    sql("insert into parquet_table values (30, 'amy', 20), (10, 'bob', 13)")
+    sql("insert into parquet_table values (30, 'cat', 12), (40, 'dog', 13)")
+    sql("select * from parquet_table").show
+    val parquetRootPath = 
SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog
+      .getTableMetadata(TableIdentifier("parquet_table")).location
+
+    // add data from parquet table to carbon table
+    sql(s"alter table carbon_table add segment options 
('path'='$parquetRootPath', 'format'='parquet', 
'partition'='name:string,age:int')")
+    checkAnswer(sql("select * from carbon_table"), sql("select * from 
parquet_table"))
+
+    // load new data into carbon table
+    sql("insert into carbon_table select * from parquet_table")
+    checkAnswer(sql("select * from carbon_table"), sql("select * from 
parquet_table union all select * from parquet_table"))
+
+    // add another data from orc table to carbon table
+    sql("create table orc_table(value int, name string, age int) using orc 
partitioned by (name, age)")
+    sql("insert into orc_table values (30, 'orc', 50), (40, 'orc', 13)")
+    sql("insert into orc_table values (30, 'fast', 10), (10, 'fast', 13)")
+    val orcRootPath = 
SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog
+      .getTableMetadata(TableIdentifier("orc_table")).location
+    sql(s"alter table carbon_table add segment options ('path'='$orcRootPath', 
'format'='orc', 'partition'='name:string,age:int')")
+    checkAnswer(sql("select * from carbon_table"),
+      sql("select * from parquet_table " +
+          "union all select * from parquet_table " +
+          "union all select * from orc_table"))
+
+    // filter query on partition column
+    checkAnswer(sql("select count(*) from carbon_table where name = 'amy'"), 
Row(4))
+
+    // do compaction
+    sql("alter table carbon_table compact 'major'")
+    checkAnswer(sql("select * from carbon_table"),
+      sql("select * from parquet_table " +
+          "union all select * from parquet_table " +
+          "union all select * from orc_table"))
+
+    sql("drop table if exists parquet_table")
+    sql("drop table if exists carbon_table")
+    sql("drop table if exists orc_table")
+  }
+
+  test("show segment after add segment to partition table") {
+    sql("drop table if exists parquet_table")
+    sql("drop table if exists carbon_table")
+
+    sql("create table parquet_table(value int, name string, age int) using 
parquet partitioned by (name, age)")
+    sql("create table carbon_table(value int) partitioned by (name string, age 
int) stored as carbondata")
+    sql("insert into parquet_table values (30, 'amy', 12), (40, 'bob', 13)")
+    sql("insert into parquet_table values (30, 'amy', 20), (10, 'bob', 13)")
+    sql("insert into parquet_table values (30, 'cat', 12), (40, 'dog', 13)")
+    sql("select * from parquet_table").show
+    val parquetRootPath = SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog
+      .getTableMetadata(TableIdentifier("parquet_table")).location
+
+    // add data from parquet table to carbon table
+    sql(s"alter table carbon_table add segment options 
('path'='$parquetRootPath', 'format'='parquet', 
'partition'='name:string,age:int')")
+    checkAnswer(sql("select * from carbon_table"), sql("select * from 
parquet_table"))
+
+    // test show segment
+    checkExistence(sql(s"show segments for table carbon_table"), true, 
"spark-common/target/warehouse/parquet_table")
+    checkExistence(sql(s"show history segments for table carbon_table"), true, 
"spark-common/target/warehouse/parquet_table")
+
+    sql("drop table if exists parquet_table")
+    sql("drop table if exists carbon_table")
+  }
+
+  test("test add segment partition table, missing partition option") {
+    sql("drop table if exists parquet_table")
+    sql("drop table if exists carbon_table")
+
+    sql("create table parquet_table(value int, name string, age int) using 
parquet partitioned by (name, age)")
+    sql("create table carbon_table(value int) partitioned by (name string, age 
int) stored as carbondata")
+    sql("insert into parquet_table values (30, 'amy', 12), (40, 'bob', 13)")
+    sql("insert into parquet_table values (30, 'amy', 20), (10, 'bob', 13)")
+    sql("insert into parquet_table values (30, 'cat', 12), (40, 'dog', 13)")
+    sql("select * from parquet_table").show
+    val parquetRootPath = SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog
+      .getTableMetadata(TableIdentifier("parquet_table")).location
+
+    // add data from parquet table to carbon table
+    val exception = intercept[AnalysisException](
+      sql(s"alter table carbon_table add segment options 
('path'='$parquetRootPath', 'format'='parquet')")
+    )
+    assert(exception.message.contains("partition option is required"))
+
+    sql("drop table if exists parquet_table")
+    sql("drop table if exists carbon_table")
+  }
+
+  test("test add segment partition table, unmatched partition") {
+    sql("drop table if exists parquet_table")
+    sql("drop table if exists carbon_table")
+
+    sql("create table parquet_table(value int, name string, age int) using 
parquet partitioned by (name)")
+    sql("create table carbon_table(value int) partitioned by (name string, age 
int) stored as carbondata")
+    sql("insert into parquet_table values (30, 'amy', 12), (40, 'bob', 13)")
+    sql("insert into parquet_table values (30, 'amy', 20), (10, 'bob', 13)")
+    sql("insert into parquet_table values (30, 'cat', 12), (40, 'dog', 13)")
+    sql("select * from parquet_table").show
+    val parquetRootPath = SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog
+      .getTableMetadata(TableIdentifier("parquet_table")).location
+
+    // add data from parquet table to carbon table
+    // unmatched partition
+    var exception = intercept[AnalysisException](
+      sql(s"alter table carbon_table add segment options 
('path'='$parquetRootPath', 'format'='parquet', 'partition'='name:string')")
+    )
+    assert(exception.message.contains("Partition is not same"))
+
+    // incorrect partition option
+    exception = intercept[AnalysisException](
+      sql(s"alter table carbon_table add segment options 
('path'='$parquetRootPath', 'format'='parquet', 
'partition'='name:string,age:int')")
+    )
+    assert(exception.message.contains("input segment path does not comply to 
partitions in carbon table"))
+
+    sql("drop table if exists parquet_table")
+    sql("drop table if exists carbon_table")
+  }
+
+  test("test add segment partition table, incorrect partition") {
+    sql("drop table if exists parquet_table")
+    sql("drop table if exists carbon_table")
+
+    sql("create table parquet_table(value int) using parquet")
+    sql("create table carbon_table(value int) partitioned by (name string, age 
int) stored as carbondata")
+    sql("insert into parquet_table values (30), (40)")
+    sql("select * from parquet_table").show
+    val parquetRootPath = SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog
+      .getTableMetadata(TableIdentifier("parquet_table")).location
+
+    // add data from parquet table to carbon table
+    // incorrect partition option
+    val exception = intercept[RuntimeException](
+      sql(s"alter table carbon_table add segment options 
('path'='$parquetRootPath', 'format'='parquet', 
'partition'='name:string,age:int')")
+    )
+    assert(exception.getMessage.contains("invalid partition path"))
+
+    sql("drop table if exists parquet_table")
+    sql("drop table if exists carbon_table")
+  }
+
   private def copyseg(tableName: String, pathName: String): String = {
     val table1 = SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog
       .getTableMetadata(TableIdentifier(tableName))
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 031f539..488468a 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -535,7 +535,7 @@ object CarbonDataRDDFactory {
         SegmentFileStore.writeSegmentFile(carbonTable, carbonLoadModel.getSegmentId,
           String.valueOf(carbonLoadModel.getFactTimeStamp))
 
-      SegmentFileStore.updateSegmentFile(
+      SegmentFileStore.updateTableStatusFile(
         carbonTable,
         carbonLoadModel.getSegmentId,
         segmentFileName,
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
index 7b2c088..e9025ce 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
@@ -23,8 +23,10 @@ import java.util
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
+import org.apache.hadoop.fs.FileStatus
 import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SparkSession}
 import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil.convertSparkToCarbonDataType
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.command.{Checker, MetadataCommand}
 import org.apache.spark.sql.execution.strategy.MixedFormatHandler
 import org.apache.spark.sql.hive.CarbonRelation
@@ -36,8 +38,10 @@ import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment}
 import org.apache.carbondata.core.datamap.status.DataMapStatusManager
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.exception.ConcurrentOperationException
+import org.apache.carbondata.core.indexstore.{PartitionSpec => CarbonPartitionSpec}
 import org.apache.carbondata.core.metadata.SegmentFileStore
 import org.apache.carbondata.core.metadata.encoder.Encoding
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.{FileFormat, LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.path.CarbonTablePath
@@ -47,7 +51,6 @@ import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, Car
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
 import org.apache.carbondata.sdk.file.{Field, Schema}
 import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory.clearDataMapFiles
-import org.apache.carbondata.spark.util.CarbonScalaUtil
 
 
 /**
@@ -89,45 +92,135 @@ case class CarbonAddLoadCommand(
     if (SegmentStatusManager.isOverwriteInProgressInTable(carbonTable)) {
       throw new ConcurrentOperationException(carbonTable, "insert overwrite", "delete segment")
     }
-    val segmentPath = options.getOrElse(
-      "path", throw new UnsupportedOperationException("PATH is manadatory"))
 
-    val allSegments = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+    val inputPath = options.getOrElse(
+      "path", throw new UnsupportedOperationException("PATH is mandatory"))
 
     // If a path is already added then we should block the adding of the same path again.
-    if (allSegments.exists(a =>
-      a.getPath != null && a.getPath.equalsIgnoreCase(segmentPath)
-    )) {
+    val allSegments = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+    if (allSegments.exists(a => a.getPath != null && a.getPath.equalsIgnoreCase(inputPath))) {
       throw new AnalysisException(s"path already exists in table status file, 
can not add same " +
-                                  s"segment path repeatedly: $segmentPath")
+                                  s"segment path repeatedly: $inputPath")
     }
 
     val format = options.getOrElse("format", "carbondata")
     val isCarbonFormat = format.equalsIgnoreCase("carbondata") || format.equalsIgnoreCase("carbon")
 
     // If in the given location no carbon index files are found then we should throw an exception
-    if (isCarbonFormat && 
SegmentFileStore.getListOfCarbonIndexFiles(segmentPath).isEmpty) {
+    if (isCarbonFormat && 
SegmentFileStore.getListOfCarbonIndexFiles(inputPath).isEmpty) {
       throw new AnalysisException("CarbonIndex files not present in the 
location")
     }
 
-    val segSchema = MixedFormatHandler.getSchema(sparkSession, options, segmentPath)
-
-    val segCarbonSchema = new Schema(segSchema.fields.map { field =>
+    // infer schema and collect FileStatus for all partitions
+    val (inputPathSchema, lastLevelDirFileMap) =
+      MixedFormatHandler.collectInfo(sparkSession, options, inputPath)
+    val inputPathCarbonFields = inputPathSchema.fields.map { field =>
       val dataType = convertSparkToCarbonDataType(field.dataType)
       new Field(field.name, dataType)
-    })
-
-    val tableCarbonSchema = new Schema(tableSchema.fields.map { field =>
+    }
+    val carbonTableSchema = new Schema(tableSchema.fields.map { field =>
       val dataType = convertSparkToCarbonDataType(field.dataType)
       new Field(field.name, dataType)
     })
 
+    // update schema if has partition
+    val inputPathTableFields = if (carbonTable.isHivePartitionTable) {
+      val partitions = options.getOrElse("partition",
+        throw new AnalysisException(
+          "partition option is required when adding segment to partition 
table")
+      )
+      // extract partition given by user, partition option should be form of "a:int, b:string"
+      val partitionFields = partitions
+        .split(",")
+        .map { input =>
+          if (input.nonEmpty) {
+            val nameAndDataType = input.trim.toLowerCase.split(":")
+            if (nameAndDataType.size == 2) {
+              new Field(nameAndDataType(0), nameAndDataType(1))
+            } else {
+              throw new AnalysisException(s"invalid partition option: ${ 
options.toString() }")
+            }
+          }
+        }
+      // validate against the partition in carbon table
+      val carbonTablePartition = getCarbonTablePartition(sparkSession)
+      if (!partitionFields.sameElements(carbonTablePartition)) {
+        throw new AnalysisException(
+          s"""
+             |Partition is not same. Carbon table partition is :
+             |${carbonTablePartition.mkString(",")} and input segment partition is :
+             |${partitionFields.mkString(",")}
+             |""".stripMargin)
+      }
+      inputPathCarbonFields ++ partitionFields
+    } else {
+      if (options.contains("partition")) {
+        throw new AnalysisException(
+          s"Invalid option: partition, $tableName is not a partitioned table")
+      }
+      inputPathCarbonFields
+    }
 
-    if (!tableCarbonSchema.getFields.forall(f => segCarbonSchema.getFields.exists(_.equals(f)))) {
+    // validate the schema including partition columns
+    val schemaMatched = carbonTableSchema.getFields.forall { field =>
+      inputPathTableFields.exists(_.equals(field))
+    }
+    if (!schemaMatched) {
       throw new AnalysisException(s"Schema is not same. Table schema is : " +
-                                  s"${tableSchema} and segment schema is : ${segSchema}")
+                                  s"${tableSchema} and segment schema is : ${inputPathSchema}")
+    }
+
+    // all validation is done, update the metadata accordingly
+    if (carbonTable.isHivePartitionTable) {
+      // for each partition in input path, create a new segment in carbon table
+      val partitionSpecs = collectPartitionSpecList(
+        sparkSession, carbonTable.getTablePath, inputPath, lastLevelDirFileMap.keys.toSeq)
+      // check the collected partition from input segment path should comply to
+      // partitions in carbon table
+      val carbonTablePartition = getCarbonTablePartition(sparkSession)
+      if (partitionSpecs.head.getPartitions.size() != carbonTablePartition.length) {
+        throw new AnalysisException(
+          s"""
+             |input segment path does not comply to partitions in carbon table:
+             |${carbonTablePartition.mkString(",")}
+             |""".stripMargin)
+      }
+      partitionSpecs.foreach { partitionSpec =>
+        val dataFiles = lastLevelDirFileMap.getOrElse(partitionSpec.getLocation.toString,
+          throw new RuntimeException(s"partition folder not found: ${partitionSpec.getLocation}"))
+        writeMetaForSegment(sparkSession, carbonTable, inputPath, Some(partitionSpec), dataFiles)
+      }
+    } else {
+      writeMetaForSegment(sparkSession, carbonTable, inputPath)
     }
 
+    Seq.empty
+  }
+
+  private def getCarbonTablePartition(sparkSession: SparkSession): Array[Field] = {
+    sparkSession.sessionState.catalog
+      .getTableMetadata(TableIdentifier(tableName, databaseNameOp))
+      .partitionSchema
+      .fields
+      .map(f => new Field(f.name, convertSparkToCarbonDataType(f.dataType)))
+  }
+
+  /**
+   * Write metadata for external segment, including table status file and segment file
+   *
+   * @param sparkSession spark session
+   * @param carbonTable carbon table
+   * @param segmentPath external segment path specified by user
+   * @param partitionSpecOp partition info extracted from the path
+   * @param partitionDataFiles all data files in the partition
+   */
+  private def writeMetaForSegment(
+      sparkSession: SparkSession,
+      carbonTable: CarbonTable,
+      segmentPath: String,
+      partitionSpecOp: Option[CarbonPartitionSpec] = None,
+      partitionDataFiles: Seq[FileStatus] = Seq.empty
+  ): Unit = {
     val model = new CarbonLoadModel
     model.setCarbonTransactionalTable(true)
     model.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable))
@@ -147,8 +240,8 @@ case class CarbonAddLoadCommand(
       val dataMapNames: mutable.Buffer[String] =
         tableDataMaps.asScala.map(dataMap => dataMap.getDataMapSchema.getDataMapName)
       val buildDataMapPreExecutionEvent: BuildDataMapPreExecutionEvent =
-        new BuildDataMapPreExecutionEvent(sparkSession,
-          carbonTable.getAbsoluteTableIdentifier, dataMapNames)
+        BuildDataMapPreExecutionEvent(
+          sparkSession, carbonTable.getAbsoluteTableIdentifier, dataMapNames)
       OperationListenerBus.getInstance().fireEvent(buildDataMapPreExecutionEvent,
         dataMapOperationContext)
     }
@@ -160,12 +253,16 @@ case class CarbonAddLoadCommand(
       model.getFactTimeStamp,
       false)
     newLoadMetaEntry.setPath(segmentPath)
+    val format = options.getOrElse("format", "carbondata")
+    val isCarbonFormat = format.equalsIgnoreCase("carbondata") ||
+                         format.equalsIgnoreCase("carbon")
     if (!isCarbonFormat) {
       newLoadMetaEntry.setFileFormat(new FileFormat(format))
     }
 
     CarbonLoaderUtil.recordNewLoadMetadata(newLoadMetaEntry, model, true, false)
-    val segment = new Segment(model.getSegmentId,
+    val segment = new Segment(
+      model.getSegmentId,
       SegmentFileStore.genSegmentFileName(
         model.getSegmentId,
         System.nanoTime().toString) + CarbonTablePath.SEGMENT_EXT,
@@ -175,19 +272,24 @@ case class CarbonAddLoadCommand(
       if (isCarbonFormat) {
         SegmentFileStore.writeSegmentFile(carbonTable, segment)
       } else {
-        SegmentFileStore.writeSegmentFileForOthers(carbonTable, segment)
+        SegmentFileStore.writeSegmentFileForOthers(
+          carbonTable, segment, partitionSpecOp.orNull, partitionDataFiles.asJava)
       }
 
-    operationContext.setProperty(carbonTable.getTableUniqueName + "_Segment",
-      model.getSegmentId)
-    val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent =
-      new LoadTablePreStatusUpdateEvent(
-        carbonTable.getCarbonTableIdentifier,
-        model)
-    OperationListenerBus.getInstance().fireEvent(loadTablePreStatusUpdateEvent, operationContext)
+    // This event will trigger merge index job, only trigger it if it is carbon file
+    if (isCarbonFormat) {
+      operationContext.setProperty(
+        carbonTable.getTableUniqueName + "_Segment",
+        model.getSegmentId)
+      val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent =
+        new LoadTablePreStatusUpdateEvent(
+          carbonTable.getCarbonTableIdentifier,
+          model)
+      OperationListenerBus.getInstance().fireEvent(loadTablePreStatusUpdateEvent, operationContext)
+    }
 
     val success = if (writeSegment) {
-       SegmentFileStore.updateSegmentFile(
+      SegmentFileStore.updateTableStatusFile(
         carbonTable,
         model.getSegmentId,
         segment.getSegmentFileName,
@@ -241,10 +343,30 @@ case class CarbonAddLoadCommand(
       OperationListenerBus.getInstance()
         .fireEvent(buildDataMapPostExecutionEvent, dataMapOperationContext)
     }
-    Seq.empty
   }
 
-
+  // extract partition column and value, for example, given
+  // path1 = path/to/partition/a=1/b=earth
+  // path2 = path/to/partition/a=2/b=moon
+  // will extract a list of CarbonPartitionSpec:
+  //   CarbonPartitionSpec {("a=1","b=earth"), "path/to/partition"}
+  //   CarbonPartitionSpec {("a=2","b=moon"), "path/to/partition"}
+  def collectPartitionSpecList(
+      sparkSession: SparkSession,
+      tablePath: String,
+      inputPath: String,
+      partitionPaths: Seq[String]
+  ): Seq[CarbonPartitionSpec] = {
+    partitionPaths.map { path =>
+      try {
+        val partitionOnlyPath = path.substring(inputPath.length + 1)
+        val partitionColumnAndValue = partitionOnlyPath.split("/").toList.asJava
+        new CarbonPartitionSpec(partitionColumnAndValue, path)
+      } catch {
+        case t: Throwable => throw new RuntimeException(s"invalid partition path: $path")
+      }
+    }
+  }
 
   override protected def opName: String = "ADD SEGMENT WITH PATH"
 }
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala
index 26c0fb0..3191ba8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala
@@ -19,14 +19,17 @@ package org.apache.spark.sql.execution.strategy
 import java.util
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
 
-import org.apache.hadoop.fs.{Path, PathFilter}
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{execution, MixedFormatHandlerUtil, SparkSession}
+import org.apache.spark.sql.{MixedFormatHandlerUtil, SparkSession}
 import org.apache.spark.sql.carbondata.execution.datasources.SparkCarbonFileFormat
 import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, AttributeSet, Cast, Expression, ExpressionSet, NamedExpression, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, Expression, ExpressionSet, NamedExpression}
+import org.apache.spark.sql.execution.{FilterExec, ProjectExec}
 import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation, InMemoryFileIndex, LogicalRelation}
 import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
 import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
@@ -56,22 +59,76 @@ object MixedFormatHandler {
     supportedFormats.exists(_.equalsIgnoreCase(format))
   }
 
-  def getSchema(sparkSession: SparkSession,
+  /**
+   * collect schema, list of last level directory and list of all data files under given path
+   *
+   * @param sparkSession spark session
+   * @param options option for ADD SEGMENT
+   * @param inputPath under which path to collect
+   * @return schema of the data file, map of last level directory (partition folder) to its
+   *         children file list (data files)
+   */
+  def collectInfo(
+      sparkSession: SparkSession,
       options: Map[String, String],
-      segPath: String): StructType = {
-    val format = options.getOrElse("format", "carbondata")
-    if ((format.equalsIgnoreCase("carbondata") || format.equalsIgnoreCase("carbon"))) {
-      new SparkCarbonFileFormat().inferSchema(sparkSession, options, Seq.empty).get
+      inputPath: String): (StructType, mutable.Map[String, Seq[FileStatus]]) = {
+    val path = new Path(inputPath)
+    val fs = path.getFileSystem(SparkSQLUtil.sessionState(sparkSession).newHadoopConf())
+    val rootPath = fs.getFileStatus(path)
+    val leafDirFileMap = collectAllLeafFileStatus(sparkSession, rootPath, fs)
+    val format = options.getOrElse("format", "carbondata").toLowerCase
+    val fileFormat = if (format.equalsIgnoreCase("carbondata") ||
+                         format.equalsIgnoreCase("carbon")) {
+      new SparkCarbonFileFormat()
     } else {
-      val filePath = FileFactory.addSchemeIfNotExists(segPath.replace("\\", "/"))
-      val path = new Path(filePath)
-      val fs = path.getFileSystem(SparkSQLUtil.sessionState(sparkSession).newHadoopConf())
-      val status = fs.listStatus(path, new PathFilter {
-        override def accept(path: Path): Boolean = {
-          !path.getName.equals("_SUCCESS") && !path.getName.endsWith(".crc")
-        }
-      })
-      getFileFormat(new FileFormatName(format)).inferSchema(sparkSession, options, status).get
+      getFileFormat(new FileFormatName(format))
+    }
+    if (leafDirFileMap.isEmpty) {
+      throw new RuntimeException("no partition data is found")
+    }
+    val schema = fileFormat.inferSchema(sparkSession, options, leafDirFileMap.head._2).get
+    (schema, leafDirFileMap)
+  }
+
+  /**
+   * collect leaf directories and leaf files recursively in given path
+   *
+   * @param sparkSession spark session
+   * @param path path to collect
+   * @param fs hadoop file system
+   * @return mapping of leaf directory to its children files
+   */
+  private def collectAllLeafFileStatus(
+      sparkSession: SparkSession,
+      path: FileStatus,
+      fs: FileSystem): mutable.Map[String, Seq[FileStatus]] = {
+    val directories: ArrayBuffer[FileStatus] = ArrayBuffer()
+    val leafFiles: ArrayBuffer[FileStatus] = ArrayBuffer()
+    val lastLevelFileMap = mutable.Map[String, Seq[FileStatus]]()
+
+    // get all files under input path
+    val fileStatus = fs.listStatus(path.getPath, new PathFilter {
+      override def accept(path: Path): Boolean = {
+        !path.getName.equals("_SUCCESS") && !path.getName.endsWith(".crc")
+      }
+    })
+    // collect directories and files
+    fileStatus.foreach { file =>
+      if (file.isDirectory) directories.append(file)
+      else leafFiles.append(file)
+    }
+    if (leafFiles.nonEmpty) {
+      // leaf file is found, so parent folder (input parameter) is the last level dir
+      val updatedPath = FileFactory.getUpdatedFilePath(path.getPath.toString)
+      lastLevelFileMap.put(updatedPath, leafFiles)
+      lastLevelFileMap
+    } else {
+      // no leaf file is found, for each directory, collect recursively
+      directories.foreach { dir =>
+        val map = collectAllLeafFileStatus(sparkSession, dir, fs)
+        lastLevelFileMap ++= map
+      }
+      lastLevelFileMap
     }
   }
 
@@ -83,7 +140,8 @@ object MixedFormatHandler {
   * If multiple segments are with different formats like parquet , orc etc then it creates RDD for
    * each format segments and union them.
    */
-  def extraRDD(l: LogicalRelation,
+  def extraRDD(
+      l: LogicalRelation,
       projects: Seq[NamedExpression],
       filters: Seq[Expression],
       readCommittedScope: ReadCommittedScope,
@@ -99,13 +157,28 @@ object MixedFormatHandler {
       .filter(l => segsToAccess.isEmpty || segsToAccess.contains(l.getLoadName))
       .groupBy(_.getFileFormat)
       .map { case (format, detailses) =>
-        val paths = detailses.flatMap { d =>
-          SegmentFileStore.readSegmentFile(CarbonTablePath.getSegmentFilePath(readCommittedScope
-            .getFilePath, d.getSegmentFile)).getLocationMap.asScala.flatMap { case (p, f) =>
-            f.getFiles.asScala.map { ef =>
-              new Path(p + CarbonCommonConstants.FILE_SEPARATOR + ef)
+        // collect paths as input to scan RDD
+        val paths = detailses. flatMap { d =>
+          val segmentFile = SegmentFileStore.readSegmentFile(
+            CarbonTablePath.getSegmentFilePath(readCommittedScope.getFilePath, d.getSegmentFile))
+
+          // If it is a partition table, the path to create RDD should be the root path of the
+          // partition folder (excluding the partition subfolder).
+          // If it is not a partition folder, collect all data file paths
+          if (segmentFile.getOptions.containsKey("partition")) {
+            val segmentPath = segmentFile.getOptions.get("path")
+            if (segmentPath == null) {
+              throw new RuntimeException("invalid segment file, 'path' option 
not found")
+            }
+            Seq(new Path(segmentPath))
+          } else {
+            // If it is not a partition folder, collect all data file paths to create RDD
+            segmentFile.getLocationMap.asScala.flatMap { case (p, f) =>
+              f.getFiles.asScala.map { ef =>
+                new Path(p + CarbonCommonConstants.FILE_SEPARATOR + ef)
+              }.toSeq
             }.toSeq
-          }.toSeq
+          }
         }
         val fileFormat = getFileFormat(format, supportBatch)
         getRDDForExternalSegments(l, projects, filters, fileFormat, paths)
@@ -125,7 +198,7 @@ object MixedFormatHandler {
               rdd = rdd.union(r._1)
             }
           }
-          Some(rdd, !rdds.exists(!_._2))
+          Some(rdd, rdds.forall(_._2))
         }
       }
     } else {
@@ -178,10 +251,13 @@ object MixedFormatHandler {
       case Some(catalogTable) =>
         val fileIndex =
           new InMemoryFileIndex(sparkSession, paths, catalogTable.storage.properties, None)
+        // exclude the partition in data schema
+        val dataSchema = catalogTable.schema.filterNot { column =>
+          catalogTable.partitionColumnNames.contains(column.name)}
         HadoopFsRelation(
           fileIndex,
           catalogTable.partitionSchema,
-          catalogTable.schema,
+          new StructType(dataSchema.toArray),
           catalogTable.bucketSpec,
           fileFormat,
           catalogTable.storage.properties)(sparkSession)
@@ -253,11 +329,11 @@ object MixedFormatHandler {
         dataFilters,
         l.catalogTable.map(_.identifier))
     val afterScanFilter = afterScanFilters.toSeq.reduceOption(expressions.And)
-    val withFilter = afterScanFilter.map(execution.FilterExec(_, scan)).getOrElse(scan)
+    val withFilter = afterScanFilter.map(FilterExec(_, scan)).getOrElse(scan)
     val withProjections = if (projects == withFilter.output) {
       withFilter
     } else {
-      execution.ProjectExec(projects, withFilter)
+      ProjectExec(projects, withFilter)
     }
     (withProjections.inputRDDs().head, fileFormat.supportBatch(sparkSession, outputSchema))
   }
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 8670b13..82ea8f6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -483,7 +483,11 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
     }
 
   /**
-   * ALTER TABLE <db.tableName> ADD SEGMENT OPTIONS('path'='path','''key'='value')
+   * ALTER TABLE [dbName.]tableName ADD SEGMENT
+   * OPTIONS('path'='path','format'='format', ['partition'='schema list'])
+   *
+   * schema list format: column_name:data_type
+   * for example: 'partition'='a:int,b:string'
    */
   protected lazy val addLoad: Parser[LogicalPlan] =
     ALTER ~ TABLE ~> (ident <~ ".").? ~ ident ~ (ADD ~> SEGMENT) ~
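
A note on the partition option and the on-disk layout: ADD SEGMENT derives one
partition spec per leaf folder under the input path, so the folders are expected
to follow the Hive convention column=value. A minimal standalone sketch of that
mapping (the helper name and paths are illustrative, not the exact code of this
commit):

    // Given the segment root and a leaf partition folder such as
    //   root = "/warehouse/parquet_table"
    //   leaf = "/warehouse/parquet_table/name=amy/age=12"
    // the relative part "name=amy/age=12" is split into the column=value
    // pairs that make up the partition spec for that folder.
    def toPartitionKeyValues(inputPath: String, leafPath: String): Seq[String] = {
      require(leafPath.startsWith(inputPath) && leafPath.length > inputPath.length,
        s"invalid partition path: $leafPath")
      leafPath.substring(inputPath.length + 1).split("/").toSeq
    }

    // toPartitionKeyValues("/warehouse/parquet_table",
    //   "/warehouse/parquet_table/name=amy/age=12") returns Seq("name=amy", "age=12")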
