Github user KanakaKumar commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2647#discussion_r212329045
  
    --- Diff: 
integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
 ---
    @@ -0,0 +1,400 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.carbondata.execution.datasources
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.{FileStatus, Path}
    +import org.apache.hadoop.io.NullWritable
    +import org.apache.hadoop.mapreduce._
    +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
    +import org.apache.spark.{SparkException, TaskContext}
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.memory.MemoryMode
    +import org.apache.spark.sql._
    +import 
org.apache.spark.sql.carbondata.execution.datasources.readsupport.SparkUnsafeRowReadSuport
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions.JoinedRow
    +import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
    +import org.apache.spark.sql.catalyst.util.ArrayData
    +import org.apache.spark.sql.execution.datasources._
    +import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
    +import org.apache.spark.sql.types._
    +import org.apache.spark.sql.util.SparkTypeConverter
    +import org.apache.spark.util.SerializableConfiguration
    +
    +import org.apache.carbondata.common.annotations.{InterfaceAudience, 
InterfaceStability}
    +import org.apache.carbondata.common.logging.LogServiceFactory
    +import org.apache.carbondata.converter.SparkDataTypeConverterImpl
    +import org.apache.carbondata.core.constants.CarbonCommonConstants
    +import org.apache.carbondata.core.datastore.impl.FileFactory
    +import org.apache.carbondata.core.indexstore.BlockletDetailInfo
    +import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, 
ColumnarFormatVersion}
    +import org.apache.carbondata.core.metadata.schema.SchemaReader
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable
    +import org.apache.carbondata.core.scan.expression.{Expression => 
CarbonExpression}
    +import org.apache.carbondata.core.scan.expression.logical.AndExpression
    +import org.apache.carbondata.core.statusmanager.{FileFormat => 
CarbonFileFormatVersion}
    +import org.apache.carbondata.core.util.CarbonProperties
    +import org.apache.carbondata.core.util.path.CarbonTablePath
    +import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection, 
CarbonRecordReader}
    +import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, 
CarbonInputFormat, CarbonTableOutputFormat}
    +import org.apache.carbondata.hadoop.internal.ObjectArrayWritable
    +import 
org.apache.carbondata.processing.loading.complexobjects.{ArrayObject, 
StructObject}
    +import 
org.apache.carbondata.spark.vectorreader.VectorizedCarbonRecordReader
    +
    +/**
    + * Used to read and write data stored in carbondata files to/from the 
spark execution engine.
    + */
    +@InterfaceAudience.User
    +@InterfaceStability.Evolving
    +class SparkCarbonFileFormat extends FileFormat
    +  with DataSourceRegister
    +  with Logging
    +  with Serializable {
    +
    +  @transient val LOGGER = 
LogServiceFactory.getLogService(this.getClass.getName)
    +
    +  /**
    +   * If user does not provide schema while reading the data then spark 
calls this method to infer
    +   * schema from the carbondata files. It reads the schema present in 
carbondata files and returns it.
    +   */
    +  override def inferSchema(sparkSession: SparkSession,
    +      options: Map[String, String],
    +      files: Seq[FileStatus]): Option[StructType] = {
    +    val tablePath = options.get("path") match {
    +      case Some(path) => path
    +      case _ => 
FileFactory.getUpdatedFilePath(files.head.getPath.getParent.toUri.toString)
    +    }
    +
    +    val tableInfo = 
SchemaReader.inferSchema(AbsoluteTableIdentifier.from(tablePath, "", ""), false)
    +    val table = CarbonTable.buildFromTableInfo(tableInfo)
    +    var schema = new StructType
    +    tableInfo.getFactTable.getListOfColumns.asScala.foreach { col =>
    +      // TODO find better way to know its a child
    +      if (!col.getColumnName.contains(".")) {
    +        schema = schema.add(
    +          col.getColumnName,
    +          SparkTypeConverter.convertCarbonToSparkDataType(col, table))
    +      }
    +    }
    +    Some(schema)
    +  }
    +
    +
    +  /**
    +   * Prepares a write job and returns an [[OutputWriterFactory]].  Client 
side job preparation is
    +   * done here.
    +   */
    +  override def prepareWrite(sparkSession: SparkSession,
    +      job: Job,
    +      options: Map[String, String],
    +      dataSchema: StructType): OutputWriterFactory = {
    +
    +    val conf = job.getConfiguration
    +
    +    val model = CarbonSparkDataSourceUtil.prepareLoadModel(options, 
dataSchema)
    +    model.setLoadWithoutConverterStep(true)
    +    CarbonTableOutputFormat.setLoadModel(conf, model)
    +
    +    new OutputWriterFactory {
    +      override def newInstance(
    +          path: String,
    +          dataSchema: StructType,
    +          context: TaskAttemptContext): OutputWriter = {
    +        val updatedPath = if 
(path.endsWith(CarbonTablePath.CARBON_DATA_EXT)) {
    +          new Path(path).getParent.toString
    +        } else {
    +          path
    +        }
    +        context.getConfiguration.set("carbon.outputformat.writepath", 
updatedPath)
    +        context.getConfiguration.set("carbon.outputformat.taskno", 
System.nanoTime() + "")
    +        new CarbonOutputWriter(path, context, dataSchema.fields)
    +      }
    +
    +      override def getFileExtension(context: TaskAttemptContext): String = 
{
    +        CarbonTablePath.CARBON_DATA_EXT
    +      }
    +    }
    +  }
    +
    +  /**
    +   * It is just a trait to keep the code compiling across Spark 2.1 and 2.2
    +   */
    +  private trait AbstractCarbonOutputWriter {
    +    def write(row: Row): Unit = throw new 
UnsupportedOperationException("call writeInternal")
    +    def writeInternal(row: InternalRow): Unit = {
    +      writeCarbon(row)
    +    }
    +    def write(row: InternalRow): Unit = {
    +      writeCarbon(row)
    +    }
    +    def writeCarbon(row: InternalRow): Unit
    +  }
    +
    +
    +  /**
    +   * Writer class for carbondata files
    +   */
    +  private class CarbonOutputWriter(path: String,
    +      context: TaskAttemptContext,
    +      fieldTypes: Array[StructField]) extends OutputWriter with 
AbstractCarbonOutputWriter {
    +
    +    private val writable = new ObjectArrayWritable
    +
    +    private val cutOffDate = Integer.MAX_VALUE >> 1
    +
    +    private val recordWriter: RecordWriter[NullWritable, 
ObjectArrayWritable] =
    +      new CarbonTableOutputFormat().getRecordWriter(context)
    +
    +    /**
    +     * Write sparks internal row to carbondata record writer
    +     */
    +    def writeCarbon(row: InternalRow): Unit = {
    +      val data: Array[AnyRef] = extractData(row, fieldTypes)
    +      writable.set(data)
    +      recordWriter.write(NullWritable.get(), writable)
    +    }
    +
    +    override def writeInternal(row: InternalRow): Unit = {
    +      writeCarbon(row)
    +    }
    +
    +    /**
    +     * Convert the internal row to carbondata understandable object
    +     */
    +    private def extractData(row: InternalRow, fieldTypes: 
Array[StructField]): Array[AnyRef] = {
    +      val data = new Array[AnyRef](fieldTypes.length)
    +      var i = 0
    +      while (i < fieldTypes.length) {
    +        if (!row.isNullAt(i)) {
    +          fieldTypes(i).dataType match {
    +            case StringType =>
    +              data(i) = row.getString(i)
    +            case d: DecimalType =>
    +              data(i) = row.getDecimal(i, d.precision, 
d.scale).toJavaBigDecimal
    +            case s: StructType =>
    +              data(i) = new StructObject(extractData(row.getStruct(i, 
s.fields.length), s.fields))
    +            case s: ArrayType =>
    +              data(i) = new ArrayObject(extractData(row.getArray(i), 
s.elementType))
    +            case d: DateType =>
    +              data(i) = (row.getInt(i) + cutOffDate).asInstanceOf[AnyRef]
    +            case d: TimestampType =>
    +              data(i) = (row.getLong(i) / 1000).asInstanceOf[AnyRef]
    +            case other =>
    +              data(i) = row.get(i, other)
    +          }
    +        } else {
    +          setNull(fieldTypes(i).dataType, data, i)
    +        }
    +        i += 1
    +      }
    +      data
    +    }
    +
    +    private def setNull(dataType: DataType, data: Array[AnyRef], i: Int) = 
{
    +      dataType match {
    +        case d: DateType =>
    +          // 1 is treated as null in carbon
    +          data(i) = 1.asInstanceOf[AnyRef]
    +        case _ =>
    +      }
    +    }
    +
    +    /**
    +     * Convert the internal row to carbondata understandable object
    +     */
    +    private def extractData(row: ArrayData, dataType: DataType): 
Array[AnyRef] = {
    +      val data = new Array[AnyRef](row.numElements())
    +      var i = 0
    +      while (i < data.length) {
    +        if (!row.isNullAt(i)) {
    +          dataType match {
    +            case d: DecimalType =>
    +              data(i) = row.getDecimal(i, d.precision, 
d.scale).toJavaBigDecimal
    +            case s: StructType =>
    +              data(i) = new StructObject(extractData(row.getStruct(i, 
s.fields.length), s.fields))
    +            case s: ArrayType =>
    +              data(i) = new ArrayObject(extractData(row.getArray(i), 
s.elementType))
    +            case d: DateType =>
    +              data(i) = (row.getInt(i) + cutOffDate).asInstanceOf[AnyRef]
    +            case other => data(i) = row.get(i, dataType)
    +          }
    +        } else {
    +          setNull(dataType, data, i)
    +        }
    +        i += 1
    +      }
    +      data
    +    }
    +
    +    override def close(): Unit = {
    +      recordWriter.close(context)
    +    }
    +  }
    +
    +  override def shortName(): String = "carbon"
    +
    +  override def toString: String = "carbon"
    +
    +  override def hashCode(): Int = getClass.hashCode()
    +
    +  override def equals(other: Any): Boolean = 
other.isInstanceOf[SparkCarbonFileFormat]
    +
    +  /**
    +   * Whether to support vector reader while reading data.
    +   * In case of complex types it is not required to support it
    +   */
    +  private def supportVector(sparkSession: SparkSession, schema: 
StructType): Boolean = {
    +    val vectorizedReader = {
    +      if (sparkSession.sqlContext.sparkSession.conf
    +        .contains(CarbonCommonConstants.ENABLE_VECTOR_READER)) {
    +        
sparkSession.sqlContext.sparkSession.conf.get(CarbonCommonConstants.ENABLE_VECTOR_READER)
    +      } else if 
(System.getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER) != null) {
    +        System.getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER)
    +      } else {
    +        
CarbonProperties.getInstance().getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER,
    +          CarbonCommonConstants.ENABLE_VECTOR_READER_DEFAULT)
    +      }
    +    }
    +    vectorizedReader.toBoolean && 
schema.forall(_.dataType.isInstanceOf[AtomicType])
    +  }
    +
    +
    +  /**
    +   * Returns whether this format support returning columnar batch or not.
    +   */
    +  override def supportBatch(sparkSession: SparkSession, schema: 
StructType): Boolean = {
    +    val conf = sparkSession.sessionState.conf
    +    conf.wholeStageEnabled &&
    +    schema.length <= conf.wholeStageMaxNumFields &&
    +    schema.forall(_.dataType.isInstanceOf[AtomicType])
    +  }
    +
    +  /**
    +   * Returns a function that can be used to read a single carbondata file 
in as an
    +   * Iterator of InternalRow.
    +   */
    +  override def buildReaderWithPartitionValues(sparkSession: SparkSession,
    +      dataSchema: StructType,
    +      partitionSchema: StructType,
    +      requiredSchema: StructType,
    +      filters: Seq[Filter],
    +      options: Map[String, String],
    +      hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] 
= {
    +    val filter: Option[CarbonExpression] = filters.flatMap { filter =>
    +      CarbonSparkDataSourceUtil.createCarbonFilter(dataSchema, filter)
    +    }.reduceOption(new AndExpression(_, _))
    +
    +    val projection = requiredSchema.map(_.name).toArray
    +    val carbonProjection = new CarbonProjection
    +    projection.foreach(carbonProjection.addColumn)
    +
    +    var supportBatchValue: Boolean = false
    +
    +    val resultSchema = StructType(partitionSchema.fields ++ 
requiredSchema.fields)
    +    val readVector = supportVector(sparkSession, resultSchema)
    +    if (readVector) {
    +      supportBatchValue = supportBatch(sparkSession, resultSchema)
    +    }
    +    val model = CarbonSparkDataSourceUtil.prepareLoadModel(options, 
dataSchema)
    +    CarbonInputFormat
    +      .setTableInfo(hadoopConf, 
model.getCarbonDataLoadSchema.getCarbonTable.getTableInfo)
    +    CarbonInputFormat.setTransactionalTable(hadoopConf, false)
    +    CarbonInputFormat.setColumnProjection(hadoopConf, carbonProjection)
    +    filter match {
    +      case Some(c) => CarbonInputFormat.setFilterPredicates(hadoopConf, c)
    +      case None => None
    +    }
    +    val format: CarbonFileInputFormat[Object] = new 
CarbonFileInputFormat[Object]
    +    val broadcastedHadoopConf =
    +      sparkSession.sparkContext.broadcast(new 
SerializableConfiguration(hadoopConf))
    +
    +    file: PartitionedFile => {
    +      assert(file.partitionValues.numFields == partitionSchema.size)
    +
    +      if (!(file.filePath.endsWith(CarbonTablePath.INDEX_FILE_EXT) ||
    --- End diff --
    
    Can we just check for the carbondata extension here?


---

Reply via email to