Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1729#discussion_r159204374

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---
@@ -484,161 +485,147 @@ case class CarbonLoadDataCommand(
     // converted to hive standard fomat to let spark understand the data to partition.
     val serializationNullFormat =
       carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
-    val failAction =
-      carbonLoadModel.getBadRecordsAction.split(",")(1).equalsIgnoreCase(
-        CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
-    val ignoreAction =
-      carbonLoadModel.getBadRecordsAction.split(",")(1).equalsIgnoreCase("ignore")
-    val query: LogicalPlan = if (dataFrame.isDefined) {
-      var timeStampformatString = CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT
-      val timeStampFormat = new SimpleDateFormat(timeStampformatString)
-      var dateFormatString = CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT
-      val dateFormat = new SimpleDateFormat(dateFormatString)
-      val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
-      val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
-      val serializationNullFormat =
-        carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
-      val attributes =
-        StructType(dataFrame.get.schema.fields.map(_.copy(dataType = StringType))).toAttributes
-      val len = attributes.length
-      val rdd = dataFrame.get.rdd.map { f =>
-        val data = new Array[Any](len)
-        var i = 0
-        while (i < len) {
-          data(i) =
-            UTF8String.fromString(
-              CarbonScalaUtil.getString(f.get(i),
-                serializationNullFormat,
-                delimiterLevel1,
-                delimiterLevel2,
-                timeStampFormat,
-                dateFormat))
-          i = i + 1
+    val badRecordAction =
+      carbonLoadModel.getBadRecordsAction.split(",")(1)
+    var timeStampformatString = carbonLoadModel.getTimestampformat
+    if (timeStampformatString.isEmpty) {
+      timeStampformatString = carbonLoadModel.getDefaultTimestampFormat
+    }
+    val timeStampFormat = new SimpleDateFormat(timeStampformatString)
+    var dateFormatString = carbonLoadModel.getDateFormat
+    if (dateFormatString.isEmpty) {
+      dateFormatString = carbonLoadModel.getDefaultDateFormat
+    }
+    val dateFormat = new SimpleDateFormat(dateFormatString)
+    CarbonSession.threadSet(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT, dateFormatString)
+    CarbonSession.threadSet(
+      CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT,
+      timeStampformatString)
+    CarbonSession.threadSet(
+      CarbonLoadOptionConstants.CARBON_OPTIONS_SERIALIZATION_NULL_FORMAT,
+      serializationNullFormat)
+    CarbonSession.threadSet(
+      CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION,
+      badRecordAction)
+    CarbonSession.threadSet(
+      CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
+      carbonLoadModel.getIsEmptyDataBadRecord.split(",")(1))
+    try {
+      val query: LogicalPlan = if (dataFrame.isDefined) {
+        val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
+        val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
+        val attributes =
+          StructType(dataFrame.get.schema.fields.map(_.copy(dataType = StringType))).toAttributes
+        val len = attributes.length
+        val rdd = dataFrame.get.rdd.map { f =>
+          val data = new Array[Any](len)
+          var i = 0
+          while (i < len) {
+            data(i) =
+              UTF8String.fromString(
+                CarbonScalaUtil.getString(f.get(i),
+                  serializationNullFormat,
+                  delimiterLevel1,
+                  delimiterLevel2,
+                  timeStampFormat,
+                  dateFormat))
+            i = i + 1
+          }
+          InternalRow.fromSeq(data)
+        }
-        InternalRow.fromSeq(data)
-      }
-      if (updateModel.isDefined) {
-        sparkSession.sparkContext.setLocalProperty(EXECUTION_ID_KEY, null)
-        // In case of update, we don't need the segmrntid column in case of partitioning
-        val dropAttributes = attributes.dropRight(1)
-        val finalOutput = catalogTable.schema.map { attr =>
-          dropAttributes.find { d =>
-            val index = d.name.lastIndexOf("-updatedColumn")
-            if (index > 0) {
-              d.name.substring(0, index).equalsIgnoreCase(attr.name)
-            } else {
-              d.name.equalsIgnoreCase(attr.name)
-            }
-          }.get
+        if (updateModel.isDefined) {
+          sparkSession.sparkContext.setLocalProperty(EXECUTION_ID_KEY, null)
+          // In case of update, we don't need the segmrntid column in case of partitioning
+          val dropAttributes = attributes.dropRight(1)
+          val finalOutput = catalogTable.schema.map { attr =>
+            dropAttributes.find { d =>
+              val index = d.name.lastIndexOf("-updatedColumn")
+              if (index > 0) {
+                d.name.substring(0, index).equalsIgnoreCase(attr.name)
+              } else {
+                d.name.equalsIgnoreCase(attr.name)
+              }
+            }.get
+          }
+          Project(finalOutput, LogicalRDD(attributes, rdd)(sparkSession))
+        } else {
+          LogicalRDD(attributes, rdd)(sparkSession)
+        }
-        }
-        Project(finalOutput, LogicalRDD(attributes, rdd)(sparkSession))
-      } else {
-        LogicalRDD(attributes, rdd)(sparkSession)
-      }
-    } else {
-      var timeStampformatString = carbonLoadModel.getTimestampformat
-      if (timeStampformatString.isEmpty) {
-        timeStampformatString = carbonLoadModel.getDefaultTimestampFormat
-      }
-      val timeStampFormat = new SimpleDateFormat(timeStampformatString)
-      var dateFormatString = carbonLoadModel.getDateFormat
-      if (dateFormatString.isEmpty) {
-        dateFormatString = carbonLoadModel.getDefaultDateFormat
-      }
-      val dateFormat = new SimpleDateFormat(dateFormatString)
-      // input data from csv files. Convert to logical plan
-      CommonUtil.configureCSVInputFormat(hadoopConf, carbonLoadModel)
-      hadoopConf.set(FileInputFormat.INPUT_DIR, carbonLoadModel.getFactFilePath)
-      val jobConf = new JobConf(hadoopConf)
-      SparkHadoopUtil.get.addCredentials(jobConf)
-      val attributes =
-        StructType(carbonLoadModel.getCsvHeaderColumns.map(
-          StructField(_, StringType))).toAttributes
-      val rowDataTypes = attributes.map { attribute =>
-        catalogTable.schema.find(_.name.equalsIgnoreCase(attribute.name)) match {
-          case Some(attr) => attr.dataType
-          case _ => StringType
+      } else {
+        // input data from csv files. Convert to logical plan
+        CommonUtil.configureCSVInputFormat(hadoopConf, carbonLoadModel)
+        hadoopConf.set(FileInputFormat.INPUT_DIR, carbonLoadModel.getFactFilePath)
+        val jobConf = new JobConf(hadoopConf)
+        SparkHadoopUtil.get.addCredentials(jobConf)
+        val attributes =
+          StructType(carbonLoadModel.getCsvHeaderColumns.map(
+            StructField(_, StringType))).toAttributes
+        val rowDataTypes = attributes.map { attribute =>
+          catalogTable.schema.find(_.name.equalsIgnoreCase(attribute.name)) match {
+            case Some(attr) => attr.dataType
+            case _ => StringType
+          }
         }
-      }
-      val len = rowDataTypes.length
-      // Fail row conversion if fail/ignore badrecord action is enabled
-      val fail = failAction || ignoreAction
-      var rdd =
-        new NewHadoopRDD[NullWritable, StringArrayWritable](
-          sparkSession.sparkContext,
-          classOf[CSVInputFormat],
-          classOf[NullWritable],
-          classOf[StringArrayWritable],
-          jobConf).map{ case (key, value) =>
+        val len = rowDataTypes.length
+        var rdd =
+          new NewHadoopRDD[NullWritable, StringArrayWritable](
+            sparkSession.sparkContext,
+            classOf[CSVInputFormat],
+            classOf[NullWritable],
+            classOf[StringArrayWritable],
+            jobConf).map { case (key, value) =>
             val data = new Array[Any](len)
             var i = 0
            val input = value.get()
            val inputLen = Math.min(input.length, len)
-            try {
-              while (i < inputLen) {
-                // TODO find a way to avoid double conversion of date and time.
-                data(i) = CarbonScalaUtil.convertToUTF8String(
-                  input(i),
-                  rowDataTypes(i),
-                  timeStampFormat,
-                  dateFormat,
-                  serializationNullFormat,
-                  fail)
-                i = i + 1
-              }
-              InternalRow.fromSeq(data)
-            } catch {
-              case e: Exception =>
-                if (failAction) {
-                  // It is badrecord fail case.
-                  throw new BadRecordFoundException(
-                    s"Data load failed due to bad record: " +
-                    s"${input(i)} with datatype ${rowDataTypes(i)}")
-                } else {
-                  // It is bad record ignore case
-                  InternalRow.empty
-                }
+            while (i < inputLen) {
+              // TODO find a way to avoid double conversion of date and time.
+              data(i) = UTF8String.fromString(input(i))
+              i = i + 1
            }
            InternalRow.fromSeq(data)
+          }
+        // Only select the required columns
+        val output = if (partition.nonEmpty) {
+          catalogTable.schema.map { attr =>
+            attributes.find(_.name.equalsIgnoreCase(attr.name)).get
+          }.filter(attr => partition.get(attr.name).isEmpty)
+        } else {
+          catalogTable.schema.map(f => attributes.find(_.name.equalsIgnoreCase(f.name)).get)
        }
-      // In bad record ignore case filter the empty values
-      if (ignoreAction) {
-        rdd = rdd.filter(f => f.numFields != 0)
+        Project(output, LogicalRDD(attributes, rdd)(sparkSession))
      }
-
-      // Only select the required columns
-      val output = if (partition.nonEmpty) {
-        catalogTable.schema.map{ attr =>
-          attributes.find(_.name.equalsIgnoreCase(attr.name)).get
-        }.filter(attr => partition.get(attr.name).isEmpty)
+      // TODO need to find a way to avoid double lookup
+      val sizeInBytes =
+        CarbonEnv.getInstance(sparkSession).carbonMetastore.lookupRelation(
+          catalogTable.identifier)(sparkSession).asInstanceOf[CarbonRelation].sizeInBytes
+      val catalog = new CatalogFileIndex(sparkSession, catalogTable, sizeInBytes)
+      val convertRelation = convertToLogicalRelation(
+        catalogTable,
+        sizeInBytes,
+        isOverwriteTable,
+        carbonLoadModel,
+        sparkSession)
+      val convertedPlan =
+        CarbonReflectionUtils.getInsertIntoCommand(
+          convertRelation,
+          partition,
+          query,
+          false,
--- End diff --

ok
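For context, the core of this hunk is that the per-load options (date format, timestamp format, serialization null format, bad-record action, empty-data-bad-record flag) are now pushed into thread-local session properties via CarbonSession.threadSet before the logical plan is built, inside a try block. A minimal sketch of that pattern follows; it assumes the CarbonSession.threadSet/threadUnset API and the CarbonLoadOptionConstants keys visible in the diff, and the helper name withThreadLocalLoadOptions is purely illustrative, not part of this PR.

    import org.apache.carbondata.core.constants.CarbonLoadOptionConstants
    import org.apache.spark.sql.CarbonSession

    // Hypothetical helper: set per-thread load options, run the body,
    // and always clear them again so a reused thread does not leak settings.
    def withThreadLocalLoadOptions[T](dateFormat: String, timestampFormat: String)(body: => T): T = {
      CarbonSession.threadSet(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT, dateFormat)
      CarbonSession.threadSet(CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT, timestampFormat)
      try {
        body
      } finally {
        CarbonSession.threadUnset(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT)
        CarbonSession.threadUnset(CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT)
      }
    }

The quoted diff only shows the threadSet calls and the opening try; the matching cleanup is assumed to live in the finally block further down in the file, which is not part of this hunk.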
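The other recurring step in both input paths is flattening each incoming row into string form: every field is wrapped as a UTF8String inside an InternalRow, and the real conversion against rowDataTypes is deferred. A simplified, self-contained sketch of that per-row step (standing in for CarbonScalaUtil.getString and the map closures above; the nullFormat and timestampFormat parameters are illustrative) could look like this:

    import java.text.SimpleDateFormat
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.unsafe.types.UTF8String

    // Render every field of a Row as a string and wrap it as UTF8String,
    // mirroring the shape (not the exact behaviour) of the closures in the diff.
    def toStringRow(row: Row, nullFormat: String, timestampFormat: SimpleDateFormat): InternalRow = {
      val data = new Array[Any](row.length)
      var i = 0
      while (i < row.length) {
        val str = row.get(i) match {
          case null => nullFormat
          case ts: java.sql.Timestamp => timestampFormat.format(ts)
          case other => other.toString
        }
        data(i) = UTF8String.fromString(str)
        i += 1
      }
      InternalRow.fromSeq(data)
    }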
---