This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
     new 3630b12  [CARBONDATA-3566] Support add segment for partition table
3630b12 is described below

commit 3630b12665bd06b1e6f8ae91a7f23bec7bad47d9
Author:     Jacky Li <jacky.li...@qq.com>
AuthorDate: Mon Nov 4 15:28:30 2019 +0800

    [CARBONDATA-3566] Support add segment for partition table

    CarbonData already supports ADD SEGMENT for non-partition tables; it should also
    support Hive partition tables.

    This closes #3431
---
 .../carbondata/core/metadata/SegmentFileStore.java |  44 +++--
 .../core/writer/CarbonIndexFileMergeWriter.java    |   2 +-
 .../hadoop/api/CarbonOutputCommitter.java          |   2 +-
 .../testsuite/addsegment/AddSegmentTestCase.scala  | 154 ++++++++++++++++-
 .../spark/rdd/CarbonDataRDDFactory.scala           |   2 +-
 .../command/management/CarbonAddLoadCommand.scala  | 184 +++++++++++++++++----
 .../execution/strategy/MixedFormatHandler.scala    | 132 +++++++++++----
 .../spark/sql/parser/CarbonSpark2SqlParser.scala   |   6 +-
 8 files changed, 446 insertions(+), 80 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java index 57eb46d..e7feb3f 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java @@ -49,6 +49,7 @@ import org.apache.carbondata.core.util.path.CarbonTablePath; import com.google.gson.Gson; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.log4j.Logger; @@ -242,24 +243,36 @@ public class SegmentFileStore { return false; } - public static boolean writeSegmentFileForOthers(CarbonTable carbonTable, Segment segment) + public static boolean writeSegmentFileForOthers( + CarbonTable carbonTable, + Segment segment, + PartitionSpec partitionSpec, + List<FileStatus> partitionDataFiles) throws IOException { String tablePath = carbonTable.getTablePath(); - CarbonFile segmentFolder = FileFactory.getCarbonFile(segment.getSegmentPath()); - CarbonFile[] otherFiles = segmentFolder.listFiles(new CarbonFileFilter() { - @Override - public boolean accept(CarbonFile file) { - return (!file.getName().equals("_SUCCESS") && !file.getName().endsWith(".crc")); - } - }); - if (otherFiles != null && otherFiles.length > 0) { + CarbonFile[] dataFiles = null; + if (partitionDataFiles.isEmpty()) { + CarbonFile segmentFolder = FileFactory.getCarbonFile(segment.getSegmentPath()); + dataFiles = segmentFolder.listFiles( + file -> (!file.getName().equals("_SUCCESS") && !file.getName().endsWith(".crc"))); + } else { + dataFiles = partitionDataFiles.stream().map( + fileStatus -> FileFactory.getCarbonFile( + fileStatus.getPath().toString())).toArray(CarbonFile[]::new); + } + if (dataFiles != null && dataFiles.length > 0) { SegmentFile segmentFile = new SegmentFile(); segmentFile.setOptions(segment.getOptions()); FolderDetails folderDetails = new FolderDetails(); folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage()); folderDetails.setRelative(false); - segmentFile.addPath(segment.getSegmentPath(), folderDetails); - for (CarbonFile file : otherFiles) { + if (!partitionDataFiles.isEmpty()) { + folderDetails.setPartitions(partitionSpec.getPartitions()); + segmentFile.addPath(partitionSpec.getLocation().toString(), folderDetails); + } else { + segmentFile.addPath(segment.getSegmentPath(), folderDetails); + } + for (CarbonFile file : dataFiles) {
folderDetails.getFiles().add(file.getName()); } String segmentFileFolder = CarbonTablePath.getSegmentFilesLocation(tablePath); @@ -437,18 +450,19 @@ public class SegmentFileStore { * @return boolean which determines whether status update is done or not. * @throws IOException */ - public static boolean updateSegmentFile(CarbonTable carbonTable, String segmentId, + public static boolean updateTableStatusFile(CarbonTable carbonTable, String segmentId, String segmentFile, String tableId, SegmentFileStore segmentFileStore) throws IOException { - return updateSegmentFile(carbonTable, segmentId, segmentFile, tableId, segmentFileStore, null); + return updateTableStatusFile(carbonTable, segmentId, segmentFile, tableId, segmentFileStore, + null); } /** - * This API will update the segmentFile of a passed segment. + * This API will update the table status file with specified segment. * * @return boolean which determines whether status update is done or not. * @throws IOException */ - public static boolean updateSegmentFile(CarbonTable carbonTable, String segmentId, + public static boolean updateTableStatusFile(CarbonTable carbonTable, String segmentId, String segmentFile, String tableId, SegmentFileStore segmentFileStore, SegmentStatus segmentStatus) throws IOException { boolean status = false; diff --git a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java index c9d4c26..4760bdc 100644 --- a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java +++ b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java @@ -193,7 +193,7 @@ public class CarbonIndexFileMergeWriter { + CarbonCommonConstants.FILE_SEPARATOR + newSegmentFileName; if (!table.isHivePartitionTable()) { SegmentFileStore.writeSegmentFile(segmentFileStore.getSegmentFile(), path); - SegmentFileStore.updateSegmentFile(table, segmentId, newSegmentFileName, + SegmentFileStore.updateTableStatusFile(table, segmentId, newSegmentFileName, table.getCarbonTableIdentifier().getTableId(), segmentFileStore); } diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java index 21861d9..549ca7c 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java @@ -144,7 +144,7 @@ public class CarbonOutputCommitter extends FileOutputCommitter { uuid = operationContext.getProperty("uuid").toString(); } - SegmentFileStore.updateSegmentFile(carbonTable, loadModel.getSegmentId(), + SegmentFileStore.updateTableStatusFile(carbonTable, loadModel.getSegmentId(), segmentFileName + CarbonTablePath.SEGMENT_EXT, carbonTable.getCarbonTableIdentifier().getTableId(), new SegmentFileStore(carbonTable.getTablePath(), diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala index 129c0f0..c9b5bf6 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala @@ -19,10 +19,10 @@ package 
org.apache.carbondata.spark.testsuite.addsegment import java.io.File import java.nio.file.{Files, Paths} -import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.test.util.QueryTest import org.apache.spark.sql.util.SparkSQLUtil -import org.apache.spark.sql.{CarbonEnv, DataFrame, Row} +import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row} import org.scalatest.BeforeAndAfterAll import org.apache.carbondata.core.constants.CarbonCommonConstants @@ -32,6 +32,7 @@ import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.hadoop.readsupport.impl.CarbonRowReadSupport import org.apache.carbondata.sdk.file.{CarbonReader, CarbonWriter, Field, Schema} +import org.apache.carbondata.sdk.file.{CarbonReader, CarbonWriter} import org.junit.Assert import scala.io.Source @@ -595,6 +596,155 @@ class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll { sql("drop table if exists addSegParmore") } + test("test add segment partition table") { + sql("drop table if exists parquet_table") + sql("drop table if exists carbon_table") + sql("drop table if exists orc_table") + + sql("create table parquet_table(value int, name string, age int) using parquet partitioned by (name, age)") + sql("create table carbon_table(value int) partitioned by (name string, age int) stored as carbondata") + sql("insert into parquet_table values (30, 'amy', 12), (40, 'bob', 13)") + sql("insert into parquet_table values (30, 'amy', 20), (10, 'bob', 13)") + sql("insert into parquet_table values (30, 'cat', 12), (40, 'dog', 13)") + sql("select * from parquet_table").show + val parquetRootPath = SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog + .getTableMetadata(TableIdentifier("parquet_table")).location + + // add data from parquet table to carbon table + sql(s"alter table carbon_table add segment options ('path'='$parquetRootPath', 'format'='parquet', 'partition'='name:string,age:int')") + checkAnswer(sql("select * from carbon_table"), sql("select * from parquet_table")) + + // load new data into carbon table + sql("insert into carbon_table select * from parquet_table") + checkAnswer(sql("select * from carbon_table"), sql("select * from parquet_table union all select * from parquet_table")) + + // add another data from orc table to carbon table + sql("create table orc_table(value int, name string, age int) using orc partitioned by (name, age)") + sql("insert into orc_table values (30, 'orc', 50), (40, 'orc', 13)") + sql("insert into orc_table values (30, 'fast', 10), (10, 'fast', 13)") + val orcRootPath = SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog + .getTableMetadata(TableIdentifier("orc_table")).location + sql(s"alter table carbon_table add segment options ('path'='$orcRootPath', 'format'='orc', 'partition'='name:string,age:int')") + checkAnswer(sql("select * from carbon_table"), + sql("select * from parquet_table " + + "union all select * from parquet_table " + + "union all select * from orc_table")) + + // filter query on partition column + checkAnswer(sql("select count(*) from carbon_table where name = 'amy'"), Row(4)) + + // do compaction + sql("alter table carbon_table compact 'major'") + checkAnswer(sql("select * from carbon_table"), + sql("select * from parquet_table " + + "union all select * from parquet_table " + + "union all select * from orc_table")) + + sql("drop table if exists parquet_table") + sql("drop 
table if exists carbon_table") + sql("drop table if exists orc_table") + } + + test("show segment after add segment to partition table") { + sql("drop table if exists parquet_table") + sql("drop table if exists carbon_table") + + sql("create table parquet_table(value int, name string, age int) using parquet partitioned by (name, age)") + sql("create table carbon_table(value int) partitioned by (name string, age int) stored as carbondata") + sql("insert into parquet_table values (30, 'amy', 12), (40, 'bob', 13)") + sql("insert into parquet_table values (30, 'amy', 20), (10, 'bob', 13)") + sql("insert into parquet_table values (30, 'cat', 12), (40, 'dog', 13)") + sql("select * from parquet_table").show + val parquetRootPath = SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog + .getTableMetadata(TableIdentifier("parquet_table")).location + + // add data from parquet table to carbon table + sql(s"alter table carbon_table add segment options ('path'='$parquetRootPath', 'format'='parquet', 'partition'='name:string,age:int')") + checkAnswer(sql("select * from carbon_table"), sql("select * from parquet_table")) + + // test show segment + checkExistence(sql(s"show segments for table carbon_table"), true, "spark-common/target/warehouse/parquet_table") + checkExistence(sql(s"show history segments for table carbon_table"), true, "spark-common/target/warehouse/parquet_table") + + sql("drop table if exists parquet_table") + sql("drop table if exists carbon_table") + } + + test("test add segment partition table, missing partition option") { + sql("drop table if exists parquet_table") + sql("drop table if exists carbon_table") + + sql("create table parquet_table(value int, name string, age int) using parquet partitioned by (name, age)") + sql("create table carbon_table(value int) partitioned by (name string, age int) stored as carbondata") + sql("insert into parquet_table values (30, 'amy', 12), (40, 'bob', 13)") + sql("insert into parquet_table values (30, 'amy', 20), (10, 'bob', 13)") + sql("insert into parquet_table values (30, 'cat', 12), (40, 'dog', 13)") + sql("select * from parquet_table").show + val parquetRootPath = SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog + .getTableMetadata(TableIdentifier("parquet_table")).location + + // add data from parquet table to carbon table + val exception = intercept[AnalysisException]( + sql(s"alter table carbon_table add segment options ('path'='$parquetRootPath', 'format'='parquet')") + ) + assert(exception.message.contains("partition option is required")) + + sql("drop table if exists parquet_table") + sql("drop table if exists carbon_table") + } + + test("test add segment partition table, unmatched partition") { + sql("drop table if exists parquet_table") + sql("drop table if exists carbon_table") + + sql("create table parquet_table(value int, name string, age int) using parquet partitioned by (name)") + sql("create table carbon_table(value int) partitioned by (name string, age int) stored as carbondata") + sql("insert into parquet_table values (30, 'amy', 12), (40, 'bob', 13)") + sql("insert into parquet_table values (30, 'amy', 20), (10, 'bob', 13)") + sql("insert into parquet_table values (30, 'cat', 12), (40, 'dog', 13)") + sql("select * from parquet_table").show + val parquetRootPath = SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog + .getTableMetadata(TableIdentifier("parquet_table")).location + + // add data from parquet table to carbon table + // unmatched partition + var exception = intercept[AnalysisException]( + 
sql(s"alter table carbon_table add segment options ('path'='$parquetRootPath', 'format'='parquet', 'partition'='name:string')") + ) + assert(exception.message.contains("Partition is not same")) + + // incorrect partition option + exception = intercept[AnalysisException]( + sql(s"alter table carbon_table add segment options ('path'='$parquetRootPath', 'format'='parquet', 'partition'='name:string,age:int')") + ) + assert(exception.message.contains("input segment path does not comply to partitions in carbon table")) + + sql("drop table if exists parquet_table") + sql("drop table if exists carbon_table") + } + + test("test add segment partition table, incorrect partition") { + sql("drop table if exists parquet_table") + sql("drop table if exists carbon_table") + + sql("create table parquet_table(value int) using parquet") + sql("create table carbon_table(value int) partitioned by (name string, age int) stored as carbondata") + sql("insert into parquet_table values (30), (40)") + sql("select * from parquet_table").show + val parquetRootPath = SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog + .getTableMetadata(TableIdentifier("parquet_table")).location + + // add data from parquet table to carbon table + // incorrect partition option + val exception = intercept[RuntimeException]( + sql(s"alter table carbon_table add segment options ('path'='$parquetRootPath', 'format'='parquet', 'partition'='name:string,age:int')") + ) + assert(exception.getMessage.contains("invalid partition path")) + + sql("drop table if exists parquet_table") + sql("drop table if exists carbon_table") + } + private def copyseg(tableName: String, pathName: String): String = { val table1 = SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog .getTableMetadata(TableIdentifier(tableName)) diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala index 031f539..488468a 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala @@ -535,7 +535,7 @@ object CarbonDataRDDFactory { SegmentFileStore.writeSegmentFile(carbonTable, carbonLoadModel.getSegmentId, String.valueOf(carbonLoadModel.getFactTimeStamp)) - SegmentFileStore.updateSegmentFile( + SegmentFileStore.updateTableStatusFile( carbonTable, carbonLoadModel.getSegmentId, segmentFileName, diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala index 7b2c088..e9025ce 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala @@ -23,8 +23,10 @@ import java.util import scala.collection.JavaConverters._ import scala.collection.mutable +import org.apache.hadoop.fs.FileStatus import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SparkSession} import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil.convertSparkToCarbonDataType +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.command.{Checker, MetadataCommand} import 
org.apache.spark.sql.execution.strategy.MixedFormatHandler import org.apache.spark.sql.hive.CarbonRelation @@ -36,8 +38,10 @@ import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment} import org.apache.carbondata.core.datamap.status.DataMapStatusManager import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.exception.ConcurrentOperationException +import org.apache.carbondata.core.indexstore.{PartitionSpec => CarbonPartitionSpec} import org.apache.carbondata.core.metadata.SegmentFileStore import org.apache.carbondata.core.metadata.encoder.Encoding +import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.mutate.CarbonUpdateUtil import org.apache.carbondata.core.statusmanager.{FileFormat, LoadMetadataDetails, SegmentStatus, SegmentStatusManager} import org.apache.carbondata.core.util.path.CarbonTablePath @@ -47,7 +51,6 @@ import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, Car import org.apache.carbondata.processing.util.CarbonLoaderUtil import org.apache.carbondata.sdk.file.{Field, Schema} import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory.clearDataMapFiles -import org.apache.carbondata.spark.util.CarbonScalaUtil /** @@ -89,45 +92,135 @@ case class CarbonAddLoadCommand( if (SegmentStatusManager.isOverwriteInProgressInTable(carbonTable)) { throw new ConcurrentOperationException(carbonTable, "insert overwrite", "delete segment") } - val segmentPath = options.getOrElse( - "path", throw new UnsupportedOperationException("PATH is manadatory")) - val allSegments = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath) + val inputPath = options.getOrElse( + "path", throw new UnsupportedOperationException("PATH is mandatory")) // If a path is already added then we should block the adding of the same path again. 
- if (allSegments.exists(a => - a.getPath != null && a.getPath.equalsIgnoreCase(segmentPath) - )) { + val allSegments = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath) + if (allSegments.exists(a => a.getPath != null && a.getPath.equalsIgnoreCase(inputPath))) { throw new AnalysisException(s"path already exists in table status file, can not add same " + - s"segment path repeatedly: $segmentPath") + s"segment path repeatedly: $inputPath") } val format = options.getOrElse("format", "carbondata") val isCarbonFormat = format.equalsIgnoreCase("carbondata") || format.equalsIgnoreCase("carbon") // If in the given location no carbon index files are found then we should throw an exception - if (isCarbonFormat && SegmentFileStore.getListOfCarbonIndexFiles(segmentPath).isEmpty) { + if (isCarbonFormat && SegmentFileStore.getListOfCarbonIndexFiles(inputPath).isEmpty) { throw new AnalysisException("CarbonIndex files not present in the location") } - val segSchema = MixedFormatHandler.getSchema(sparkSession, options, segmentPath) - - val segCarbonSchema = new Schema(segSchema.fields.map { field => + // infer schema and collect FileStatus for all partitions + val (inputPathSchema, lastLevelDirFileMap) = + MixedFormatHandler.collectInfo(sparkSession, options, inputPath) + val inputPathCarbonFields = inputPathSchema.fields.map { field => val dataType = convertSparkToCarbonDataType(field.dataType) new Field(field.name, dataType) - }) - - val tableCarbonSchema = new Schema(tableSchema.fields.map { field => + } + val carbonTableSchema = new Schema(tableSchema.fields.map { field => val dataType = convertSparkToCarbonDataType(field.dataType) new Field(field.name, dataType) }) + // update schema if has partition + val inputPathTableFields = if (carbonTable.isHivePartitionTable) { + val partitions = options.getOrElse("partition", + throw new AnalysisException( + "partition option is required when adding segment to partition table") + ) + // extract partition given by user, partition option should be form of "a:int, b:string" + val partitionFields = partitions + .split(",") + .map { input => + if (input.nonEmpty) { + val nameAndDataType = input.trim.toLowerCase.split(":") + if (nameAndDataType.size == 2) { + new Field(nameAndDataType(0), nameAndDataType(1)) + } else { + throw new AnalysisException(s"invalid partition option: ${ options.toString() }") + } + } + } + // validate against the partition in carbon table + val carbonTablePartition = getCarbonTablePartition(sparkSession) + if (!partitionFields.sameElements(carbonTablePartition)) { + throw new AnalysisException( + s""" + |Partition is not same. Carbon table partition is : + |${carbonTablePartition.mkString(",")} and input segment partition is : + |${partitionFields.mkString(",")} + |""".stripMargin) + } + inputPathCarbonFields ++ partitionFields + } else { + if (options.contains("partition")) { + throw new AnalysisException( + s"Invalid option: partition, $tableName is not a partitioned table") + } + inputPathCarbonFields + } - if (!tableCarbonSchema.getFields.forall(f => segCarbonSchema.getFields.exists(_.equals(f)))) { + // validate the schema including partition columns + val schemaMatched = carbonTableSchema.getFields.forall { field => + inputPathTableFields.exists(_.equals(field)) + } + if (!schemaMatched) { throw new AnalysisException(s"Schema is not same. 
Table schema is : " + - s"${tableSchema} and segment schema is : ${segSchema}") + s"${tableSchema} and segment schema is : ${inputPathSchema}") + } + + // all validation is done, update the metadata accordingly + if (carbonTable.isHivePartitionTable) { + // for each partition in input path, create a new segment in carbon table + val partitionSpecs = collectPartitionSpecList( + sparkSession, carbonTable.getTablePath, inputPath, lastLevelDirFileMap.keys.toSeq) + // check the collected partition from input segment path should comply to + // partitions in carbon table + val carbonTablePartition = getCarbonTablePartition(sparkSession) + if (partitionSpecs.head.getPartitions.size() != carbonTablePartition.length) { + throw new AnalysisException( + s""" + |input segment path does not comply to partitions in carbon table: + |${carbonTablePartition.mkString(",")} + |""".stripMargin) + } + partitionSpecs.foreach { partitionSpec => + val dataFiles = lastLevelDirFileMap.getOrElse(partitionSpec.getLocation.toString, + throw new RuntimeException(s"partition folder not found: ${partitionSpec.getLocation}")) + writeMetaForSegment(sparkSession, carbonTable, inputPath, Some(partitionSpec), dataFiles) + } + } else { + writeMetaForSegment(sparkSession, carbonTable, inputPath) } + Seq.empty + } + + private def getCarbonTablePartition(sparkSession: SparkSession): Array[Field] = { + sparkSession.sessionState.catalog + .getTableMetadata(TableIdentifier(tableName, databaseNameOp)) + .partitionSchema + .fields + .map(f => new Field(f.name, convertSparkToCarbonDataType(f.dataType))) + } + + /** + * Write metadata for external segment, including table status file and segment file + * + * @param sparkSession spark session + * @param carbonTable carbon table + * @param segmentPath external segment path specified by user + * @param partitionSpecOp partition info extracted from the path + * @param partitionDataFiles all data files in the partition + */ + private def writeMetaForSegment( + sparkSession: SparkSession, + carbonTable: CarbonTable, + segmentPath: String, + partitionSpecOp: Option[CarbonPartitionSpec] = None, + partitionDataFiles: Seq[FileStatus] = Seq.empty + ): Unit = { val model = new CarbonLoadModel model.setCarbonTransactionalTable(true) model.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable)) @@ -147,8 +240,8 @@ case class CarbonAddLoadCommand( val dataMapNames: mutable.Buffer[String] = tableDataMaps.asScala.map(dataMap => dataMap.getDataMapSchema.getDataMapName) val buildDataMapPreExecutionEvent: BuildDataMapPreExecutionEvent = - new BuildDataMapPreExecutionEvent(sparkSession, - carbonTable.getAbsoluteTableIdentifier, dataMapNames) + BuildDataMapPreExecutionEvent( + sparkSession, carbonTable.getAbsoluteTableIdentifier, dataMapNames) OperationListenerBus.getInstance().fireEvent(buildDataMapPreExecutionEvent, dataMapOperationContext) } @@ -160,12 +253,16 @@ case class CarbonAddLoadCommand( model.getFactTimeStamp, false) newLoadMetaEntry.setPath(segmentPath) + val format = options.getOrElse("format", "carbondata") + val isCarbonFormat = format.equalsIgnoreCase("carbondata") || + format.equalsIgnoreCase("carbon") if (!isCarbonFormat) { newLoadMetaEntry.setFileFormat(new FileFormat(format)) } CarbonLoaderUtil.recordNewLoadMetadata(newLoadMetaEntry, model, true, false) - val segment = new Segment(model.getSegmentId, + val segment = new Segment( + model.getSegmentId, SegmentFileStore.genSegmentFileName( model.getSegmentId, System.nanoTime().toString) + CarbonTablePath.SEGMENT_EXT, @@ -175,19 
+272,24 @@ case class CarbonAddLoadCommand( if (isCarbonFormat) { SegmentFileStore.writeSegmentFile(carbonTable, segment) } else { - SegmentFileStore.writeSegmentFileForOthers(carbonTable, segment) + SegmentFileStore.writeSegmentFileForOthers( + carbonTable, segment, partitionSpecOp.orNull, partitionDataFiles.asJava) } - operationContext.setProperty(carbonTable.getTableUniqueName + "_Segment", - model.getSegmentId) - val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent = - new LoadTablePreStatusUpdateEvent( - carbonTable.getCarbonTableIdentifier, - model) - OperationListenerBus.getInstance().fireEvent(loadTablePreStatusUpdateEvent, operationContext) + // This event will trigger merge index job, only trigger it if it is carbon file + if (isCarbonFormat) { + operationContext.setProperty( + carbonTable.getTableUniqueName + "_Segment", + model.getSegmentId) + val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent = + new LoadTablePreStatusUpdateEvent( + carbonTable.getCarbonTableIdentifier, + model) + OperationListenerBus.getInstance().fireEvent(loadTablePreStatusUpdateEvent, operationContext) + } val success = if (writeSegment) { - SegmentFileStore.updateSegmentFile( + SegmentFileStore.updateTableStatusFile( carbonTable, model.getSegmentId, segment.getSegmentFileName, @@ -241,10 +343,30 @@ case class CarbonAddLoadCommand( OperationListenerBus.getInstance() .fireEvent(buildDataMapPostExecutionEvent, dataMapOperationContext) } - Seq.empty } - + // extract partition column and value, for example, given + // path1 = path/to/partition/a=1/b=earth + // path2 = path/to/partition/a=2/b=moon + // will extract a list of CarbonPartitionSpec: + // CarbonPartitionSpec {("a=1","b=earth"), "path/to/partition"} + // CarbonPartitionSpec {("a=2","b=moon"), "path/to/partition"} + def collectPartitionSpecList( + sparkSession: SparkSession, + tablePath: String, + inputPath: String, + partitionPaths: Seq[String] + ): Seq[CarbonPartitionSpec] = { + partitionPaths.map { path => + try { + val partitionOnlyPath = path.substring(inputPath.length + 1) + val partitionColumnAndValue = partitionOnlyPath.split("/").toList.asJava + new CarbonPartitionSpec(partitionColumnAndValue, path) + } catch { + case t: Throwable => throw new RuntimeException(s"invalid partition path: $path") + } + } + } override protected def opName: String = "ADD SEGMENT WITH PATH" } diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala index 26c0fb0..3191ba8 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala @@ -19,14 +19,17 @@ package org.apache.spark.sql.execution.strategy import java.util import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer -import org.apache.hadoop.fs.{Path, PathFilter} +import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter} import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{execution, MixedFormatHandlerUtil, SparkSession} +import org.apache.spark.sql.{MixedFormatHandlerUtil, SparkSession} import org.apache.spark.sql.carbondata.execution.datasources.SparkCarbonFileFormat import org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.InternalRow -import 
org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, AttributeSet, Cast, Expression, ExpressionSet, NamedExpression, SubqueryExpression} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, Expression, ExpressionSet, NamedExpression} +import org.apache.spark.sql.execution.{FilterExec, ProjectExec} import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation, InMemoryFileIndex, LogicalRelation} import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat import org.apache.spark.sql.execution.datasources.json.JsonFileFormat @@ -56,22 +59,76 @@ object MixedFormatHandler { supportedFormats.exists(_.equalsIgnoreCase(format)) } - def getSchema(sparkSession: SparkSession, + /** + * collect schema, list of last level directory and list of all data files under given path + * + * @param sparkSession spark session + * @param options option for ADD SEGMENT + * @param inputPath under which path to collect + * @return schema of the data file, map of last level directory (partition folder) to its + * children file list (data files) + */ + def collectInfo( + sparkSession: SparkSession, options: Map[String, String], - segPath: String): StructType = { - val format = options.getOrElse("format", "carbondata") - if ((format.equalsIgnoreCase("carbondata") || format.equalsIgnoreCase("carbon"))) { - new SparkCarbonFileFormat().inferSchema(sparkSession, options, Seq.empty).get + inputPath: String): (StructType, mutable.Map[String, Seq[FileStatus]]) = { + val path = new Path(inputPath) + val fs = path.getFileSystem(SparkSQLUtil.sessionState(sparkSession).newHadoopConf()) + val rootPath = fs.getFileStatus(path) + val leafDirFileMap = collectAllLeafFileStatus(sparkSession, rootPath, fs) + val format = options.getOrElse("format", "carbondata").toLowerCase + val fileFormat = if (format.equalsIgnoreCase("carbondata") || + format.equalsIgnoreCase("carbon")) { + new SparkCarbonFileFormat() } else { - val filePath = FileFactory.addSchemeIfNotExists(segPath.replace("\\", "/")) - val path = new Path(filePath) - val fs = path.getFileSystem(SparkSQLUtil.sessionState(sparkSession).newHadoopConf()) - val status = fs.listStatus(path, new PathFilter { - override def accept(path: Path): Boolean = { - !path.getName.equals("_SUCCESS") && !path.getName.endsWith(".crc") - } - }) - getFileFormat(new FileFormatName(format)).inferSchema(sparkSession, options, status).get + getFileFormat(new FileFormatName(format)) + } + if (leafDirFileMap.isEmpty) { + throw new RuntimeException("no partition data is found") + } + val schema = fileFormat.inferSchema(sparkSession, options, leafDirFileMap.head._2).get + (schema, leafDirFileMap) + } + + /** + * collect leaf directories and leaf files recursively in given path + * + * @param sparkSession spark session + * @param path path to collect + * @param fs hadoop file system + * @return mapping of leaf directory to its children files + */ + private def collectAllLeafFileStatus( + sparkSession: SparkSession, + path: FileStatus, + fs: FileSystem): mutable.Map[String, Seq[FileStatus]] = { + val directories: ArrayBuffer[FileStatus] = ArrayBuffer() + val leafFiles: ArrayBuffer[FileStatus] = ArrayBuffer() + val lastLevelFileMap = mutable.Map[String, Seq[FileStatus]]() + + // get all files under input path + val fileStatus = fs.listStatus(path.getPath, new PathFilter { + override def accept(path: Path): Boolean = { + !path.getName.equals("_SUCCESS") && !path.getName.endsWith(".crc") + } + }) + // collect 
directories and files + fileStatus.foreach { file => + if (file.isDirectory) directories.append(file) + else leafFiles.append(file) + } + if (leafFiles.nonEmpty) { + // leaf file is found, so parent folder (input parameter) is the last level dir + val updatedPath = FileFactory.getUpdatedFilePath(path.getPath.toString) + lastLevelFileMap.put(updatedPath, leafFiles) + lastLevelFileMap + } else { + // no leaf file is found, for each directory, collect recursively + directories.foreach { dir => + val map = collectAllLeafFileStatus(sparkSession, dir, fs) + lastLevelFileMap ++= map + } + lastLevelFileMap } } @@ -83,7 +140,8 @@ object MixedFormatHandler { * If multiple segments are with different formats like parquet , orc etc then it creates RDD for * each format segments and union them. */ - def extraRDD(l: LogicalRelation, + def extraRDD( + l: LogicalRelation, projects: Seq[NamedExpression], filters: Seq[Expression], readCommittedScope: ReadCommittedScope, @@ -99,13 +157,28 @@ object MixedFormatHandler { .filter(l => segsToAccess.isEmpty || segsToAccess.contains(l.getLoadName)) .groupBy(_.getFileFormat) .map { case (format, detailses) => - val paths = detailses.flatMap { d => - SegmentFileStore.readSegmentFile(CarbonTablePath.getSegmentFilePath(readCommittedScope - .getFilePath, d.getSegmentFile)).getLocationMap.asScala.flatMap { case (p, f) => - f.getFiles.asScala.map { ef => - new Path(p + CarbonCommonConstants.FILE_SEPARATOR + ef) + // collect paths as input to scan RDD + val paths = detailses. flatMap { d => + val segmentFile = SegmentFileStore.readSegmentFile( + CarbonTablePath.getSegmentFilePath(readCommittedScope.getFilePath, d.getSegmentFile)) + + // If it is a partition table, the path to create RDD should be the root path of the + // partition folder (excluding the partition subfolder). 
+ // If it is not a partition folder, collect all data file paths + if (segmentFile.getOptions.containsKey("partition")) { + val segmentPath = segmentFile.getOptions.get("path") + if (segmentPath == null) { + throw new RuntimeException("invalid segment file, 'path' option not found") + } + Seq(new Path(segmentPath)) + } else { + // If it is not a partition folder, collect all data file paths to create RDD + segmentFile.getLocationMap.asScala.flatMap { case (p, f) => + f.getFiles.asScala.map { ef => + new Path(p + CarbonCommonConstants.FILE_SEPARATOR + ef) + }.toSeq }.toSeq - }.toSeq + } } val fileFormat = getFileFormat(format, supportBatch) getRDDForExternalSegments(l, projects, filters, fileFormat, paths) @@ -125,7 +198,7 @@ object MixedFormatHandler { rdd = rdd.union(r._1) } } - Some(rdd, !rdds.exists(!_._2)) + Some(rdd, rdds.forall(_._2)) } } } else { @@ -178,10 +251,13 @@ object MixedFormatHandler { case Some(catalogTable) => val fileIndex = new InMemoryFileIndex(sparkSession, paths, catalogTable.storage.properties, None) + // exclude the partition in data schema + val dataSchema = catalogTable.schema.filterNot { column => + catalogTable.partitionColumnNames.contains(column.name)} HadoopFsRelation( fileIndex, catalogTable.partitionSchema, - catalogTable.schema, + new StructType(dataSchema.toArray), catalogTable.bucketSpec, fileFormat, catalogTable.storage.properties)(sparkSession) @@ -253,11 +329,11 @@ object MixedFormatHandler { dataFilters, l.catalogTable.map(_.identifier)) val afterScanFilter = afterScanFilters.toSeq.reduceOption(expressions.And) - val withFilter = afterScanFilter.map(execution.FilterExec(_, scan)).getOrElse(scan) + val withFilter = afterScanFilter.map(FilterExec(_, scan)).getOrElse(scan) val withProjections = if (projects == withFilter.output) { withFilter } else { - execution.ProjectExec(projects, withFilter) + ProjectExec(projects, withFilter) } (withProjections.inputRDDs().head, fileFormat.supportBatch(sparkSession, outputSchema)) } diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala index 8670b13..82ea8f6 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala @@ -483,7 +483,11 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser { } /** - * ALTER TABLE <db.tableName> ADD SEGMENT OPTIONS('path'='path','''key'='value') + * ALTER TABLE [dbName.]tableName ADD SEGMENT + * OPTIONS('path'='path','format'='format', ['partition'='schema list']) + * + * schema list format: column_name:data_type + * for example: 'partition'='a:int,b:string' */ protected lazy val addLoad: Parser[LogicalPlan] = ALTER ~ TABLE ~> (ident <~ ".").? ~ ident ~ (ADD ~> SEGMENT) ~
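
For reference, a minimal usage sketch of the syntax documented above, pieced together from the
tests added in AddSegmentTestCase.scala (the table names and the segment path are illustrative
only, not part of the commit):

    -- a carbon table partitioned the same way as the external data
    CREATE TABLE carbon_table(value INT)
      PARTITIONED BY (name STRING, age INT) STORED AS carbondata;

    -- Register existing parquet data, laid out as <path>/name=<v1>/age=<v2>/..., as a new segment.
    -- On a partitioned table the 'partition' option is required and must list the same partition
    -- columns as the carbon table, in column_name:data_type form.
    ALTER TABLE carbon_table ADD SEGMENT
      OPTIONS('path'='/path/to/parquet_table', 'format'='parquet', 'partition'='name:string,age:int');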