Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1729#discussion_r159204374
  
    --- Diff: 
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
 ---
    @@ -484,161 +485,147 @@ case class CarbonLoadDataCommand(
         // converted to hive standard fomat to let spark understand the data to partition.
         val serializationNullFormat =
           
carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 
2)(1)
    -    val failAction =
    -      carbonLoadModel.getBadRecordsAction.split(",")(1).equalsIgnoreCase(
    -        CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
    -    val ignoreAction =
    -      
carbonLoadModel.getBadRecordsAction.split(",")(1).equalsIgnoreCase("ignore")
    -    val query: LogicalPlan = if (dataFrame.isDefined) {
    -      var timeStampformatString = 
CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT
    -      val timeStampFormat = new SimpleDateFormat(timeStampformatString)
    -      var dateFormatString = 
CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT
    -      val dateFormat = new SimpleDateFormat(dateFormatString)
    -      val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
    -      val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
    -      val serializationNullFormat =
    -      
carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 
2)(1)
    -      val attributes =
    -        StructType(dataFrame.get.schema.fields.map(_.copy(dataType = 
StringType))).toAttributes
    -      val len = attributes.length
    -      val rdd = dataFrame.get.rdd.map { f =>
    -        val data = new Array[Any](len)
    -        var i = 0
    -        while (i < len) {
    -          data(i) =
    -            UTF8String.fromString(
    -              CarbonScalaUtil.getString(f.get(i),
    -                serializationNullFormat,
    -                delimiterLevel1,
    -                delimiterLevel2,
    -                timeStampFormat,
    -                dateFormat))
    -          i = i + 1
    +    val badRecordAction =
    +      carbonLoadModel.getBadRecordsAction.split(",")(1)
    +    var timeStampformatString = carbonLoadModel.getTimestampformat
    +    if (timeStampformatString.isEmpty) {
    +      timeStampformatString = carbonLoadModel.getDefaultTimestampFormat
    +    }
    +    val timeStampFormat = new SimpleDateFormat(timeStampformatString)
    +    var dateFormatString = carbonLoadModel.getDateFormat
    +    if (dateFormatString.isEmpty) {
    +      dateFormatString = carbonLoadModel.getDefaultDateFormat
    +    }
    +    val dateFormat = new SimpleDateFormat(dateFormatString)
    +    
CarbonSession.threadSet(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT, 
dateFormatString)
    +    CarbonSession.threadSet(
    +      CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT,
    +      timeStampformatString)
    +    CarbonSession.threadSet(
    +      CarbonLoadOptionConstants.CARBON_OPTIONS_SERIALIZATION_NULL_FORMAT,
    +      serializationNullFormat)
    +    CarbonSession.threadSet(
    +      CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION,
    +      badRecordAction)
    +    CarbonSession.threadSet(
    +      CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
    +      carbonLoadModel.getIsEmptyDataBadRecord.split(",")(1))
    +    try {
    +      val query: LogicalPlan = if (dataFrame.isDefined) {
    +        val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
    +        val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
    +        val attributes =
    +          StructType(dataFrame.get.schema.fields.map(_.copy(dataType = 
StringType))).toAttributes
    +        val len = attributes.length
    +        val rdd = dataFrame.get.rdd.map { f =>
    +          val data = new Array[Any](len)
    +          var i = 0
    +          while (i < len) {
    +            data(i) =
    +              UTF8String.fromString(
    +                CarbonScalaUtil.getString(f.get(i),
    +                  serializationNullFormat,
    +                  delimiterLevel1,
    +                  delimiterLevel2,
    +                  timeStampFormat,
    +                  dateFormat))
    +            i = i + 1
    +          }
    +          InternalRow.fromSeq(data)
             }
    -        InternalRow.fromSeq(data)
    -      }
    -      if (updateModel.isDefined) {
    -        sparkSession.sparkContext.setLocalProperty(EXECUTION_ID_KEY, null)
    -        // In case of update, we don't need the segmrntid column in case of partitioning
    -        val dropAttributes = attributes.dropRight(1)
    -        val finalOutput = catalogTable.schema.map { attr =>
    -          dropAttributes.find { d =>
    -            val index = d.name.lastIndexOf("-updatedColumn")
    -            if (index > 0) {
    -              d.name.substring(0, index).equalsIgnoreCase(attr.name)
    -            } else {
    -              d.name.equalsIgnoreCase(attr.name)
    -            }
    -          }.get
    +        if (updateModel.isDefined) {
    +          sparkSession.sparkContext.setLocalProperty(EXECUTION_ID_KEY, 
null)
    +          // In case of update, we don't need the segmrntid column in case of partitioning
    +          val dropAttributes = attributes.dropRight(1)
    +          val finalOutput = catalogTable.schema.map { attr =>
    +            dropAttributes.find { d =>
    +              val index = d.name.lastIndexOf("-updatedColumn")
    +              if (index > 0) {
    +                d.name.substring(0, index).equalsIgnoreCase(attr.name)
    +              } else {
    +                d.name.equalsIgnoreCase(attr.name)
    +              }
    +            }.get
    +          }
    +          Project(finalOutput, LogicalRDD(attributes, rdd)(sparkSession))
    +        } else {
    +          LogicalRDD(attributes, rdd)(sparkSession)
             }
    -        Project(finalOutput, LogicalRDD(attributes, rdd)(sparkSession))
    -      } else {
    -        LogicalRDD(attributes, rdd)(sparkSession)
    -      }
     
    -    } else {
    -      var timeStampformatString = carbonLoadModel.getTimestampformat
    -      if (timeStampformatString.isEmpty) {
    -        timeStampformatString = carbonLoadModel.getDefaultTimestampFormat
    -      }
    -      val timeStampFormat = new SimpleDateFormat(timeStampformatString)
    -      var dateFormatString = carbonLoadModel.getDateFormat
    -      if (dateFormatString.isEmpty) {
    -        dateFormatString = carbonLoadModel.getDefaultDateFormat
    -      }
    -      val dateFormat = new SimpleDateFormat(dateFormatString)
    -      // input data from csv files. Convert to logical plan
    -      CommonUtil.configureCSVInputFormat(hadoopConf, carbonLoadModel)
    -      hadoopConf.set(FileInputFormat.INPUT_DIR, 
carbonLoadModel.getFactFilePath)
    -      val jobConf = new JobConf(hadoopConf)
    -      SparkHadoopUtil.get.addCredentials(jobConf)
    -      val attributes =
    -        StructType(carbonLoadModel.getCsvHeaderColumns.map(
    -          StructField(_, StringType))).toAttributes
    -      val rowDataTypes = attributes.map { attribute =>
    -        catalogTable.schema.find(_.name.equalsIgnoreCase(attribute.name)) 
match {
    -          case Some(attr) => attr.dataType
    -          case _ => StringType
    +      } else {
    +        // input data from csv files. Convert to logical plan
    +        CommonUtil.configureCSVInputFormat(hadoopConf, carbonLoadModel)
    +        hadoopConf.set(FileInputFormat.INPUT_DIR, 
carbonLoadModel.getFactFilePath)
    +        val jobConf = new JobConf(hadoopConf)
    +        SparkHadoopUtil.get.addCredentials(jobConf)
    +        val attributes =
    +          StructType(carbonLoadModel.getCsvHeaderColumns.map(
    +            StructField(_, StringType))).toAttributes
    +        val rowDataTypes = attributes.map { attribute =>
    +          
catalogTable.schema.find(_.name.equalsIgnoreCase(attribute.name)) match {
    +            case Some(attr) => attr.dataType
    +            case _ => StringType
    +          }
             }
    -      }
    -      val len = rowDataTypes.length
    -      // Fail row conversion if fail/ignore badrecord action is enabled
    -      val fail = failAction || ignoreAction
    -      var rdd =
    -        new NewHadoopRDD[NullWritable, StringArrayWritable](
    -          sparkSession.sparkContext,
    -          classOf[CSVInputFormat],
    -          classOf[NullWritable],
    -          classOf[StringArrayWritable],
    -          jobConf).map{ case (key, value) =>
    +        val len = rowDataTypes.length
    +        var rdd =
    +          new NewHadoopRDD[NullWritable, StringArrayWritable](
    +            sparkSession.sparkContext,
    +            classOf[CSVInputFormat],
    +            classOf[NullWritable],
    +            classOf[StringArrayWritable],
    +            jobConf).map { case (key, value) =>
                 val data = new Array[Any](len)
                 var i = 0
                 val input = value.get()
                 val inputLen = Math.min(input.length, len)
    -            try {
    -              while (i < inputLen) {
    -                // TODO find a way to avoid double conversion of date and time.
    -                data(i) = CarbonScalaUtil.convertToUTF8String(
    -                  input(i),
    -                  rowDataTypes(i),
    -                  timeStampFormat,
    -                  dateFormat,
    -                  serializationNullFormat,
    -                  fail)
    -                i = i + 1
    -              }
    -              InternalRow.fromSeq(data)
    -            } catch {
    -              case e: Exception =>
    -                if (failAction) {
    -                  // It is badrecord fail case.
    -                  throw new BadRecordFoundException(
    -                    s"Data load failed due to bad record: " +
    -                    s"${input(i)} with datatype ${rowDataTypes(i)}")
    -                } else {
    -                  // It is bad record ignore case
    -                  InternalRow.empty
    -                }
    +            while (i < inputLen) {
    +              // TODO find a way to avoid double conversion of date and time.
    +              data(i) = UTF8String.fromString(input(i))
    +              i = i + 1
                 }
    +            InternalRow.fromSeq(data)
     
    +          }
    +        // Only select the required columns
    +        val output = if (partition.nonEmpty) {
    +          catalogTable.schema.map { attr =>
    +            attributes.find(_.name.equalsIgnoreCase(attr.name)).get
    +          }.filter(attr => partition.get(attr.name).isEmpty)
    +        } else {
    +          catalogTable.schema.map(f => 
attributes.find(_.name.equalsIgnoreCase(f.name)).get)
             }
    -      // In bad record ignore case filter the empty values
    -      if (ignoreAction) {
    -        rdd = rdd.filter(f => f.numFields != 0)
    +        Project(output, LogicalRDD(attributes, rdd)(sparkSession))
           }
    -
    -      // Only select the required columns
    -      val output = if (partition.nonEmpty) {
    -        catalogTable.schema.map{ attr =>
    -          attributes.find(_.name.equalsIgnoreCase(attr.name)).get
    -        }.filter(attr => partition.get(attr.name).isEmpty)
    +      // TODO need to find a way to avoid double lookup
    +      val sizeInBytes =
    +        CarbonEnv.getInstance(sparkSession).carbonMetastore.lookupRelation(
    +          
catalogTable.identifier)(sparkSession).asInstanceOf[CarbonRelation].sizeInBytes
    +      val catalog = new CatalogFileIndex(sparkSession, catalogTable, 
sizeInBytes)
    +      val convertRelation = convertToLogicalRelation(
    +        catalogTable,
    +        sizeInBytes,
    +        isOverwriteTable,
    +        carbonLoadModel,
    +        sparkSession)
    +      val convertedPlan =
    +        CarbonReflectionUtils.getInsertIntoCommand(
    +          convertRelation,
    +          partition,
    +          query,
    +          false,
    --- End diff --
    
    ok


---

Reply via email to