http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
index 0298eea..cf22569 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
@@ -150,7 +150,7 @@ case class CarbonDropTableCommand(
 
       // delete table data only if it is not external table
       if (FileFactory.isFileExist(tablePath, fileType) &&
-          !carbonTable.isExternalTable) {
+          !(carbonTable.isExternalTable || carbonTable.isFileLevelExternalTable)) {
         val file = FileFactory.getCarbonFile(tablePath, fileType)
         CarbonUtil.deleteFoldersAndFilesSilent(file)
       }
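
For context, a hedged usage sketch of the behaviour this guard preserves. The table name,
location and DDL syntax below are illustrative assumptions, not taken from the patch:

    // DROP TABLE on a table backed by pre-existing carbondata files (external or
    // file-level external) should only remove metadata; the files stay on disk.
    spark.sql("CREATE EXTERNAL TABLE ext_sales STORED BY 'carbondata' LOCATION '/data/sales'")
    spark.sql("DROP TABLE ext_sales")  // with this change, /data/sales is NOT deleted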

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
deleted file mode 100644
index 2eed988..0000000
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
+++ /dev/null
@@ -1,443 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.execution.datasources
-
-import java.io.File
-import java.util
-import java.util.concurrent.ConcurrentHashMap
-import java.util.concurrent.atomic.AtomicLong
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-import scala.util.Random
-
-import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.hadoop.io.NullWritable
-import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
-import org.apache.spark.SparkEnv
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
-import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
-import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.DataSourceRegister
-import org.apache.spark.sql.types._
-
-import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
-import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.indexstore
-import org.apache.carbondata.core.metadata.SegmentFileStore
-import org.apache.carbondata.core.metadata.datatype.DataTypes
-import org.apache.carbondata.core.metadata.encoder.Encoding
-import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
-import org.apache.carbondata.core.util.{CarbonProperties, DataTypeConverterImpl, DataTypeUtil}
-import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.hadoop.api.{CarbonOutputCommitter, CarbonTableOutputFormat}
-import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat.CarbonRecordWriter
-import org.apache.carbondata.hadoop.internal.ObjectArrayWritable
-import org.apache.carbondata.hadoop.util.ObjectSerializationUtil
-import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, CarbonLoadModelBuilder, LoadOption}
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, Util}
-
-class CarbonFileFormat
-  extends FileFormat
-    with DataSourceRegister
-    with Logging
-with Serializable {
-
-  override def shortName(): String = "carbondata"
-
-  override def inferSchema(sparkSession: SparkSession,
-      options: Map[String, String],
-      files: Seq[FileStatus]): Option[StructType] = {
-    None
-  }
-
-  SparkSession.getActiveSession.get.sessionState.conf.setConfString(
-    "spark.sql.sources.commitProtocolClass",
-    "org.apache.spark.sql.execution.datasources.CarbonSQLHadoopMapReduceCommitProtocol")
-
-  override def prepareWrite(
-      sparkSession: SparkSession,
-      job: Job,
-      options: Map[String, String],
-      dataSchema: StructType): OutputWriterFactory = {
-    val conf = job.getConfiguration
-    conf.setClass(
-      SQLConf.OUTPUT_COMMITTER_CLASS.key,
-      classOf[CarbonOutputCommitter],
-      classOf[CarbonOutputCommitter])
-    conf.set("carbon.commit.protocol", "carbon.commit.protocol")
-    job.setOutputFormatClass(classOf[CarbonTableOutputFormat])
-    val table = CarbonEnv.getCarbonTable(
-      TableIdentifier(options("tableName"), options.get("dbName")))(sparkSession)
-    val model = new CarbonLoadModel
-    val carbonProperty = CarbonProperties.getInstance()
-    val optionsFinal = LoadOption.fillOptionWithDefaultValue(options.asJava)
-    val tableProperties = table.getTableInfo.getFactTable.getTableProperties
-    optionsFinal.put("sort_scope", tableProperties.asScala.getOrElse("sort_scope",
-      carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
-        carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
-          CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))))
-    val partitionStr =
-      table.getTableInfo.getFactTable.getPartitionInfo.getColumnSchemaList.asScala.map(
-        _.getColumnName.toLowerCase).mkString(",")
-    optionsFinal.put(
-      "fileheader",
-      dataSchema.fields.map(_.name.toLowerCase).mkString(",") + "," + partitionStr)
-    val optionsLocal = new mutable.HashMap[String, String]()
-    optionsLocal ++= options
-    optionsLocal += (("header", "false"))
-    new CarbonLoadModelBuilder(table).build(
-      optionsLocal.toMap.asJava,
-      optionsFinal,
-      model,
-      conf)
-    model.setUseOnePass(options.getOrElse("onepass", "false").toBoolean)
-    model.setDictionaryServerHost(options.getOrElse("dicthost", null))
-    model.setDictionaryServerPort(options.getOrElse("dictport", "-1").toInt)
-    CarbonTableOutputFormat.setOverwrite(conf, options("overwrite").toBoolean)
-    model.setPartitionLoad(true)
-
-    val staticPartition = options.getOrElse("staticpartition", null)
-    if (staticPartition != null) {
-      conf.set("carbon.staticpartition", staticPartition)
-    }
-    // In case of update query there is chance to remove the older segments, so here we can set
-    // the to be deleted segments to mark as delete while updating tablestatus
-    val segemntsTobeDeleted = options.get("segmentsToBeDeleted")
-    if (segemntsTobeDeleted.isDefined) {
-      conf.set(CarbonTableOutputFormat.SEGMENTS_TO_BE_DELETED, segemntsTobeDeleted.get)
-    }
-
-    val currPartition = options.getOrElse("currentpartition", null)
-    if (currPartition != null) {
-      conf.set("carbon.currentpartition", currPartition)
-    }
-    // Update with the current in progress load.
-    val currEntry = options.getOrElse("currentloadentry", null)
-    if (currEntry != null) {
-      val loadEntry =
-        ObjectSerializationUtil.convertStringToObject(currEntry).asInstanceOf[LoadMetadataDetails]
-      val details =
-        SegmentStatusManager.readLoadMetadata(CarbonTablePath.getMetadataPath(model.getTablePath))
-      model.setSegmentId(loadEntry.getLoadName)
-      model.setFactTimeStamp(loadEntry.getLoadStartTime)
-      val list = new util.ArrayList[LoadMetadataDetails](details.toList.asJava)
-      list.add(loadEntry)
-      model.setLoadMetadataDetails(list)
-    }
-    // Set the update timestamp if user sets in case of update query. It needs to be updated
-    // in load status update time
-    val updateTimeStamp = options.get("updatetimestamp")
-    if (updateTimeStamp.isDefined) {
-      conf.set(CarbonTableOutputFormat.UPADTE_TIMESTAMP, updateTimeStamp.get)
-    }
-    CarbonTableOutputFormat.setLoadModel(conf, model)
-
-    new OutputWriterFactory {
-
-      /**
-       * counter used for generating task numbers. This is used to generate unique partition numbers
-       * in case of partitioning
-       */
-      val counter = new AtomicLong()
-      val taskIdMap = new ConcurrentHashMap[String, java.lang.Long]()
-
-      override def newInstance(
-          path: String,
-          dataSchema: StructType,
-          context: TaskAttemptContext): OutputWriter = {
-        val model = CarbonTableOutputFormat.getLoadModel(context.getConfiguration)
-        val isCarbonUseMultiDir = CarbonProperties.getInstance().isUseMultiTempDir
-        var storeLocation: Array[String] = Array[String]()
-        val isCarbonUseLocalDir = CarbonProperties.getInstance()
-          .getProperty("carbon.use.local.dir", "false").equalsIgnoreCase("true")
-
-
-        val taskNumber = generateTaskNumber(path, context, model.getSegmentId)
-        val tmpLocationSuffix =
-          File.separator + "carbon" + System.nanoTime() + File.separator + taskNumber
-        if (isCarbonUseLocalDir) {
-          val yarnStoreLocations = Util.getConfiguredLocalDirs(SparkEnv.get.conf)
-          if (!isCarbonUseMultiDir && null != yarnStoreLocations && yarnStoreLocations.nonEmpty) {
-            // use single dir
-            storeLocation = storeLocation :+
-              (yarnStoreLocations(Random.nextInt(yarnStoreLocations.length)) + tmpLocationSuffix)
-            if (storeLocation == null || storeLocation.isEmpty) {
-              storeLocation = storeLocation :+
-                (System.getProperty("java.io.tmpdir") + tmpLocationSuffix)
-            }
-          } else {
-            // use all the yarn dirs
-            storeLocation = yarnStoreLocations.map(_ + tmpLocationSuffix)
-          }
-        } else {
-          storeLocation =
-            storeLocation :+ (System.getProperty("java.io.tmpdir") + tmpLocationSuffix)
-        }
-        CarbonTableOutputFormat.setTempStoreLocations(context.getConfiguration, storeLocation)
-        new CarbonOutputWriter(path, context, dataSchema.map(_.dataType), taskNumber, model)
-      }
-
-      /**
-       * Generate taskid using the taskid of taskcontext and the path. It should be unique in case
-       * of partition tables.
-       */
-      private def generateTaskNumber(path: String,
-          context: TaskAttemptContext, segmentId: String): String = {
-        var partitionNumber: java.lang.Long = taskIdMap.get(path)
-        if (partitionNumber == null) {
-          partitionNumber = counter.incrementAndGet()
-          // Generate taskid using the combination of taskid and partition number to make it unique.
-          taskIdMap.put(path, partitionNumber)
-        }
-        val taskID = context.getTaskAttemptID.getTaskID.getId
-        CarbonScalaUtil.generateUniqueNumber(taskID, segmentId, partitionNumber)
-      }
-
-      override def getFileExtension(context: TaskAttemptContext): String = {
-        CarbonTablePath.CARBON_DATA_EXT
-      }
-
-    }
-  }
-}
-
-case class CarbonSQLHadoopMapReduceCommitProtocol(jobId: String, path: String, isAppend: Boolean)
-  extends SQLHadoopMapReduceCommitProtocol(jobId, path, isAppend) {
-  override def newTaskTempFileAbsPath(taskContext: TaskAttemptContext,
-      absoluteDir: String,
-      ext: String): String = {
-    val carbonFlow = taskContext.getConfiguration.get("carbon.commit.protocol")
-    if (carbonFlow != null) {
-      super.newTaskTempFile(taskContext, Some(absoluteDir), ext)
-    } else {
-      super.newTaskTempFileAbsPath(taskContext, absoluteDir, ext)
-    }
-  }
-}
-
-/**
- * It is a just class to make compile between spark 2.1 and 2.2
- */
-private trait AbstractCarbonOutputWriter {
-  def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal")
-  def writeInternal(row: InternalRow): Unit = {
-    writeCarbon(row)
-  }
-  def write(row: InternalRow): Unit = {
-    writeCarbon(row)
-  }
-  def writeCarbon(row: InternalRow): Unit
-}
-
-private class CarbonOutputWriter(path: String,
-    context: TaskAttemptContext,
-    fieldTypes: Seq[DataType],
-    taskNo : String,
-    model: CarbonLoadModel)
-  extends OutputWriter with AbstractCarbonOutputWriter {
-
-  val converter = new DataTypeConverterImpl
-
-  val partitions =
-    getPartitionsFromPath(path, context, model).map(ExternalCatalogUtils.unescapePathName)
-  val staticPartition: util.HashMap[String, Boolean] = {
-    val staticPart = context.getConfiguration.get("carbon.staticpartition")
-    if (staticPart != null) {
-      ObjectSerializationUtil.convertStringToObject(
-        staticPart).asInstanceOf[util.HashMap[String, Boolean]]
-    } else {
-      null
-    }
-  }
-  lazy val currPartitions: util.List[indexstore.PartitionSpec] = {
-    val currParts = context.getConfiguration.get("carbon.currentpartition")
-    if (currParts != null) {
-      ObjectSerializationUtil.convertStringToObject(
-        currParts).asInstanceOf[util.List[indexstore.PartitionSpec]]
-    } else {
-      new util.ArrayList[indexstore.PartitionSpec]()
-    }
-  }
-  var (updatedPartitions, partitionData) = if (partitions.nonEmpty) {
-    val updatedPartitions = partitions.map(splitPartition)
-    (updatedPartitions, updatePartitions(updatedPartitions.map(_._2)))
-  } else {
-    (Map.empty[String, String].toArray, Array.empty)
-  }
-
-  private def splitPartition(p: String) = {
-    val value = p.substring(p.indexOf("=") + 1, p.length)
-    val col = p.substring(0, p.indexOf("="))
-    // NUll handling case. For null hive creates with this special name
-    if (value.equals("__HIVE_DEFAULT_PARTITION__")) {
-      (col, null)
-      // we should replace back the special string with empty value.
-    } else if (value.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL)) {
-      (col, "")
-    } else {
-      (col, value)
-    }
-  }
-
-  lazy val writePath = {
-    val updatedPath = getPartitionPath(path, context, model)
-    // in case of partition location specified by user then search the partitions from the current
-    // partitions to get the corresponding partitions.
-    if (partitions.isEmpty) {
-      val writeSpec = new indexstore.PartitionSpec(null, updatedPath)
-      val index = currPartitions.indexOf(writeSpec)
-      if (index > -1) {
-        val spec = currPartitions.get(index)
-        updatedPartitions = spec.getPartitions.asScala.map(splitPartition).toArray
-        partitionData = updatePartitions(updatedPartitions.map(_._2))
-      }
-    }
-    updatedPath
-  }
-
-  val writable = new ObjectArrayWritable
-
-  private def updatePartitions(partitionData: Seq[String]): Array[AnyRef] = {
-    model.getCarbonDataLoadSchema.getCarbonTable.getTableInfo.getFactTable.getPartitionInfo
-      .getColumnSchemaList.asScala.zipWithIndex.map { case (col, index) =>
-
-      val dataType = if (col.hasEncoding(Encoding.DICTIONARY)) {
-        DataTypes.INT
-      } else if (col.getDataType.equals(DataTypes.TIMESTAMP) ||
-                         col.getDataType.equals(DataTypes.DATE)) {
-        DataTypes.LONG
-      } else {
-        col.getDataType
-      }
-      if (staticPartition != null && staticPartition.get(col.getColumnName.toLowerCase)) {
-        val converetedVal =
-          CarbonScalaUtil.convertStaticPartitions(
-            partitionData(index),
-            col,
-            model.getCarbonDataLoadSchema.getCarbonTable)
-        if (col.hasEncoding(Encoding.DICTIONARY)) {
-          converetedVal.toInt.asInstanceOf[AnyRef]
-        } else {
-          DataTypeUtil.getDataBasedOnDataType(
-            converetedVal,
-            dataType,
-            converter)
-        }
-      } else {
-        DataTypeUtil.getDataBasedOnDataType(partitionData(index), dataType, converter)
-      }
-    }.toArray
-  }
-
-  private val recordWriter: CarbonRecordWriter = {
-    context.getConfiguration.set("carbon.outputformat.taskno", taskNo)
-    context.getConfiguration.set("carbon.outputformat.writepath",
-      writePath + "/" + model.getSegmentId + "_" + model.getFactTimeStamp + ".tmp")
-    new CarbonTableOutputFormat() {
-      override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
-        new Path(path)
-      }
-    }.getRecordWriter(context).asInstanceOf[CarbonRecordWriter]
-  }
-
-  // TODO Implement writesupport interface to support writing Row directly to recordwriter
-  def writeCarbon(row: InternalRow): Unit = {
-    val data = new Array[AnyRef](fieldTypes.length + partitionData.length)
-    var i = 0
-    while (i < fieldTypes.length) {
-      if (!row.isNullAt(i)) {
-        fieldTypes(i) match {
-          case StringType =>
-            data(i) = row.getString(i)
-          case d: DecimalType =>
-            data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal
-          case other =>
-            data(i) = row.get(i, other)
-        }
-      }
-      i += 1
-    }
-    if (partitionData.length > 0) {
-      System.arraycopy(partitionData, 0, data, fieldTypes.length, partitionData.length)
-    }
-    writable.set(data)
-    recordWriter.write(NullWritable.get(), writable)
-  }
-
-
-  override def writeInternal(row: InternalRow): Unit = {
-    writeCarbon(row)
-  }
-
-  override def close(): Unit = {
-    recordWriter.close(context)
-    // write partition info to new file.
-    val partitonList = new util.ArrayList[String]()
-    val formattedPartitions =
-    // All dynamic partitions need to be converted to proper format
-      CarbonScalaUtil.updatePartitions(
-        updatedPartitions.toMap,
-        model.getCarbonDataLoadSchema.getCarbonTable)
-    formattedPartitions.foreach(p => partitonList.add(p._1 + "=" + p._2))
-    SegmentFileStore.writeSegmentFile(
-      model.getTablePath,
-      taskNo,
-      writePath,
-      model.getSegmentId + "_" + model.getFactTimeStamp + "",
-      partitonList)
-  }
-
-  def getPartitionPath(path: String,
-      attemptContext: TaskAttemptContext,
-      model: CarbonLoadModel): String = {
-    if (updatedPartitions.nonEmpty) {
-      val formattedPartitions =
-      // All dynamic partitions need to be converted to proper format
-        CarbonScalaUtil.updatePartitions(
-          updatedPartitions.toMap,
-          model.getCarbonDataLoadSchema.getCarbonTable)
-      val partitionstr = formattedPartitions.map{p =>
-        ExternalCatalogUtils.escapePathName(p._1) + "=" + ExternalCatalogUtils.escapePathName(p._2)
-      }.mkString(CarbonCommonConstants.FILE_SEPARATOR)
-      model.getCarbonDataLoadSchema.getCarbonTable.getTablePath +
-        CarbonCommonConstants.FILE_SEPARATOR + partitionstr
-    } else {
-      var updatedPath = FileFactory.getUpdatedFilePath(path)
-      updatedPath.substring(0, updatedPath.lastIndexOf("/"))
-    }
-  }
-
-  def getPartitionsFromPath(
-      path: String,
-      attemptContext: TaskAttemptContext,
-      model: CarbonLoadModel): Array[String] = {
-    var attemptId = attemptContext.getTaskAttemptID.toString + "/"
-    if (path.indexOf(attemptId) > -1) {
-      val str = path.substring(path.indexOf(attemptId) + attemptId.length, path.lastIndexOf("/"))
-      if (str.length > 0) {
-        str.split("/")
-      } else {
-        Array.empty
-      }
-    } else {
-      Array.empty
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala
new file mode 100644
index 0000000..fa54e0d
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.net.URI
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.spark.{SparkException, TaskContext}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.text.TextOutputWriter
+import org.apache.spark.sql.optimizer.CarbonFilters
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.types.{AtomicType, StructField, StructType}
+
+import org.apache.carbondata.common.annotations.{InterfaceAudience, InterfaceStability}
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.{DataMapChooser, DataMapStoreManager, Segment}
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, ColumnarFormatVersion}
+import org.apache.carbondata.core.reader.CarbonHeaderReader
+import org.apache.carbondata.core.scan.expression.Expression
+import org.apache.carbondata.core.scan.expression.logical.AndExpression
+import org.apache.carbondata.core.scan.model.QueryModel
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection, CarbonRecordReader, InputMetricsStats}
+import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, DataMapJob}
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+@InterfaceAudience.User
+@InterfaceStability.Evolving
+class SparkCarbonFileFormat extends FileFormat
+  with DataSourceRegister
+  with Logging
+  with Serializable {
+
+  @transient val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  override def inferSchema(sparkSession: SparkSession,
+      options: Map[String, String],
+      files: Seq[FileStatus]): Option[StructType] = {
+    val filePaths = CarbonUtil.getFilePathExternalFilePath(
+      options.get("path").get)
+    if (filePaths.size() == 0) {
+      throw new SparkException("CarbonData file is not present in the location mentioned in DDL")
+    }
+    val carbonHeaderReader: CarbonHeaderReader = new CarbonHeaderReader(filePaths.get(0))
+    val fileHeader = carbonHeaderReader.readHeader
+    val table_columns: java.util.List[org.apache.carbondata.format.ColumnSchema] = fileHeader
+      .getColumn_schema
+    var colArray = ArrayBuffer[StructField]()
+    for (i <- 0 to table_columns.size() - 1) {
+      val col = CarbonUtil.thriftColumnSchmeaToWrapperColumnSchema(table_columns.get(i))
+      colArray += (new StructField(col.getColumnName,
+        CarbonScalaUtil.convertCarbonToSparkDataType(col.getDataType), false))
+    }
+    colArray.+:(Nil)
+
+    Some(StructType(colArray))
+  }
+
+  override def prepareWrite(sparkSession: SparkSession,
+      job: Job,
+      options: Map[String, String],
+      dataSchema: StructType): OutputWriterFactory = {
+
+    new OutputWriterFactory {
+      override def newInstance(
+          path: String,
+          dataSchema: StructType,
+          context: TaskAttemptContext): OutputWriter = {
+        new TextOutputWriter(path, dataSchema, context)
+      }
+
+      override def getFileExtension(context: TaskAttemptContext): String = {
+        CarbonTablePath.CARBON_DATA_EXT
+      }
+    }
+  }
+
+  override def shortName(): String = "Carbonfile"
+
+  override def toString: String = "Carbonfile"
+
+  override def hashCode(): Int = getClass.hashCode()
+
+  override def equals(other: Any): Boolean = other.isInstanceOf[SparkCarbonFileFormat]
+
+  def supportVector(sparkSession: SparkSession, schema: StructType): Boolean = {
+    val vectorizedReader = {
+      if (sparkSession.sqlContext.sparkSession.conf
+        .contains(CarbonCommonConstants.ENABLE_VECTOR_READER)) {
+        sparkSession.sqlContext.sparkSession.conf.get(CarbonCommonConstants.ENABLE_VECTOR_READER)
+      } else if (System.getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER) != null) {
+        System.getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER)
+      } else {
+        CarbonProperties.getInstance().getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER,
+          CarbonCommonConstants.ENABLE_VECTOR_READER_DEFAULT)
+      }
+    }
+    vectorizedReader.toBoolean
+  }
+
+
+  override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
+    val conf = sparkSession.sessionState.conf
+    conf.wholeStageEnabled &&
+    schema.length <= conf.wholeStageMaxNumFields &&
+    schema.forall(_.dataType.isInstanceOf[AtomicType])
+  }
+
+
+  def createVectorizedCarbonRecordReader(queryModel: QueryModel,
+      inputMetricsStats: InputMetricsStats, enableBatch: String): RecordReader[Void, Object] = {
+    val name = "org.apache.carbondata.spark.vectorreader.VectorizedCarbonRecordReader"
+    try {
+      val cons = Class.forName(name).getDeclaredConstructors
+      cons.head.setAccessible(true)
+      cons.head.newInstance(queryModel, inputMetricsStats, enableBatch)
+        .asInstanceOf[RecordReader[Void, Object]]
+    } catch {
+      case e: Exception =>
+        LOGGER.error(e)
+        null
+    }
+  }
+
+  override def buildReaderWithPartitionValues(sparkSession: SparkSession,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      requiredSchema: StructType,
+      filters: Seq[Filter],
+      options: Map[String, String],
+      hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
+
+    val filter : Option[Expression] = filters.flatMap { filter =>
+      CarbonFilters.createCarbonFilter(dataSchema, filter)
+    }.reduceOption(new AndExpression(_, _))
+
+    val projection = requiredSchema.map(_.name).toArray
+    val carbonProjection = new CarbonProjection
+    projection.foreach(carbonProjection.addColumn)
+
+    val conf = new Configuration()
+    val jobConf = new JobConf(conf)
+    SparkHadoopUtil.get.addCredentials(jobConf)
+    val job = Job.getInstance(jobConf)
+    var supportBatchValue: Boolean = false
+
+    val readVector = supportVector(sparkSession, dataSchema)
+    if (readVector) {
+      supportBatchValue = supportBatch(sparkSession, dataSchema)
+    }
+
+    CarbonFileInputFormat.setTableName(job.getConfiguration, "externaldummy")
+    CarbonFileInputFormat.setDatabaseName(job.getConfiguration, "default")
+    CarbonMetadata.getInstance.removeTable("default_externaldummy")
+    val dataMapJob: DataMapJob = CarbonFileInputFormat.getDataMapJob(job.getConfiguration)
+    val format: CarbonFileInputFormat[Object] = new CarbonFileInputFormat[Object]
+
+    (file: PartitionedFile) => {
+      assert(file.partitionValues.numFields == partitionSchema.size)
+
+      if (file.filePath.endsWith(CarbonCommonConstants.FACT_FILE_EXT)) {
+        val fileSplit =
+          new FileSplit(new Path(new URI(file.filePath)), file.start, file.length, Array.empty)
+
+        val path: String = options.get("path").get
+        val endindex: Int = path.indexOf("Fact") - 1
+        val tablePath = path.substring(0, endindex)
+        lazy val identifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.from(
+          tablePath,
+          "default",
+          "externaldummy")
+        val split = CarbonInputSplit.from("null", "0", fileSplit, ColumnarFormatVersion.V3, null)
+
+
+        val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
+        val conf1 = new Configuration()
+        conf1.set("mapreduce.input.carboninputformat.tableName", "externaldummy")
+        conf1.set("mapreduce.input.carboninputformat.databaseName", "default")
+        conf1.set("mapreduce.input.fileinputformat.inputdir", tablePath)
+        CarbonFileInputFormat.setColumnProjection(conf1, carbonProjection)
+        filter match {
+          case Some(c) => CarbonFileInputFormat.setFilterPredicates(conf1, c)
+          case None => None
+        }
+        val attemptContext = new TaskAttemptContextImpl(conf1, attemptId)
+
+        val model = format.createQueryModel(split, attemptContext)
+
+        var segments = new java.util.ArrayList[Segment]()
+        val seg = new Segment("null", null)
+        segments.add(seg)
+        var partition : java.util.List[PartitionSpec] = new java.util.ArrayList[PartitionSpec]()
+
+
+        val segmentPath = CarbonTablePath.getSegmentPath(identifier.getTablePath(), "null")
+        val indexFiles = new SegmentIndexFileStore().getIndexFilesFromSegment(segmentPath)
+        if (indexFiles.size() == 0) {
+          throw new SparkException("Index file not present to read the carbondata file")
+        }
+
+        val tab = model.getTable
+        DataMapStoreManager.getInstance().clearDataMaps(identifier)
+        val dataMapExprWrapper = DataMapChooser.get
+          .choose(tab, model.getFilterExpressionResolverTree)
+
+        // TODO : handle the partition for CarbonFileLevelFormat
+        val prunedBlocklets = dataMapExprWrapper.prune(segments, null)
+
+        val detailInfo = prunedBlocklets.get(0).getDetailInfo
+        detailInfo.readColumnSchema(detailInfo.getColumnSchemaBinary)
+        split.setDetailInfo(detailInfo)
+
+        val carbonReader = if (readVector) {
+          val vectorizedReader = createVectorizedCarbonRecordReader(model,
+            null,
+            supportBatchValue.toString)
+          vectorizedReader.initialize(split, attemptContext)
+          logDebug(s"Appending $partitionSchema ${ file.partitionValues }")
+          vectorizedReader
+        } else {
+          val reader = new CarbonRecordReader(model,
+            format.getReadSupportClass(attemptContext.getConfiguration), null)
+          reader.initialize(split, attemptContext)
+          reader
+        }
+
+        val iter = new RecordReaderIterator(carbonReader)
+        Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))
+
+        iter.asInstanceOf[Iterator[InternalRow]]
+      }
+      else {
+        Iterator.empty
+      }
+    }
+  }
+}
+
+
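
A rough usage sketch for the new file-level read path (the DDL keyword, location layout and
table name are assumptions; the parser change further below keys off a 'Carbonfile' provider):

    // Point a table at a directory that already holds .carbondata/.carbonindex files
    // (for example written by the CarbonData SDK) and has no schema file.
    spark.sql(
      "CREATE EXTERNAL TABLE sdk_output STORED BY 'Carbonfile' " +
        "LOCATION '/tmp/sdk_output/Fact/Part0/Segment_null'")
    // Reads go through SparkCarbonFileFormat / CarbonFileInputFormat; the schema is
    // inferred from the carbondata file header (see inferSchema above).
    spark.sql("SELECT count(*) FROM sdk_output").show()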

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
new file mode 100644
index 0000000..d34b201
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
@@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources
+
+import java.io.File
+import java.util
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicLong
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.Random
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.io.NullWritable
+import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.DataSourceRegister
+import org.apache.spark.sql.types._
+
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore
+import org.apache.carbondata.core.metadata.SegmentFileStore
+import org.apache.carbondata.core.metadata.datatype.DataTypes
+import org.apache.carbondata.core.metadata.encoder.Encoding
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, DataTypeConverterImpl, DataTypeUtil}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.hadoop.api.{CarbonOutputCommitter, CarbonTableOutputFormat}
+import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat.CarbonRecordWriter
+import org.apache.carbondata.hadoop.internal.ObjectArrayWritable
+import org.apache.carbondata.hadoop.util.ObjectSerializationUtil
+import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, CarbonLoadModelBuilder, LoadOption}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, Util}
+
+class SparkCarbonTableFormat
+  extends FileFormat
+    with DataSourceRegister
+    with Logging
+with Serializable {
+
+  override def shortName(): String = "carbondata"
+
+  override def inferSchema(sparkSession: SparkSession,
+      options: Map[String, String],
+      files: Seq[FileStatus]): Option[StructType] = {
+    None
+  }
+
+  SparkSession.getActiveSession.get.sessionState.conf.setConfString(
+    "spark.sql.sources.commitProtocolClass",
+    "org.apache.spark.sql.execution.datasources.CarbonSQLHadoopMapReduceCommitProtocol")
+
+  override def prepareWrite(
+      sparkSession: SparkSession,
+      job: Job,
+      options: Map[String, String],
+      dataSchema: StructType): OutputWriterFactory = {
+    val conf = job.getConfiguration
+    conf.setClass(
+      SQLConf.OUTPUT_COMMITTER_CLASS.key,
+      classOf[CarbonOutputCommitter],
+      classOf[CarbonOutputCommitter])
+    conf.set("carbon.commit.protocol", "carbon.commit.protocol")
+    job.setOutputFormatClass(classOf[CarbonTableOutputFormat])
+    val table = CarbonEnv.getCarbonTable(
+      TableIdentifier(options("tableName"), options.get("dbName")))(sparkSession)
+    val model = new CarbonLoadModel
+    val carbonProperty = CarbonProperties.getInstance()
+    val optionsFinal = LoadOption.fillOptionWithDefaultValue(options.asJava)
+    val tableProperties = table.getTableInfo.getFactTable.getTableProperties
+    optionsFinal.put("sort_scope", tableProperties.asScala.getOrElse("sort_scope",
+      carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
+        carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+          CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))))
+    val partitionStr =
+      table.getTableInfo.getFactTable.getPartitionInfo.getColumnSchemaList.asScala.map(
+        _.getColumnName.toLowerCase).mkString(",")
+    optionsFinal.put(
+      "fileheader",
+      dataSchema.fields.map(_.name.toLowerCase).mkString(",") + "," + partitionStr)
+    val optionsLocal = new mutable.HashMap[String, String]()
+    optionsLocal ++= options
+    optionsLocal += (("header", "false"))
+    new CarbonLoadModelBuilder(table).build(
+      optionsLocal.toMap.asJava,
+      optionsFinal,
+      model,
+      conf)
+    model.setUseOnePass(options.getOrElse("onepass", "false").toBoolean)
+    model.setDictionaryServerHost(options.getOrElse("dicthost", null))
+    model.setDictionaryServerPort(options.getOrElse("dictport", "-1").toInt)
+    CarbonTableOutputFormat.setOverwrite(conf, options("overwrite").toBoolean)
+    model.setPartitionLoad(true)
+
+    val staticPartition = options.getOrElse("staticpartition", null)
+    if (staticPartition != null) {
+      conf.set("carbon.staticpartition", staticPartition)
+    }
+    // In case of update query there is chance to remove the older segments, so here we can set
+    // the to be deleted segments to mark as delete while updating tablestatus
+    val segemntsTobeDeleted = options.get("segmentsToBeDeleted")
+    if (segemntsTobeDeleted.isDefined) {
+      conf.set(CarbonTableOutputFormat.SEGMENTS_TO_BE_DELETED, segemntsTobeDeleted.get)
+    }
+
+    val currPartition = options.getOrElse("currentpartition", null)
+    if (currPartition != null) {
+      conf.set("carbon.currentpartition", currPartition)
+    }
+    // Update with the current in progress load.
+    val currEntry = options.getOrElse("currentloadentry", null)
+    if (currEntry != null) {
+      val loadEntry =
+        ObjectSerializationUtil.convertStringToObject(currEntry).asInstanceOf[LoadMetadataDetails]
+      val details =
+        SegmentStatusManager.readLoadMetadata(CarbonTablePath.getMetadataPath(model.getTablePath))
+      model.setSegmentId(loadEntry.getLoadName)
+      model.setFactTimeStamp(loadEntry.getLoadStartTime)
+      val list = new util.ArrayList[LoadMetadataDetails](details.toList.asJava)
+      list.add(loadEntry)
+      model.setLoadMetadataDetails(list)
+    }
+    // Set the update timestamp if user sets in case of update query. It needs to be updated
+    // in load status update time
+    val updateTimeStamp = options.get("updatetimestamp")
+    if (updateTimeStamp.isDefined) {
+      conf.set(CarbonTableOutputFormat.UPADTE_TIMESTAMP, updateTimeStamp.get)
+    }
+    CarbonTableOutputFormat.setLoadModel(conf, model)
+
+    new OutputWriterFactory {
+
+      /**
+       * counter used for generating task numbers. This is used to generate unique partition numbers
+       * in case of partitioning
+       */
+      val counter = new AtomicLong()
+      val taskIdMap = new ConcurrentHashMap[String, java.lang.Long]()
+
+      override def newInstance(
+          path: String,
+          dataSchema: StructType,
+          context: TaskAttemptContext): OutputWriter = {
+        val model = CarbonTableOutputFormat.getLoadModel(context.getConfiguration)
+        val isCarbonUseMultiDir = CarbonProperties.getInstance().isUseMultiTempDir
+        var storeLocation: Array[String] = Array[String]()
+        val isCarbonUseLocalDir = CarbonProperties.getInstance()
+          .getProperty("carbon.use.local.dir", "false").equalsIgnoreCase("true")
+
+
+        val taskNumber = generateTaskNumber(path, context, model.getSegmentId)
+        val tmpLocationSuffix =
+          File.separator + "carbon" + System.nanoTime() + File.separator + taskNumber
+        if (isCarbonUseLocalDir) {
+          val yarnStoreLocations = Util.getConfiguredLocalDirs(SparkEnv.get.conf)
+          if (!isCarbonUseMultiDir && null != yarnStoreLocations && yarnStoreLocations.nonEmpty) {
+            // use single dir
+            storeLocation = storeLocation :+
+              (yarnStoreLocations(Random.nextInt(yarnStoreLocations.length)) + tmpLocationSuffix)
+            if (storeLocation == null || storeLocation.isEmpty) {
+              storeLocation = storeLocation :+
+                (System.getProperty("java.io.tmpdir") + tmpLocationSuffix)
+            }
+          } else {
+            // use all the yarn dirs
+            storeLocation = yarnStoreLocations.map(_ + tmpLocationSuffix)
+          }
+        } else {
+          storeLocation =
+            storeLocation :+ (System.getProperty("java.io.tmpdir") + tmpLocationSuffix)
+        }
+        CarbonTableOutputFormat.setTempStoreLocations(context.getConfiguration, storeLocation)
+        new CarbonOutputWriter(path, context, dataSchema.map(_.dataType), taskNumber, model)
+      }
+
+      /**
+       * Generate taskid using the taskid of taskcontext and the path. It should be unique in case
+       * of partition tables.
+       */
+      private def generateTaskNumber(path: String,
+          context: TaskAttemptContext, segmentId: String): String = {
+        var partitionNumber: java.lang.Long = taskIdMap.get(path)
+        if (partitionNumber == null) {
+          partitionNumber = counter.incrementAndGet()
+          // Generate taskid using the combination of taskid and partition number to make it unique.
+          taskIdMap.put(path, partitionNumber)
+        }
+        val taskID = context.getTaskAttemptID.getTaskID.getId
+        CarbonScalaUtil.generateUniqueNumber(taskID, segmentId, partitionNumber)
+      }
+
+      override def getFileExtension(context: TaskAttemptContext): String = {
+        CarbonTablePath.CARBON_DATA_EXT
+      }
+
+    }
+  }
+}
+
+case class CarbonSQLHadoopMapReduceCommitProtocol(jobId: String, path: String, isAppend: Boolean)
+  extends SQLHadoopMapReduceCommitProtocol(jobId, path, isAppend) {
+  override def newTaskTempFileAbsPath(taskContext: TaskAttemptContext,
+      absoluteDir: String,
+      ext: String): String = {
+    val carbonFlow = taskContext.getConfiguration.get("carbon.commit.protocol")
+    if (carbonFlow != null) {
+      super.newTaskTempFile(taskContext, Some(absoluteDir), ext)
+    } else {
+      super.newTaskTempFileAbsPath(taskContext, absoluteDir, ext)
+    }
+  }
+}
+
+/**
+ * It is a just class to make compile between spark 2.1 and 2.2
+ */
+private trait AbstractCarbonOutputWriter {
+  def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal")
+  def writeInternal(row: InternalRow): Unit = {
+    writeCarbon(row)
+  }
+  def write(row: InternalRow): Unit = {
+    writeCarbon(row)
+  }
+  def writeCarbon(row: InternalRow): Unit
+}
+
+private class CarbonOutputWriter(path: String,
+    context: TaskAttemptContext,
+    fieldTypes: Seq[DataType],
+    taskNo : String,
+    model: CarbonLoadModel)
+  extends OutputWriter with AbstractCarbonOutputWriter {
+
+  val converter = new DataTypeConverterImpl
+
+  val partitions =
+    getPartitionsFromPath(path, context, model).map(ExternalCatalogUtils.unescapePathName)
+  val staticPartition: util.HashMap[String, Boolean] = {
+    val staticPart = context.getConfiguration.get("carbon.staticpartition")
+    if (staticPart != null) {
+      ObjectSerializationUtil.convertStringToObject(
+        staticPart).asInstanceOf[util.HashMap[String, Boolean]]
+    } else {
+      null
+    }
+  }
+  lazy val currPartitions: util.List[indexstore.PartitionSpec] = {
+    val currParts = context.getConfiguration.get("carbon.currentpartition")
+    if (currParts != null) {
+      ObjectSerializationUtil.convertStringToObject(
+        currParts).asInstanceOf[util.List[indexstore.PartitionSpec]]
+    } else {
+      new util.ArrayList[indexstore.PartitionSpec]()
+    }
+  }
+  var (updatedPartitions, partitionData) = if (partitions.nonEmpty) {
+    val updatedPartitions = partitions.map(splitPartition)
+    (updatedPartitions, updatePartitions(updatedPartitions.map(_._2)))
+  } else {
+    (Map.empty[String, String].toArray, Array.empty)
+  }
+
+  private def splitPartition(p: String) = {
+    val value = p.substring(p.indexOf("=") + 1, p.length)
+    val col = p.substring(0, p.indexOf("="))
+    // NUll handling case. For null hive creates with this special name
+    if (value.equals("__HIVE_DEFAULT_PARTITION__")) {
+      (col, null)
+      // we should replace back the special string with empty value.
+    } else if (value.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL)) {
+      (col, "")
+    } else {
+      (col, value)
+    }
+  }
+
+  lazy val writePath = {
+    val updatedPath = getPartitionPath(path, context, model)
+    // in case of partition location specified by user then search the partitions from the current
+    // partitions to get the corresponding partitions.
+    if (partitions.isEmpty) {
+      val writeSpec = new indexstore.PartitionSpec(null, updatedPath)
+      val index = currPartitions.indexOf(writeSpec)
+      if (index > -1) {
+        val spec = currPartitions.get(index)
+        updatedPartitions = spec.getPartitions.asScala.map(splitPartition).toArray
+        partitionData = updatePartitions(updatedPartitions.map(_._2))
+      }
+    }
+    updatedPath
+  }
+
+  val writable = new ObjectArrayWritable
+
+  private def updatePartitions(partitionData: Seq[String]): Array[AnyRef] = {
+    model.getCarbonDataLoadSchema.getCarbonTable.getTableInfo.getFactTable.getPartitionInfo
+      .getColumnSchemaList.asScala.zipWithIndex.map { case (col, index) =>
+
+      val dataType = if (col.hasEncoding(Encoding.DICTIONARY)) {
+        DataTypes.INT
+      } else if (col.getDataType.equals(DataTypes.TIMESTAMP) ||
+                         col.getDataType.equals(DataTypes.DATE)) {
+        DataTypes.LONG
+      } else {
+        col.getDataType
+      }
+      if (staticPartition != null && staticPartition.get(col.getColumnName.toLowerCase)) {
+        val converetedVal =
+          CarbonScalaUtil.convertStaticPartitions(
+            partitionData(index),
+            col,
+            model.getCarbonDataLoadSchema.getCarbonTable)
+        if (col.hasEncoding(Encoding.DICTIONARY)) {
+          converetedVal.toInt.asInstanceOf[AnyRef]
+        } else {
+          DataTypeUtil.getDataBasedOnDataType(
+            converetedVal,
+            dataType,
+            converter)
+        }
+      } else {
+        DataTypeUtil.getDataBasedOnDataType(partitionData(index), dataType, converter)
+      }
+    }.toArray
+  }
+
+  private val recordWriter: CarbonRecordWriter = {
+    context.getConfiguration.set("carbon.outputformat.taskno", taskNo)
+    context.getConfiguration.set("carbon.outputformat.writepath",
+      writePath + "/" + model.getSegmentId + "_" + model.getFactTimeStamp + ".tmp")
+    new CarbonTableOutputFormat() {
+      override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
+        new Path(path)
+      }
+    }.getRecordWriter(context).asInstanceOf[CarbonRecordWriter]
+  }
+
+  // TODO Implement writesupport interface to support writing Row directly to recordwriter
+  def writeCarbon(row: InternalRow): Unit = {
+    val data = new Array[AnyRef](fieldTypes.length + partitionData.length)
+    var i = 0
+    while (i < fieldTypes.length) {
+      if (!row.isNullAt(i)) {
+        fieldTypes(i) match {
+          case StringType =>
+            data(i) = row.getString(i)
+          case d: DecimalType =>
+            data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal
+          case other =>
+            data(i) = row.get(i, other)
+        }
+      }
+      i += 1
+    }
+    if (partitionData.length > 0) {
+      System.arraycopy(partitionData, 0, data, fieldTypes.length, partitionData.length)
+    }
+    writable.set(data)
+    recordWriter.write(NullWritable.get(), writable)
+  }
+
+
+  override def writeInternal(row: InternalRow): Unit = {
+    writeCarbon(row)
+  }
+
+  override def close(): Unit = {
+    recordWriter.close(context)
+    // write partition info to new file.
+    val partitonList = new util.ArrayList[String]()
+    val formattedPartitions =
+    // All dynamic partitions need to be converted to proper format
+      CarbonScalaUtil.updatePartitions(
+        updatedPartitions.toMap,
+        model.getCarbonDataLoadSchema.getCarbonTable)
+    formattedPartitions.foreach(p => partitonList.add(p._1 + "=" + p._2))
+    SegmentFileStore.writeSegmentFile(
+      model.getTablePath,
+      taskNo,
+      writePath,
+      model.getSegmentId + "_" + model.getFactTimeStamp + "",
+      partitonList)
+  }
+
+  def getPartitionPath(path: String,
+      attemptContext: TaskAttemptContext,
+      model: CarbonLoadModel): String = {
+    if (updatedPartitions.nonEmpty) {
+      val formattedPartitions =
+      // All dynamic partitions need to be converted to proper format
+        CarbonScalaUtil.updatePartitions(
+          updatedPartitions.toMap,
+          model.getCarbonDataLoadSchema.getCarbonTable)
+      val partitionstr = formattedPartitions.map{p =>
+        ExternalCatalogUtils.escapePathName(p._1) + "=" + ExternalCatalogUtils.escapePathName(p._2)
+      }.mkString(CarbonCommonConstants.FILE_SEPARATOR)
+      model.getCarbonDataLoadSchema.getCarbonTable.getTablePath +
+        CarbonCommonConstants.FILE_SEPARATOR + partitionstr
+    } else {
+      var updatedPath = FileFactory.getUpdatedFilePath(path)
+      updatedPath.substring(0, updatedPath.lastIndexOf("/"))
+    }
+  }
+
+  def getPartitionsFromPath(
+      path: String,
+      attemptContext: TaskAttemptContext,
+      model: CarbonLoadModel): Array[String] = {
+    var attemptId = attemptContext.getTaskAttemptID.toString + "/"
+    if (path.indexOf(attemptId) > -1) {
+      val str = path.substring(path.indexOf(attemptId) + attemptId.length, path.lastIndexOf("/"))
+      if (str.length > 0) {
+        str.split("/")
+      } else {
+        Array.empty
+      }
+    } else {
+      Array.empty
+    }
+  }
+}
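
SparkCarbonTableFormat is the write-side FileFormat (previously CarbonFileFormat, moved as-is).
A hedged sketch of how Spark ends up driving it for a partitioned insert; the table and values
are illustrative, only the option names ("tableName", "dbName", "overwrite") come from
prepareWrite above:

    // Spark's FileFormatWriter calls prepareWrite/newInstance on SparkCarbonTableFormat
    // when loading a partitioned carbon table, e.g.
    spark.sql(
      "CREATE TABLE part_sales (id INT, amount DOUBLE) " +
        "PARTITIONED BY (country STRING) STORED BY 'carbondata'")
    spark.sql("INSERT INTO part_sales PARTITION (country='US') SELECT 1, 10.5")
    // prepareWrite builds a CarbonLoadModel from the options, and each task writes
    // through CarbonOutputWriter, which also records its partition in a segment file.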

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index ec20ec2..d85ef68 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -110,7 +110,14 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
           .tableExists(TableIdentifier(alterTableChangeDataTypeModel.tableName,
             alterTableChangeDataTypeModel.databaseName))(sparkSession)
         if (isCarbonTable) {
-          ExecutedCommandExec(dataTypeChange) :: Nil
+          val carbonTable = CarbonEnv.getCarbonTable(alterTableChangeDataTypeModel.databaseName,
+            alterTableChangeDataTypeModel.tableName)(sparkSession)
+          if (carbonTable != null && carbonTable.isFileLevelExternalTable) {
+            throw new MalformedCarbonCommandException(
+              "Unsupported alter operation on Carbon external fileformat table")
+          } else {
+            ExecutedCommandExec(dataTypeChange) :: Nil
+          }
         } else {
           throw new MalformedCarbonCommandException("Unsupported alter operation on hive table")
         }
@@ -119,7 +126,14 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
           .tableExists(TableIdentifier(alterTableAddColumnsModel.tableName,
             alterTableAddColumnsModel.databaseName))(sparkSession)
         if (isCarbonTable) {
-          ExecutedCommandExec(addColumn) :: Nil
+          val carbonTable = CarbonEnv.getCarbonTable(alterTableAddColumnsModel.databaseName,
+            alterTableAddColumnsModel.tableName)(sparkSession)
+          if (carbonTable != null && carbonTable.isFileLevelExternalTable) {
+            throw new MalformedCarbonCommandException(
+              "Unsupported alter operation on Carbon external fileformat table")
+          } else {
+            ExecutedCommandExec(addColumn) :: Nil
+          }
         } else {
           throw new MalformedCarbonCommandException("Unsupported alter operation on hive table")
         }
@@ -128,7 +142,14 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
           .tableExists(TableIdentifier(alterTableDropColumnModel.tableName,
             alterTableDropColumnModel.databaseName))(sparkSession)
         if (isCarbonTable) {
-          ExecutedCommandExec(dropColumn) :: Nil
+          val carbonTable = CarbonEnv.getCarbonTable(alterTableDropColumnModel.databaseName,
+            alterTableDropColumnModel.tableName)(sparkSession)
+          if (carbonTable != null && carbonTable.isFileLevelExternalTable) {
+            throw new MalformedCarbonCommandException(
+              "Unsupported alter operation on Carbon external fileformat table")
+          } else {
+            ExecutedCommandExec(dropColumn) :: Nil
+          }
         } else {
           throw new MalformedCarbonCommandException("Unsupported alter operation on hive table")
         }
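
Expected behaviour sketch for the three guards above (the table name is illustrative; it assumes
a table created with the 'Carbonfile' provider as in the parser change below):

    spark.sql("ALTER TABLE sdk_output ADD COLUMNS (comment STRING)")
    // => MalformedCarbonCommandException:
    //    Unsupported alter operation on Carbon external fileformat table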

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
index 4996bec..b2f4505 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.plans.Inner
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.execution.command.mutation.CarbonProjectForDeleteCommand
-import org.apache.spark.sql.execution.datasources.{CarbonFileFormat, CatalogFileIndex, FileFormat, HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, FileFormat, HadoopFsRelation, LogicalRelation, SparkCarbonTableFormat}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CarbonException
 import org.apache.spark.util.CarbonReflectionUtils

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index e0fff08..69fd366 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -33,7 +33,9 @@ import org.apache.spark.sql.util.CarbonException
 import org.apache.spark.util.CarbonReflectionUtils
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.hadoop.util.SchemaReader
 import org.apache.carbondata.spark.CarbonOption
 import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil}
@@ -144,19 +146,24 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf,
       .getOrElse(Map.empty)
   }
 
-  def createCarbonTable(tableHeader: CreateTableHeaderContext,
-      skewSpecContext: SkewSpecContext,
-      bucketSpecContext: BucketSpecContext,
-      partitionColumns: ColTypeListContext,
-      columns : ColTypeListContext,
-      tablePropertyList : TablePropertyListContext,
-      locationSpecContext: SqlBaseParser.LocationSpecContext,
-      tableComment : Option[String],
-      ctas: TerminalNode,
-      query: QueryContext) : LogicalPlan = {
+  def createCarbonTable(createTableTuple: (CreateTableHeaderContext, SkewSpecContext,
+    BucketSpecContext, ColTypeListContext, ColTypeListContext, TablePropertyListContext,
+    LocationSpecContext, Option[String], TerminalNode, QueryContext, String)): LogicalPlan = {
     // val parser = new CarbonSpark2SqlParser
 
+    val (tableHeader, skewSpecContext,
+      bucketSpecContext,
+      partitionColumns,
+      columns,
+      tablePropertyList,
+      locationSpecContext,
+      tableComment,
+      ctas,
+      query,
+      provider) = createTableTuple
+
     val (tableIdentifier, temp, ifNotExists, external) = visitCreateTableHeader(tableHeader)
+
     // TODO: implement temporary tables
     if (temp) {
       throw new ParseException(
@@ -256,13 +263,27 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf,
         CarbonEnv.getDatabaseName(tableIdentifier.database)(sparkSession),
         tableIdentifier.table)
       val table = try {
-        SchemaReader.getTableInfo(identifier)
-      } catch {
+        val schemaPath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath)
+        if (!FileFactory.isFileExist(schemaPath, FileFactory.getFileType(schemaPath)) &&
+            provider.equalsIgnoreCase("'Carbonfile'")) {
+          SchemaReader.inferSchema(identifier)
+        }
+        else {
+          SchemaReader.getTableInfo(identifier)
+        }
+      }
+        catch {
         case e: Throwable =>
           operationNotAllowed(s"Invalid table path provided: ${tablePath.get} ", tableHeader)
       }
       // set "_external" property, so that DROP TABLE will not delete the data
-      table.getFactTable.getTableProperties.put("_external", "true")
+      if (provider.equalsIgnoreCase("'Carbonfile'")) {
+        table.getFactTable.getTableProperties.put("_filelevelexternal", "true")
+        table.getFactTable.getTableProperties.put("_external", "false")
+      } else {
+        table.getFactTable.getTableProperties.put("_external", "true")
+        table.getFactTable.getTableProperties.put("_filelevelexternal", "false")
+      }
       table
     } else {
       // prepare table model of the collected tokens
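
Taken together, the parser hunks above change createCarbonTable to accept one tuple of parser contexts plus the provider string, and use that provider to decide between reading the stored schema file and inferring the schema from the data files, then tag the table properties so DROP TABLE keeps the data. A condensed sketch of that branch, assuming the SchemaReader, FileFactory and CarbonTablePath APIs shown in the diff (error handling omitted):

    import org.apache.carbondata.core.datastore.impl.FileFactory
    import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
    import org.apache.carbondata.core.metadata.schema.table.TableInfo
    import org.apache.carbondata.core.util.path.CarbonTablePath
    import org.apache.carbondata.hadoop.util.SchemaReader

    // Read the stored schema when it exists, otherwise (for the 'Carbonfile'
    // provider) infer it from the data files, then tag the table properties
    // so DROP TABLE will not delete the data.
    def readOrInferSchema(identifier: AbsoluteTableIdentifier, provider: String): TableInfo = {
      val schemaPath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath)
      val hasSchemaFile = FileFactory.isFileExist(schemaPath, FileFactory.getFileType(schemaPath))
      val table = if (!hasSchemaFile && provider.equalsIgnoreCase("'Carbonfile'")) {
        SchemaReader.inferSchema(identifier)    // schema inferred from the carbondata files
      } else {
        SchemaReader.getTableInfo(identifier)   // schema read from the stored schema file
      }
      if (provider.equalsIgnoreCase("'Carbonfile'")) {
        table.getFactTable.getTableProperties.put("_filelevelexternal", "true")
        table.getFactTable.getTableProperties.put("_external", "false")
      } else {
        table.getFactTable.getTableProperties.put("_external", "true")
        table.getFactTable.getTableProperties.put("_filelevelexternal", "false")
      }
      table
    }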

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
 
b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
index ba2fe947..c6bab9e 100644
--- 
a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ 
b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -326,18 +326,13 @@ class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSes
     val fileStorage = helper.getFileStorage(ctx.createFileFormat)
 
     if (fileStorage.equalsIgnoreCase("'carbondata'") ||
+        fileStorage.equalsIgnoreCase("'Carbonfile'") ||
         fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) {
-      helper.createCarbonTable(
-        tableHeader = ctx.createTableHeader,
-        skewSpecContext = ctx.skewSpec,
-        bucketSpecContext = ctx.bucketSpec,
-        partitionColumns = ctx.partitionColumns,
-        columns = ctx.columns,
-        tablePropertyList = ctx.tablePropertyList,
-        locationSpecContext = ctx.locationSpec(),
-        tableComment = Option(ctx.STRING()).map(string),
-        ctas = ctx.AS,
-        query = ctx.query)
+      val createTableTuple = (ctx.createTableHeader, ctx.skewSpec, ctx.bucketSpec,
+        ctx.partitionColumns, ctx.columns, ctx.tablePropertyList, ctx.locationSpec(),
+        Option(ctx.STRING()).map(string),
+        ctx.AS, ctx.query, fileStorage)
+        helper.createCarbonTable(createTableTuple)
     } else {
       super.visitCreateTable(ctx)
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
 
b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
index f033a8e..c28e4ba 100644
--- 
a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ 
b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -325,18 +325,12 @@ class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSes
     val fileStorage = helper.getFileStorage(ctx.createFileFormat)
 
     if (fileStorage.equalsIgnoreCase("'carbondata'") ||
+        fileStorage.equalsIgnoreCase("'Carbonfile'") ||
         fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) {
-      helper.createCarbonTable(
-        tableHeader = ctx.createTableHeader,
-        skewSpecContext = ctx.skewSpec,
-        bucketSpecContext = ctx.bucketSpec,
-        partitionColumns = ctx.partitionColumns,
-        columns = ctx.columns,
-        tablePropertyList = ctx.tablePropertyList,
-        locationSpecContext = ctx.locationSpec(),
-        tableComment = Option(ctx.STRING()).map(string),
-        ctas = ctx.AS,
-        query = ctx.query)
+      val createTableTuple = (ctx.createTableHeader, ctx.skewSpec,
+        ctx.bucketSpec, ctx.partitionColumns, ctx.columns, ctx.tablePropertyList,ctx.locationSpec(),
+        Option(ctx.STRING()).map(string), ctx.AS, ctx.query, fileStorage)
+      helper.createCarbonTable(createTableTuple)
     } else {
       super.visitCreateHiveTable(ctx)
     }
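
Both the spark2.1 and spark2.2 AST builders above now accept 'Carbonfile' alongside the existing storage handlers and forward the parser contexts to createCarbonTable as a single tuple rather than as named arguments. The acceptance check reduces to a predicate like the following sketch, where fileStorage is the quoted string returned by helper.getFileStorage; anything else falls through to Spark's own visitCreateTable/visitCreateHiveTable:

    // Storage-handler check shared by both builders.
    def isCarbonStorage(fileStorage: String): Boolean =
      fileStorage.equalsIgnoreCase("'carbondata'") ||
        fileStorage.equalsIgnoreCase("'Carbonfile'") ||
        fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")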

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/integration/spark2/src/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
 
b/integration/spark2/src/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index d09c9b5..5831f3e 100644
--- 
a/integration/spark2/src/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ 
b/integration/spark2/src/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -14,4 +14,5 @@
 ## See the License for the specific language governing permissions and
 ## limitations under the License.
 ## ------------------------------------------------------------------------
-org.apache.spark.sql.CarbonSource
\ No newline at end of file
+org.apache.spark.sql.CarbonSource
+org.apache.spark.sql.SparkCarbonFileFormat
\ No newline at end of file
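
Adding org.apache.spark.sql.SparkCarbonFileFormat to this META-INF/services file lets Spark's ServiceLoader-based lookup discover the new format through the DataSourceRegister interface, so it can be addressed by its short name in spark.read.format(...). The sketch below is a hypothetical minimal register, purely to show the mechanism; it is not the actual SparkCarbonFileFormat, and the short name used here is made up:

    import org.apache.spark.sql.sources.DataSourceRegister

    // A class listed in META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
    // exposes a short name that spark.read.format(...) can resolve to the implementation.
    class ExampleCarbonFileRegister extends DataSourceRegister {
      override def shortName(): String = "examplecarbonfile"  // made-up short name
    }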

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java
----------------------------------------------------------------------
diff --git 
a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java
 
b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java
index 0ac6f38..b15dafd 100644
--- 
a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java
+++ 
b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java
@@ -21,6 +21,7 @@ import java.io.File;
 import java.io.FileFilter;
 import java.io.IOException;
 
+import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
@@ -113,9 +114,12 @@ public class CSVCarbonWriterSuite {
         writer.write(new String[]{"robot" + (i % 10), String.valueOf(i), String.valueOf((double) i / 2)});
       }
       writer.close();
-    } catch (Exception e) {
+    } catch (IOException e) {
       e.printStackTrace();
       Assert.fail(e.getMessage());
+    } catch (InvalidLoadOptionException l) {
+      l.printStackTrace();
+      Assert.fail(l.getMessage());
     }
 
     File segmentFolder = new File(CarbonTablePath.getSegmentPath(path, "null"));
