http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala index 0298eea..cf22569 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala @@ -150,7 +150,7 @@ case class CarbonDropTableCommand( // delete table data only if it is not external table if (FileFactory.isFileExist(tablePath, fileType) && - !carbonTable.isExternalTable) { + !(carbonTable.isExternalTable || carbonTable.isFileLevelExternalTable)) { val file = FileFactory.getCarbonFile(tablePath, fileType) CarbonUtil.deleteFoldersAndFilesSilent(file) }
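The extra guard above targets tables that merely point at existing CarbonData files rather than data managed by the store: for such tables DROP TABLE should remove only the metadata. A rough sketch of the scenario in Scala, under assumed setup (a SparkSession named spark with the Carbon parser enabled, pre-existing CarbonData files under the hypothetical path /tmp/carbon_files; the exact DDL may differ):

    // Register a file-level external table over files that were written outside
    // of a regular CarbonData table (for example by the SDK writer).
    spark.sql(
      """
        | CREATE EXTERNAL TABLE file_level_tbl
        | STORED BY 'Carbonfile'
        | LOCATION '/tmp/carbon_files'
      """.stripMargin)

    // With the isFileLevelExternalTable check added above, dropping the table no
    // longer reaches deleteFoldersAndFilesSilent, so /tmp/carbon_files stays intact.
    spark.sql("DROP TABLE file_level_tbl")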
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala deleted file mode 100644 index 2eed988..0000000 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala +++ /dev/null @@ -1,443 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql.execution.datasources - -import java.io.File -import java.util -import java.util.concurrent.ConcurrentHashMap -import java.util.concurrent.atomic.AtomicLong - -import scala.collection.JavaConverters._ -import scala.collection.mutable -import scala.util.Random - -import org.apache.hadoop.fs.{FileStatus, Path} -import org.apache.hadoop.io.NullWritable -import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} -import org.apache.spark.SparkEnv -import org.apache.spark.internal.Logging -import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} -import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} -import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.sources.DataSourceRegister -import org.apache.spark.sql.types._ - -import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants} -import org.apache.carbondata.core.datastore.impl.FileFactory -import org.apache.carbondata.core.indexstore -import org.apache.carbondata.core.metadata.SegmentFileStore -import org.apache.carbondata.core.metadata.datatype.DataTypes -import org.apache.carbondata.core.metadata.encoder.Encoding -import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager} -import org.apache.carbondata.core.util.{CarbonProperties, DataTypeConverterImpl, DataTypeUtil} -import org.apache.carbondata.core.util.path.CarbonTablePath -import org.apache.carbondata.hadoop.api.{CarbonOutputCommitter, CarbonTableOutputFormat} -import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat.CarbonRecordWriter -import org.apache.carbondata.hadoop.internal.ObjectArrayWritable -import org.apache.carbondata.hadoop.util.ObjectSerializationUtil -import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, CarbonLoadModelBuilder, LoadOption} -import org.apache.carbondata.spark.util.{CarbonScalaUtil, Util} - -class CarbonFileFormat - extends FileFormat - with DataSourceRegister - with Logging -with Serializable { - - override def 
shortName(): String = "carbondata" - - override def inferSchema(sparkSession: SparkSession, - options: Map[String, String], - files: Seq[FileStatus]): Option[StructType] = { - None - } - - SparkSession.getActiveSession.get.sessionState.conf.setConfString( - "spark.sql.sources.commitProtocolClass", - "org.apache.spark.sql.execution.datasources.CarbonSQLHadoopMapReduceCommitProtocol") - - override def prepareWrite( - sparkSession: SparkSession, - job: Job, - options: Map[String, String], - dataSchema: StructType): OutputWriterFactory = { - val conf = job.getConfiguration - conf.setClass( - SQLConf.OUTPUT_COMMITTER_CLASS.key, - classOf[CarbonOutputCommitter], - classOf[CarbonOutputCommitter]) - conf.set("carbon.commit.protocol", "carbon.commit.protocol") - job.setOutputFormatClass(classOf[CarbonTableOutputFormat]) - val table = CarbonEnv.getCarbonTable( - TableIdentifier(options("tableName"), options.get("dbName")))(sparkSession) - val model = new CarbonLoadModel - val carbonProperty = CarbonProperties.getInstance() - val optionsFinal = LoadOption.fillOptionWithDefaultValue(options.asJava) - val tableProperties = table.getTableInfo.getFactTable.getTableProperties - optionsFinal.put("sort_scope", tableProperties.asScala.getOrElse("sort_scope", - carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE, - carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, - CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)))) - val partitionStr = - table.getTableInfo.getFactTable.getPartitionInfo.getColumnSchemaList.asScala.map( - _.getColumnName.toLowerCase).mkString(",") - optionsFinal.put( - "fileheader", - dataSchema.fields.map(_.name.toLowerCase).mkString(",") + "," + partitionStr) - val optionsLocal = new mutable.HashMap[String, String]() - optionsLocal ++= options - optionsLocal += (("header", "false")) - new CarbonLoadModelBuilder(table).build( - optionsLocal.toMap.asJava, - optionsFinal, - model, - conf) - model.setUseOnePass(options.getOrElse("onepass", "false").toBoolean) - model.setDictionaryServerHost(options.getOrElse("dicthost", null)) - model.setDictionaryServerPort(options.getOrElse("dictport", "-1").toInt) - CarbonTableOutputFormat.setOverwrite(conf, options("overwrite").toBoolean) - model.setPartitionLoad(true) - - val staticPartition = options.getOrElse("staticpartition", null) - if (staticPartition != null) { - conf.set("carbon.staticpartition", staticPartition) - } - // In case of update query there is chance to remove the older segments, so here we can set - // the to be deleted segments to mark as delete while updating tablestatus - val segemntsTobeDeleted = options.get("segmentsToBeDeleted") - if (segemntsTobeDeleted.isDefined) { - conf.set(CarbonTableOutputFormat.SEGMENTS_TO_BE_DELETED, segemntsTobeDeleted.get) - } - - val currPartition = options.getOrElse("currentpartition", null) - if (currPartition != null) { - conf.set("carbon.currentpartition", currPartition) - } - // Update with the current in progress load. 
- val currEntry = options.getOrElse("currentloadentry", null) - if (currEntry != null) { - val loadEntry = - ObjectSerializationUtil.convertStringToObject(currEntry).asInstanceOf[LoadMetadataDetails] - val details = - SegmentStatusManager.readLoadMetadata(CarbonTablePath.getMetadataPath(model.getTablePath)) - model.setSegmentId(loadEntry.getLoadName) - model.setFactTimeStamp(loadEntry.getLoadStartTime) - val list = new util.ArrayList[LoadMetadataDetails](details.toList.asJava) - list.add(loadEntry) - model.setLoadMetadataDetails(list) - } - // Set the update timestamp if user sets in case of update query. It needs to be updated - // in load status update time - val updateTimeStamp = options.get("updatetimestamp") - if (updateTimeStamp.isDefined) { - conf.set(CarbonTableOutputFormat.UPADTE_TIMESTAMP, updateTimeStamp.get) - } - CarbonTableOutputFormat.setLoadModel(conf, model) - - new OutputWriterFactory { - - /** - * counter used for generating task numbers. This is used to generate unique partition numbers - * in case of partitioning - */ - val counter = new AtomicLong() - val taskIdMap = new ConcurrentHashMap[String, java.lang.Long]() - - override def newInstance( - path: String, - dataSchema: StructType, - context: TaskAttemptContext): OutputWriter = { - val model = CarbonTableOutputFormat.getLoadModel(context.getConfiguration) - val isCarbonUseMultiDir = CarbonProperties.getInstance().isUseMultiTempDir - var storeLocation: Array[String] = Array[String]() - val isCarbonUseLocalDir = CarbonProperties.getInstance() - .getProperty("carbon.use.local.dir", "false").equalsIgnoreCase("true") - - - val taskNumber = generateTaskNumber(path, context, model.getSegmentId) - val tmpLocationSuffix = - File.separator + "carbon" + System.nanoTime() + File.separator + taskNumber - if (isCarbonUseLocalDir) { - val yarnStoreLocations = Util.getConfiguredLocalDirs(SparkEnv.get.conf) - if (!isCarbonUseMultiDir && null != yarnStoreLocations && yarnStoreLocations.nonEmpty) { - // use single dir - storeLocation = storeLocation :+ - (yarnStoreLocations(Random.nextInt(yarnStoreLocations.length)) + tmpLocationSuffix) - if (storeLocation == null || storeLocation.isEmpty) { - storeLocation = storeLocation :+ - (System.getProperty("java.io.tmpdir") + tmpLocationSuffix) - } - } else { - // use all the yarn dirs - storeLocation = yarnStoreLocations.map(_ + tmpLocationSuffix) - } - } else { - storeLocation = - storeLocation :+ (System.getProperty("java.io.tmpdir") + tmpLocationSuffix) - } - CarbonTableOutputFormat.setTempStoreLocations(context.getConfiguration, storeLocation) - new CarbonOutputWriter(path, context, dataSchema.map(_.dataType), taskNumber, model) - } - - /** - * Generate taskid using the taskid of taskcontext and the path. It should be unique in case - * of partition tables. - */ - private def generateTaskNumber(path: String, - context: TaskAttemptContext, segmentId: String): String = { - var partitionNumber: java.lang.Long = taskIdMap.get(path) - if (partitionNumber == null) { - partitionNumber = counter.incrementAndGet() - // Generate taskid using the combination of taskid and partition number to make it unique. 
- taskIdMap.put(path, partitionNumber) - } - val taskID = context.getTaskAttemptID.getTaskID.getId - CarbonScalaUtil.generateUniqueNumber(taskID, segmentId, partitionNumber) - } - - override def getFileExtension(context: TaskAttemptContext): String = { - CarbonTablePath.CARBON_DATA_EXT - } - - } - } -} - -case class CarbonSQLHadoopMapReduceCommitProtocol(jobId: String, path: String, isAppend: Boolean) - extends SQLHadoopMapReduceCommitProtocol(jobId, path, isAppend) { - override def newTaskTempFileAbsPath(taskContext: TaskAttemptContext, - absoluteDir: String, - ext: String): String = { - val carbonFlow = taskContext.getConfiguration.get("carbon.commit.protocol") - if (carbonFlow != null) { - super.newTaskTempFile(taskContext, Some(absoluteDir), ext) - } else { - super.newTaskTempFileAbsPath(taskContext, absoluteDir, ext) - } - } -} - -/** - * It is a just class to make compile between spark 2.1 and 2.2 - */ -private trait AbstractCarbonOutputWriter { - def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal") - def writeInternal(row: InternalRow): Unit = { - writeCarbon(row) - } - def write(row: InternalRow): Unit = { - writeCarbon(row) - } - def writeCarbon(row: InternalRow): Unit -} - -private class CarbonOutputWriter(path: String, - context: TaskAttemptContext, - fieldTypes: Seq[DataType], - taskNo : String, - model: CarbonLoadModel) - extends OutputWriter with AbstractCarbonOutputWriter { - - val converter = new DataTypeConverterImpl - - val partitions = - getPartitionsFromPath(path, context, model).map(ExternalCatalogUtils.unescapePathName) - val staticPartition: util.HashMap[String, Boolean] = { - val staticPart = context.getConfiguration.get("carbon.staticpartition") - if (staticPart != null) { - ObjectSerializationUtil.convertStringToObject( - staticPart).asInstanceOf[util.HashMap[String, Boolean]] - } else { - null - } - } - lazy val currPartitions: util.List[indexstore.PartitionSpec] = { - val currParts = context.getConfiguration.get("carbon.currentpartition") - if (currParts != null) { - ObjectSerializationUtil.convertStringToObject( - currParts).asInstanceOf[util.List[indexstore.PartitionSpec]] - } else { - new util.ArrayList[indexstore.PartitionSpec]() - } - } - var (updatedPartitions, partitionData) = if (partitions.nonEmpty) { - val updatedPartitions = partitions.map(splitPartition) - (updatedPartitions, updatePartitions(updatedPartitions.map(_._2))) - } else { - (Map.empty[String, String].toArray, Array.empty) - } - - private def splitPartition(p: String) = { - val value = p.substring(p.indexOf("=") + 1, p.length) - val col = p.substring(0, p.indexOf("=")) - // NUll handling case. For null hive creates with this special name - if (value.equals("__HIVE_DEFAULT_PARTITION__")) { - (col, null) - // we should replace back the special string with empty value. - } else if (value.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL)) { - (col, "") - } else { - (col, value) - } - } - - lazy val writePath = { - val updatedPath = getPartitionPath(path, context, model) - // in case of partition location specified by user then search the partitions from the current - // partitions to get the corresponding partitions. 
- if (partitions.isEmpty) { - val writeSpec = new indexstore.PartitionSpec(null, updatedPath) - val index = currPartitions.indexOf(writeSpec) - if (index > -1) { - val spec = currPartitions.get(index) - updatedPartitions = spec.getPartitions.asScala.map(splitPartition).toArray - partitionData = updatePartitions(updatedPartitions.map(_._2)) - } - } - updatedPath - } - - val writable = new ObjectArrayWritable - - private def updatePartitions(partitionData: Seq[String]): Array[AnyRef] = { - model.getCarbonDataLoadSchema.getCarbonTable.getTableInfo.getFactTable.getPartitionInfo - .getColumnSchemaList.asScala.zipWithIndex.map { case (col, index) => - - val dataType = if (col.hasEncoding(Encoding.DICTIONARY)) { - DataTypes.INT - } else if (col.getDataType.equals(DataTypes.TIMESTAMP) || - col.getDataType.equals(DataTypes.DATE)) { - DataTypes.LONG - } else { - col.getDataType - } - if (staticPartition != null && staticPartition.get(col.getColumnName.toLowerCase)) { - val converetedVal = - CarbonScalaUtil.convertStaticPartitions( - partitionData(index), - col, - model.getCarbonDataLoadSchema.getCarbonTable) - if (col.hasEncoding(Encoding.DICTIONARY)) { - converetedVal.toInt.asInstanceOf[AnyRef] - } else { - DataTypeUtil.getDataBasedOnDataType( - converetedVal, - dataType, - converter) - } - } else { - DataTypeUtil.getDataBasedOnDataType(partitionData(index), dataType, converter) - } - }.toArray - } - - private val recordWriter: CarbonRecordWriter = { - context.getConfiguration.set("carbon.outputformat.taskno", taskNo) - context.getConfiguration.set("carbon.outputformat.writepath", - writePath + "/" + model.getSegmentId + "_" + model.getFactTimeStamp + ".tmp") - new CarbonTableOutputFormat() { - override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = { - new Path(path) - } - }.getRecordWriter(context).asInstanceOf[CarbonRecordWriter] - } - - // TODO Implement writesupport interface to support writing Row directly to recordwriter - def writeCarbon(row: InternalRow): Unit = { - val data = new Array[AnyRef](fieldTypes.length + partitionData.length) - var i = 0 - while (i < fieldTypes.length) { - if (!row.isNullAt(i)) { - fieldTypes(i) match { - case StringType => - data(i) = row.getString(i) - case d: DecimalType => - data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal - case other => - data(i) = row.get(i, other) - } - } - i += 1 - } - if (partitionData.length > 0) { - System.arraycopy(partitionData, 0, data, fieldTypes.length, partitionData.length) - } - writable.set(data) - recordWriter.write(NullWritable.get(), writable) - } - - - override def writeInternal(row: InternalRow): Unit = { - writeCarbon(row) - } - - override def close(): Unit = { - recordWriter.close(context) - // write partition info to new file. 
- val partitonList = new util.ArrayList[String]() - val formattedPartitions = - // All dynamic partitions need to be converted to proper format - CarbonScalaUtil.updatePartitions( - updatedPartitions.toMap, - model.getCarbonDataLoadSchema.getCarbonTable) - formattedPartitions.foreach(p => partitonList.add(p._1 + "=" + p._2)) - SegmentFileStore.writeSegmentFile( - model.getTablePath, - taskNo, - writePath, - model.getSegmentId + "_" + model.getFactTimeStamp + "", - partitonList) - } - - def getPartitionPath(path: String, - attemptContext: TaskAttemptContext, - model: CarbonLoadModel): String = { - if (updatedPartitions.nonEmpty) { - val formattedPartitions = - // All dynamic partitions need to be converted to proper format - CarbonScalaUtil.updatePartitions( - updatedPartitions.toMap, - model.getCarbonDataLoadSchema.getCarbonTable) - val partitionstr = formattedPartitions.map{p => - ExternalCatalogUtils.escapePathName(p._1) + "=" + ExternalCatalogUtils.escapePathName(p._2) - }.mkString(CarbonCommonConstants.FILE_SEPARATOR) - model.getCarbonDataLoadSchema.getCarbonTable.getTablePath + - CarbonCommonConstants.FILE_SEPARATOR + partitionstr - } else { - var updatedPath = FileFactory.getUpdatedFilePath(path) - updatedPath.substring(0, updatedPath.lastIndexOf("/")) - } - } - - def getPartitionsFromPath( - path: String, - attemptContext: TaskAttemptContext, - model: CarbonLoadModel): Array[String] = { - var attemptId = attemptContext.getTaskAttemptID.toString + "/" - if (path.indexOf(attemptId) > -1) { - val str = path.substring(path.indexOf(attemptId) + attemptId.length, path.lastIndexOf("/")) - if (str.length > 0) { - str.split("/") - } else { - Array.empty - } - } else { - Array.empty - } - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala new file mode 100644 index 0000000..fa54e0d --- /dev/null +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql + +import java.net.URI + +import scala.collection.mutable.ArrayBuffer + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.mapred.JobConf +import org.apache.hadoop.mapreduce._ +import org.apache.hadoop.mapreduce.lib.input.FileSplit +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl +import org.apache.spark.{SparkException, TaskContext} +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.execution.datasources.text.TextOutputWriter +import org.apache.spark.sql.optimizer.CarbonFilters +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.types.{AtomicType, StructField, StructType} + +import org.apache.carbondata.common.annotations.{InterfaceAudience, InterfaceStability} +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datamap.{DataMapChooser, DataMapStoreManager, Segment} +import org.apache.carbondata.core.indexstore.PartitionSpec +import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore +import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, ColumnarFormatVersion} +import org.apache.carbondata.core.reader.CarbonHeaderReader +import org.apache.carbondata.core.scan.expression.Expression +import org.apache.carbondata.core.scan.expression.logical.AndExpression +import org.apache.carbondata.core.scan.model.QueryModel +import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection, CarbonRecordReader, InputMetricsStats} +import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, DataMapJob} +import org.apache.carbondata.spark.util.CarbonScalaUtil + +@InterfaceAudience.User +@InterfaceStability.Evolving +class SparkCarbonFileFormat extends FileFormat + with DataSourceRegister + with Logging + with Serializable { + + @transient val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + + override def inferSchema(sparkSession: SparkSession, + options: Map[String, String], + files: Seq[FileStatus]): Option[StructType] = { + val filePaths = CarbonUtil.getFilePathExternalFilePath( + options.get("path").get) + if (filePaths.size() == 0) { + throw new SparkException("CarbonData file is not present in the location mentioned in DDL") + } + val carbonHeaderReader: CarbonHeaderReader = new CarbonHeaderReader(filePaths.get(0)) + val fileHeader = carbonHeaderReader.readHeader + val table_columns: java.util.List[org.apache.carbondata.format.ColumnSchema] = fileHeader + .getColumn_schema + var colArray = ArrayBuffer[StructField]() + for (i <- 0 to table_columns.size() - 1) { + val col = CarbonUtil.thriftColumnSchmeaToWrapperColumnSchema(table_columns.get(i)) + colArray += (new StructField(col.getColumnName, + CarbonScalaUtil.convertCarbonToSparkDataType(col.getDataType), false)) + } + colArray.+:(Nil) + + Some(StructType(colArray)) + } + + override def prepareWrite(sparkSession: SparkSession, + job: Job, + options: Map[String, String], + dataSchema: StructType): OutputWriterFactory = { + + new OutputWriterFactory { + override def newInstance( + path: String, + 
dataSchema: StructType, + context: TaskAttemptContext): OutputWriter = { + new TextOutputWriter(path, dataSchema, context) + } + + override def getFileExtension(context: TaskAttemptContext): String = { + CarbonTablePath.CARBON_DATA_EXT + } + } + } + + override def shortName(): String = "Carbonfile" + + override def toString: String = "Carbonfile" + + override def hashCode(): Int = getClass.hashCode() + + override def equals(other: Any): Boolean = other.isInstanceOf[SparkCarbonFileFormat] + + def supportVector(sparkSession: SparkSession, schema: StructType): Boolean = { + val vectorizedReader = { + if (sparkSession.sqlContext.sparkSession.conf + .contains(CarbonCommonConstants.ENABLE_VECTOR_READER)) { + sparkSession.sqlContext.sparkSession.conf.get(CarbonCommonConstants.ENABLE_VECTOR_READER) + } else if (System.getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER) != null) { + System.getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER) + } else { + CarbonProperties.getInstance().getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, + CarbonCommonConstants.ENABLE_VECTOR_READER_DEFAULT) + } + } + vectorizedReader.toBoolean + } + + + override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = { + val conf = sparkSession.sessionState.conf + conf.wholeStageEnabled && + schema.length <= conf.wholeStageMaxNumFields && + schema.forall(_.dataType.isInstanceOf[AtomicType]) + } + + + def createVectorizedCarbonRecordReader(queryModel: QueryModel, + inputMetricsStats: InputMetricsStats, enableBatch: String): RecordReader[Void, Object] = { + val name = "org.apache.carbondata.spark.vectorreader.VectorizedCarbonRecordReader" + try { + val cons = Class.forName(name).getDeclaredConstructors + cons.head.setAccessible(true) + cons.head.newInstance(queryModel, inputMetricsStats, enableBatch) + .asInstanceOf[RecordReader[Void, Object]] + } catch { + case e: Exception => + LOGGER.error(e) + null + } + } + + override def buildReaderWithPartitionValues(sparkSession: SparkSession, + dataSchema: StructType, + partitionSchema: StructType, + requiredSchema: StructType, + filters: Seq[Filter], + options: Map[String, String], + hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = { + + val filter : Option[Expression] = filters.flatMap { filter => + CarbonFilters.createCarbonFilter(dataSchema, filter) + }.reduceOption(new AndExpression(_, _)) + + val projection = requiredSchema.map(_.name).toArray + val carbonProjection = new CarbonProjection + projection.foreach(carbonProjection.addColumn) + + val conf = new Configuration() + val jobConf = new JobConf(conf) + SparkHadoopUtil.get.addCredentials(jobConf) + val job = Job.getInstance(jobConf) + var supportBatchValue: Boolean = false + + val readVector = supportVector(sparkSession, dataSchema) + if (readVector) { + supportBatchValue = supportBatch(sparkSession, dataSchema) + } + + CarbonFileInputFormat.setTableName(job.getConfiguration, "externaldummy") + CarbonFileInputFormat.setDatabaseName(job.getConfiguration, "default") + CarbonMetadata.getInstance.removeTable("default_externaldummy") + val dataMapJob: DataMapJob = CarbonFileInputFormat.getDataMapJob(job.getConfiguration) + val format: CarbonFileInputFormat[Object] = new CarbonFileInputFormat[Object] + + (file: PartitionedFile) => { + assert(file.partitionValues.numFields == partitionSchema.size) + + if (file.filePath.endsWith(CarbonCommonConstants.FACT_FILE_EXT)) { + val fileSplit = + new FileSplit(new Path(new URI(file.filePath)), file.start, file.length, Array.empty) 
+ + val path: String = options.get("path").get + val endindex: Int = path.indexOf("Fact") - 1 + val tablePath = path.substring(0, endindex) + lazy val identifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.from( + tablePath, + "default", + "externaldummy") + val split = CarbonInputSplit.from("null", "0", fileSplit, ColumnarFormatVersion.V3, null) + + + val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) + val conf1 = new Configuration() + conf1.set("mapreduce.input.carboninputformat.tableName", "externaldummy") + conf1.set("mapreduce.input.carboninputformat.databaseName", "default") + conf1.set("mapreduce.input.fileinputformat.inputdir", tablePath) + CarbonFileInputFormat.setColumnProjection(conf1, carbonProjection) + filter match { + case Some(c) => CarbonFileInputFormat.setFilterPredicates(conf1, c) + case None => None + } + val attemptContext = new TaskAttemptContextImpl(conf1, attemptId) + + val model = format.createQueryModel(split, attemptContext) + + var segments = new java.util.ArrayList[Segment]() + val seg = new Segment("null", null) + segments.add(seg) + var partition : java.util.List[PartitionSpec] = new java.util.ArrayList[PartitionSpec]() + + + val segmentPath = CarbonTablePath.getSegmentPath(identifier.getTablePath(), "null") + val indexFiles = new SegmentIndexFileStore().getIndexFilesFromSegment(segmentPath) + if (indexFiles.size() == 0) { + throw new SparkException("Index file not present to read the carbondata file") + } + + val tab = model.getTable + DataMapStoreManager.getInstance().clearDataMaps(identifier) + val dataMapExprWrapper = DataMapChooser.get + .choose(tab, model.getFilterExpressionResolverTree) + + // TODO : handle the partition for CarbonFileLevelFormat + val prunedBlocklets = dataMapExprWrapper.prune(segments, null) + + val detailInfo = prunedBlocklets.get(0).getDetailInfo + detailInfo.readColumnSchema(detailInfo.getColumnSchemaBinary) + split.setDetailInfo(detailInfo) + + val carbonReader = if (readVector) { + val vectorizedReader = createVectorizedCarbonRecordReader(model, + null, + supportBatchValue.toString) + vectorizedReader.initialize(split, attemptContext) + logDebug(s"Appending $partitionSchema ${ file.partitionValues }") + vectorizedReader + } else { + val reader = new CarbonRecordReader(model, + format.getReadSupportClass(attemptContext.getConfiguration), null) + reader.initialize(split, attemptContext) + reader + } + + val iter = new RecordReaderIterator(carbonReader) + Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close())) + + iter.asInstanceOf[Iterator[InternalRow]] + } + else { + Iterator.empty + } + } + } +} + + http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala new file mode 100644 index 0000000..d34b201 --- /dev/null +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala @@ -0,0 +1,443 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.datasources + +import java.io.File +import java.util +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicLong + +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.util.Random + +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.io.NullWritable +import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} +import org.apache.spark.SparkEnv +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} +import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} +import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sources.DataSourceRegister +import org.apache.spark.sql.types._ + +import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants} +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.indexstore +import org.apache.carbondata.core.metadata.SegmentFileStore +import org.apache.carbondata.core.metadata.datatype.DataTypes +import org.apache.carbondata.core.metadata.encoder.Encoding +import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager} +import org.apache.carbondata.core.util.{CarbonProperties, DataTypeConverterImpl, DataTypeUtil} +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.hadoop.api.{CarbonOutputCommitter, CarbonTableOutputFormat} +import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat.CarbonRecordWriter +import org.apache.carbondata.hadoop.internal.ObjectArrayWritable +import org.apache.carbondata.hadoop.util.ObjectSerializationUtil +import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, CarbonLoadModelBuilder, LoadOption} +import org.apache.carbondata.spark.util.{CarbonScalaUtil, Util} + +class SparkCarbonTableFormat + extends FileFormat + with DataSourceRegister + with Logging +with Serializable { + + override def shortName(): String = "carbondata" + + override def inferSchema(sparkSession: SparkSession, + options: Map[String, String], + files: Seq[FileStatus]): Option[StructType] = { + None + } + + SparkSession.getActiveSession.get.sessionState.conf.setConfString( + "spark.sql.sources.commitProtocolClass", + "org.apache.spark.sql.execution.datasources.CarbonSQLHadoopMapReduceCommitProtocol") + + override def prepareWrite( + sparkSession: SparkSession, + job: Job, + options: Map[String, String], + dataSchema: StructType): OutputWriterFactory = { + val conf = job.getConfiguration + conf.setClass( + SQLConf.OUTPUT_COMMITTER_CLASS.key, + classOf[CarbonOutputCommitter], + classOf[CarbonOutputCommitter]) + conf.set("carbon.commit.protocol", "carbon.commit.protocol") + job.setOutputFormatClass(classOf[CarbonTableOutputFormat]) + val table = 
CarbonEnv.getCarbonTable( + TableIdentifier(options("tableName"), options.get("dbName")))(sparkSession) + val model = new CarbonLoadModel + val carbonProperty = CarbonProperties.getInstance() + val optionsFinal = LoadOption.fillOptionWithDefaultValue(options.asJava) + val tableProperties = table.getTableInfo.getFactTable.getTableProperties + optionsFinal.put("sort_scope", tableProperties.asScala.getOrElse("sort_scope", + carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE, + carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, + CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)))) + val partitionStr = + table.getTableInfo.getFactTable.getPartitionInfo.getColumnSchemaList.asScala.map( + _.getColumnName.toLowerCase).mkString(",") + optionsFinal.put( + "fileheader", + dataSchema.fields.map(_.name.toLowerCase).mkString(",") + "," + partitionStr) + val optionsLocal = new mutable.HashMap[String, String]() + optionsLocal ++= options + optionsLocal += (("header", "false")) + new CarbonLoadModelBuilder(table).build( + optionsLocal.toMap.asJava, + optionsFinal, + model, + conf) + model.setUseOnePass(options.getOrElse("onepass", "false").toBoolean) + model.setDictionaryServerHost(options.getOrElse("dicthost", null)) + model.setDictionaryServerPort(options.getOrElse("dictport", "-1").toInt) + CarbonTableOutputFormat.setOverwrite(conf, options("overwrite").toBoolean) + model.setPartitionLoad(true) + + val staticPartition = options.getOrElse("staticpartition", null) + if (staticPartition != null) { + conf.set("carbon.staticpartition", staticPartition) + } + // In case of update query there is chance to remove the older segments, so here we can set + // the to be deleted segments to mark as delete while updating tablestatus + val segemntsTobeDeleted = options.get("segmentsToBeDeleted") + if (segemntsTobeDeleted.isDefined) { + conf.set(CarbonTableOutputFormat.SEGMENTS_TO_BE_DELETED, segemntsTobeDeleted.get) + } + + val currPartition = options.getOrElse("currentpartition", null) + if (currPartition != null) { + conf.set("carbon.currentpartition", currPartition) + } + // Update with the current in progress load. + val currEntry = options.getOrElse("currentloadentry", null) + if (currEntry != null) { + val loadEntry = + ObjectSerializationUtil.convertStringToObject(currEntry).asInstanceOf[LoadMetadataDetails] + val details = + SegmentStatusManager.readLoadMetadata(CarbonTablePath.getMetadataPath(model.getTablePath)) + model.setSegmentId(loadEntry.getLoadName) + model.setFactTimeStamp(loadEntry.getLoadStartTime) + val list = new util.ArrayList[LoadMetadataDetails](details.toList.asJava) + list.add(loadEntry) + model.setLoadMetadataDetails(list) + } + // Set the update timestamp if user sets in case of update query. It needs to be updated + // in load status update time + val updateTimeStamp = options.get("updatetimestamp") + if (updateTimeStamp.isDefined) { + conf.set(CarbonTableOutputFormat.UPADTE_TIMESTAMP, updateTimeStamp.get) + } + CarbonTableOutputFormat.setLoadModel(conf, model) + + new OutputWriterFactory { + + /** + * counter used for generating task numbers. 
This is used to generate unique partition numbers + * in case of partitioning + */ + val counter = new AtomicLong() + val taskIdMap = new ConcurrentHashMap[String, java.lang.Long]() + + override def newInstance( + path: String, + dataSchema: StructType, + context: TaskAttemptContext): OutputWriter = { + val model = CarbonTableOutputFormat.getLoadModel(context.getConfiguration) + val isCarbonUseMultiDir = CarbonProperties.getInstance().isUseMultiTempDir + var storeLocation: Array[String] = Array[String]() + val isCarbonUseLocalDir = CarbonProperties.getInstance() + .getProperty("carbon.use.local.dir", "false").equalsIgnoreCase("true") + + + val taskNumber = generateTaskNumber(path, context, model.getSegmentId) + val tmpLocationSuffix = + File.separator + "carbon" + System.nanoTime() + File.separator + taskNumber + if (isCarbonUseLocalDir) { + val yarnStoreLocations = Util.getConfiguredLocalDirs(SparkEnv.get.conf) + if (!isCarbonUseMultiDir && null != yarnStoreLocations && yarnStoreLocations.nonEmpty) { + // use single dir + storeLocation = storeLocation :+ + (yarnStoreLocations(Random.nextInt(yarnStoreLocations.length)) + tmpLocationSuffix) + if (storeLocation == null || storeLocation.isEmpty) { + storeLocation = storeLocation :+ + (System.getProperty("java.io.tmpdir") + tmpLocationSuffix) + } + } else { + // use all the yarn dirs + storeLocation = yarnStoreLocations.map(_ + tmpLocationSuffix) + } + } else { + storeLocation = + storeLocation :+ (System.getProperty("java.io.tmpdir") + tmpLocationSuffix) + } + CarbonTableOutputFormat.setTempStoreLocations(context.getConfiguration, storeLocation) + new CarbonOutputWriter(path, context, dataSchema.map(_.dataType), taskNumber, model) + } + + /** + * Generate taskid using the taskid of taskcontext and the path. It should be unique in case + * of partition tables. + */ + private def generateTaskNumber(path: String, + context: TaskAttemptContext, segmentId: String): String = { + var partitionNumber: java.lang.Long = taskIdMap.get(path) + if (partitionNumber == null) { + partitionNumber = counter.incrementAndGet() + // Generate taskid using the combination of taskid and partition number to make it unique. 
+ taskIdMap.put(path, partitionNumber) + } + val taskID = context.getTaskAttemptID.getTaskID.getId + CarbonScalaUtil.generateUniqueNumber(taskID, segmentId, partitionNumber) + } + + override def getFileExtension(context: TaskAttemptContext): String = { + CarbonTablePath.CARBON_DATA_EXT + } + + } + } +} + +case class CarbonSQLHadoopMapReduceCommitProtocol(jobId: String, path: String, isAppend: Boolean) + extends SQLHadoopMapReduceCommitProtocol(jobId, path, isAppend) { + override def newTaskTempFileAbsPath(taskContext: TaskAttemptContext, + absoluteDir: String, + ext: String): String = { + val carbonFlow = taskContext.getConfiguration.get("carbon.commit.protocol") + if (carbonFlow != null) { + super.newTaskTempFile(taskContext, Some(absoluteDir), ext) + } else { + super.newTaskTempFileAbsPath(taskContext, absoluteDir, ext) + } + } +} + +/** + * It is a just class to make compile between spark 2.1 and 2.2 + */ +private trait AbstractCarbonOutputWriter { + def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal") + def writeInternal(row: InternalRow): Unit = { + writeCarbon(row) + } + def write(row: InternalRow): Unit = { + writeCarbon(row) + } + def writeCarbon(row: InternalRow): Unit +} + +private class CarbonOutputWriter(path: String, + context: TaskAttemptContext, + fieldTypes: Seq[DataType], + taskNo : String, + model: CarbonLoadModel) + extends OutputWriter with AbstractCarbonOutputWriter { + + val converter = new DataTypeConverterImpl + + val partitions = + getPartitionsFromPath(path, context, model).map(ExternalCatalogUtils.unescapePathName) + val staticPartition: util.HashMap[String, Boolean] = { + val staticPart = context.getConfiguration.get("carbon.staticpartition") + if (staticPart != null) { + ObjectSerializationUtil.convertStringToObject( + staticPart).asInstanceOf[util.HashMap[String, Boolean]] + } else { + null + } + } + lazy val currPartitions: util.List[indexstore.PartitionSpec] = { + val currParts = context.getConfiguration.get("carbon.currentpartition") + if (currParts != null) { + ObjectSerializationUtil.convertStringToObject( + currParts).asInstanceOf[util.List[indexstore.PartitionSpec]] + } else { + new util.ArrayList[indexstore.PartitionSpec]() + } + } + var (updatedPartitions, partitionData) = if (partitions.nonEmpty) { + val updatedPartitions = partitions.map(splitPartition) + (updatedPartitions, updatePartitions(updatedPartitions.map(_._2))) + } else { + (Map.empty[String, String].toArray, Array.empty) + } + + private def splitPartition(p: String) = { + val value = p.substring(p.indexOf("=") + 1, p.length) + val col = p.substring(0, p.indexOf("=")) + // NUll handling case. For null hive creates with this special name + if (value.equals("__HIVE_DEFAULT_PARTITION__")) { + (col, null) + // we should replace back the special string with empty value. + } else if (value.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL)) { + (col, "") + } else { + (col, value) + } + } + + lazy val writePath = { + val updatedPath = getPartitionPath(path, context, model) + // in case of partition location specified by user then search the partitions from the current + // partitions to get the corresponding partitions. 
+ if (partitions.isEmpty) { + val writeSpec = new indexstore.PartitionSpec(null, updatedPath) + val index = currPartitions.indexOf(writeSpec) + if (index > -1) { + val spec = currPartitions.get(index) + updatedPartitions = spec.getPartitions.asScala.map(splitPartition).toArray + partitionData = updatePartitions(updatedPartitions.map(_._2)) + } + } + updatedPath + } + + val writable = new ObjectArrayWritable + + private def updatePartitions(partitionData: Seq[String]): Array[AnyRef] = { + model.getCarbonDataLoadSchema.getCarbonTable.getTableInfo.getFactTable.getPartitionInfo + .getColumnSchemaList.asScala.zipWithIndex.map { case (col, index) => + + val dataType = if (col.hasEncoding(Encoding.DICTIONARY)) { + DataTypes.INT + } else if (col.getDataType.equals(DataTypes.TIMESTAMP) || + col.getDataType.equals(DataTypes.DATE)) { + DataTypes.LONG + } else { + col.getDataType + } + if (staticPartition != null && staticPartition.get(col.getColumnName.toLowerCase)) { + val converetedVal = + CarbonScalaUtil.convertStaticPartitions( + partitionData(index), + col, + model.getCarbonDataLoadSchema.getCarbonTable) + if (col.hasEncoding(Encoding.DICTIONARY)) { + converetedVal.toInt.asInstanceOf[AnyRef] + } else { + DataTypeUtil.getDataBasedOnDataType( + converetedVal, + dataType, + converter) + } + } else { + DataTypeUtil.getDataBasedOnDataType(partitionData(index), dataType, converter) + } + }.toArray + } + + private val recordWriter: CarbonRecordWriter = { + context.getConfiguration.set("carbon.outputformat.taskno", taskNo) + context.getConfiguration.set("carbon.outputformat.writepath", + writePath + "/" + model.getSegmentId + "_" + model.getFactTimeStamp + ".tmp") + new CarbonTableOutputFormat() { + override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = { + new Path(path) + } + }.getRecordWriter(context).asInstanceOf[CarbonRecordWriter] + } + + // TODO Implement writesupport interface to support writing Row directly to recordwriter + def writeCarbon(row: InternalRow): Unit = { + val data = new Array[AnyRef](fieldTypes.length + partitionData.length) + var i = 0 + while (i < fieldTypes.length) { + if (!row.isNullAt(i)) { + fieldTypes(i) match { + case StringType => + data(i) = row.getString(i) + case d: DecimalType => + data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal + case other => + data(i) = row.get(i, other) + } + } + i += 1 + } + if (partitionData.length > 0) { + System.arraycopy(partitionData, 0, data, fieldTypes.length, partitionData.length) + } + writable.set(data) + recordWriter.write(NullWritable.get(), writable) + } + + + override def writeInternal(row: InternalRow): Unit = { + writeCarbon(row) + } + + override def close(): Unit = { + recordWriter.close(context) + // write partition info to new file. 
+ val partitonList = new util.ArrayList[String]() + val formattedPartitions = + // All dynamic partitions need to be converted to proper format + CarbonScalaUtil.updatePartitions( + updatedPartitions.toMap, + model.getCarbonDataLoadSchema.getCarbonTable) + formattedPartitions.foreach(p => partitonList.add(p._1 + "=" + p._2)) + SegmentFileStore.writeSegmentFile( + model.getTablePath, + taskNo, + writePath, + model.getSegmentId + "_" + model.getFactTimeStamp + "", + partitonList) + } + + def getPartitionPath(path: String, + attemptContext: TaskAttemptContext, + model: CarbonLoadModel): String = { + if (updatedPartitions.nonEmpty) { + val formattedPartitions = + // All dynamic partitions need to be converted to proper format + CarbonScalaUtil.updatePartitions( + updatedPartitions.toMap, + model.getCarbonDataLoadSchema.getCarbonTable) + val partitionstr = formattedPartitions.map{p => + ExternalCatalogUtils.escapePathName(p._1) + "=" + ExternalCatalogUtils.escapePathName(p._2) + }.mkString(CarbonCommonConstants.FILE_SEPARATOR) + model.getCarbonDataLoadSchema.getCarbonTable.getTablePath + + CarbonCommonConstants.FILE_SEPARATOR + partitionstr + } else { + var updatedPath = FileFactory.getUpdatedFilePath(path) + updatedPath.substring(0, updatedPath.lastIndexOf("/")) + } + } + + def getPartitionsFromPath( + path: String, + attemptContext: TaskAttemptContext, + model: CarbonLoadModel): Array[String] = { + var attemptId = attemptContext.getTaskAttemptID.toString + "/" + if (path.indexOf(attemptId) > -1) { + val str = path.substring(path.indexOf(attemptId) + attemptId.length, path.lastIndexOf("/")) + if (str.length > 0) { + str.split("/") + } else { + Array.empty + } + } else { + Array.empty + } + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala index ec20ec2..d85ef68 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala @@ -110,7 +110,14 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy { .tableExists(TableIdentifier(alterTableChangeDataTypeModel.tableName, alterTableChangeDataTypeModel.databaseName))(sparkSession) if (isCarbonTable) { - ExecutedCommandExec(dataTypeChange) :: Nil + val carbonTable = CarbonEnv.getCarbonTable(alterTableChangeDataTypeModel.databaseName, + alterTableChangeDataTypeModel.tableName)(sparkSession) + if (carbonTable != null && carbonTable.isFileLevelExternalTable) { + throw new MalformedCarbonCommandException( + "Unsupported alter operation on Carbon external fileformat table") + } else { + ExecutedCommandExec(dataTypeChange) :: Nil + } } else { throw new MalformedCarbonCommandException("Unsupported alter operation on hive table") } @@ -119,7 +126,14 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy { .tableExists(TableIdentifier(alterTableAddColumnsModel.tableName, alterTableAddColumnsModel.databaseName))(sparkSession) if (isCarbonTable) { - ExecutedCommandExec(addColumn) :: Nil + val carbonTable = CarbonEnv.getCarbonTable(alterTableAddColumnsModel.databaseName, + 
alterTableAddColumnsModel.tableName)(sparkSession) + if (carbonTable != null && carbonTable.isFileLevelExternalTable) { + throw new MalformedCarbonCommandException( + "Unsupported alter operation on Carbon external fileformat table") + } else { + ExecutedCommandExec(addColumn) :: Nil + } } else { throw new MalformedCarbonCommandException("Unsupported alter operation on hive table") } @@ -128,7 +142,14 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy { .tableExists(TableIdentifier(alterTableDropColumnModel.tableName, alterTableDropColumnModel.databaseName))(sparkSession) if (isCarbonTable) { - ExecutedCommandExec(dropColumn) :: Nil + val carbonTable = CarbonEnv.getCarbonTable(alterTableDropColumnModel.databaseName, + alterTableDropColumnModel.tableName)(sparkSession) + if (carbonTable != null && carbonTable.isFileLevelExternalTable) { + throw new MalformedCarbonCommandException( + "Unsupported alter operation on Carbon external fileformat table") + } else { + ExecutedCommandExec(dropColumn) :: Nil + } } else { throw new MalformedCarbonCommandException("Unsupported alter operation on hive table") } http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala index 4996bec..b2f4505 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.plans.Inner import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.execution.command.mutation.CarbonProjectForDeleteCommand -import org.apache.spark.sql.execution.datasources.{CarbonFileFormat, CatalogFileIndex, FileFormat, HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, FileFormat, HadoopFsRelation, LogicalRelation, SparkCarbonTableFormat} import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CarbonException import org.apache.spark.util.CarbonReflectionUtils http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala index e0fff08..69fd366 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala @@ -33,7 +33,9 @@ import org.apache.spark.sql.util.CarbonException import org.apache.spark.util.CarbonReflectionUtils import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException +import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier +import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.hadoop.util.SchemaReader import 
org.apache.carbondata.spark.CarbonOption import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil} @@ -144,19 +146,24 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf, .getOrElse(Map.empty) } - def createCarbonTable(tableHeader: CreateTableHeaderContext, - skewSpecContext: SkewSpecContext, - bucketSpecContext: BucketSpecContext, - partitionColumns: ColTypeListContext, - columns : ColTypeListContext, - tablePropertyList : TablePropertyListContext, - locationSpecContext: SqlBaseParser.LocationSpecContext, - tableComment : Option[String], - ctas: TerminalNode, - query: QueryContext) : LogicalPlan = { + def createCarbonTable(createTableTuple: (CreateTableHeaderContext, SkewSpecContext, + BucketSpecContext, ColTypeListContext, ColTypeListContext, TablePropertyListContext, + LocationSpecContext, Option[String], TerminalNode, QueryContext, String)): LogicalPlan = { // val parser = new CarbonSpark2SqlParser + val (tableHeader, skewSpecContext, + bucketSpecContext, + partitionColumns, + columns, + tablePropertyList, + locationSpecContext, + tableComment, + ctas, + query, + provider) = createTableTuple + val (tableIdentifier, temp, ifNotExists, external) = visitCreateTableHeader(tableHeader) + // TODO: implement temporary tables if (temp) { throw new ParseException( @@ -256,13 +263,27 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf, CarbonEnv.getDatabaseName(tableIdentifier.database)(sparkSession), tableIdentifier.table) val table = try { - SchemaReader.getTableInfo(identifier) - } catch { + val schemaPath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath) + if (!FileFactory.isFileExist(schemaPath, FileFactory.getFileType(schemaPath)) && + provider.equalsIgnoreCase("'Carbonfile'")) { + SchemaReader.inferSchema(identifier) + } + else { + SchemaReader.getTableInfo(identifier) + } + } + catch { case e: Throwable => operationNotAllowed(s"Invalid table path provided: ${tablePath.get} ", tableHeader) } // set "_external" property, so that DROP TABLE will not delete the data - table.getFactTable.getTableProperties.put("_external", "true") + if (provider.equalsIgnoreCase("'Carbonfile'")) { + table.getFactTable.getTableProperties.put("_filelevelexternal", "true") + table.getFactTable.getTableProperties.put("_external", "false") + } else { + table.getFactTable.getTableProperties.put("_external", "true") + table.getFactTable.getTableProperties.put("_filelevelexternal", "false") + } table } else { // prepare table model of the collected tokens http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala index ba2fe947..c6bab9e 100644 --- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala +++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala @@ -326,18 +326,13 @@ class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSes val fileStorage = helper.getFileStorage(ctx.createFileFormat) if (fileStorage.equalsIgnoreCase("'carbondata'") || + fileStorage.equalsIgnoreCase("'Carbonfile'") || fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) { - helper.createCarbonTable( - tableHeader = ctx.createTableHeader, - skewSpecContext = 
ctx.skewSpec, - bucketSpecContext = ctx.bucketSpec, - partitionColumns = ctx.partitionColumns, - columns = ctx.columns, - tablePropertyList = ctx.tablePropertyList, - locationSpecContext = ctx.locationSpec(), - tableComment = Option(ctx.STRING()).map(string), - ctas = ctx.AS, - query = ctx.query) + val createTableTuple = (ctx.createTableHeader, ctx.skewSpec, ctx.bucketSpec, + ctx.partitionColumns, ctx.columns, ctx.tablePropertyList, ctx.locationSpec(), + Option(ctx.STRING()).map(string), + ctx.AS, ctx.query, fileStorage) + helper.createCarbonTable(createTableTuple) } else { super.visitCreateTable(ctx) } http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala index f033a8e..c28e4ba 100644 --- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala +++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala @@ -325,18 +325,12 @@ class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSes val fileStorage = helper.getFileStorage(ctx.createFileFormat) if (fileStorage.equalsIgnoreCase("'carbondata'") || + fileStorage.equalsIgnoreCase("'Carbonfile'") || fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) { - helper.createCarbonTable( - tableHeader = ctx.createTableHeader, - skewSpecContext = ctx.skewSpec, - bucketSpecContext = ctx.bucketSpec, - partitionColumns = ctx.partitionColumns, - columns = ctx.columns, - tablePropertyList = ctx.tablePropertyList, - locationSpecContext = ctx.locationSpec(), - tableComment = Option(ctx.STRING()).map(string), - ctas = ctx.AS, - query = ctx.query) + val createTableTuple = (ctx.createTableHeader, ctx.skewSpec, + ctx.bucketSpec, ctx.partitionColumns, ctx.columns, ctx.tablePropertyList,ctx.locationSpec(), + Option(ctx.STRING()).map(string), ctx.AS, ctx.query, fileStorage) + helper.createCarbonTable(createTableTuple) } else { super.visitCreateHiveTable(ctx) } http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/integration/spark2/src/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister ---------------------------------------------------------------------- diff --git a/integration/spark2/src/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/integration/spark2/src/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister index d09c9b5..5831f3e 100644 --- a/integration/spark2/src/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister +++ b/integration/spark2/src/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister @@ -14,4 +14,5 @@ ## See the License for the specific language governing permissions and ## limitations under the License. 
## ------------------------------------------------------------------------ -org.apache.spark.sql.CarbonSource \ No newline at end of file +org.apache.spark.sql.CarbonSource +org.apache.spark.sql.SparkCarbonFileFormat \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java index 0ac6f38..b15dafd 100644 --- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java +++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java @@ -21,6 +21,7 @@ import java.io.File; import java.io.FileFilter; import java.io.IOException; +import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.util.path.CarbonTablePath; @@ -113,9 +114,12 @@ public class CSVCarbonWriterSuite { writer.write(new String[]{"robot" + (i % 10), String.valueOf(i), String.valueOf((double) i / 2)}); } writer.close(); - } catch (Exception e) { + } catch (IOException e) { e.printStackTrace(); Assert.fail(e.getMessage()); + } catch (InvalidLoadOptionException l) { + l.printStackTrace(); + Assert.fail(l.getMessage()); } File segmentFolder = new File(CarbonTablePath.getSegmentPath(path, "null"));
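Because SparkCarbonFileFormat registers the short name "Carbonfile" through the DataSourceRegister service file above, CarbonData files produced by the SDK writer (as in the test) can also be read back through Spark's DataFrame reader without creating a table first. A rough usage sketch, assuming a hypothetical output path /tmp/sdk_output that already contains the written CarbonData and index files:

    // The schema is inferred from the CarbonData file header by
    // SparkCarbonFileFormat.inferSchema, so no table definition is required.
    val df = spark.read
      .format("Carbonfile")
      .load("/tmp/sdk_output")
    df.show()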