Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1729#discussion_r159204180

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---
@@ -484,161 +485,147 @@ case class CarbonLoadDataCommand(
     // converted to hive standard format to let spark understand the data to partition.
     val serializationNullFormat =
       carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
-    val failAction =
-      carbonLoadModel.getBadRecordsAction.split(",")(1).equalsIgnoreCase(
-        CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
-    val ignoreAction =
-      carbonLoadModel.getBadRecordsAction.split(",")(1).equalsIgnoreCase("ignore")
-    val query: LogicalPlan = if (dataFrame.isDefined) {
-      var timeStampformatString = CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT
-      val timeStampFormat = new SimpleDateFormat(timeStampformatString)
-      var dateFormatString = CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT
-      val dateFormat = new SimpleDateFormat(dateFormatString)
-      val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
-      val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
-      val serializationNullFormat =
-        carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
-      val attributes =
-        StructType(dataFrame.get.schema.fields.map(_.copy(dataType = StringType))).toAttributes
-      val len = attributes.length
-      val rdd = dataFrame.get.rdd.map { f =>
-        val data = new Array[Any](len)
-        var i = 0
-        while (i < len) {
-          data(i) =
-            UTF8String.fromString(
-              CarbonScalaUtil.getString(f.get(i),
-                serializationNullFormat,
-                delimiterLevel1,
-                delimiterLevel2,
-                timeStampFormat,
-                dateFormat))
-          i = i + 1
+    val badRecordAction =
+      carbonLoadModel.getBadRecordsAction.split(",")(1)
+    var timeStampformatString = carbonLoadModel.getTimestampformat
+    if (timeStampformatString.isEmpty) {
+      timeStampformatString = carbonLoadModel.getDefaultTimestampFormat
+    }
+    val timeStampFormat = new SimpleDateFormat(timeStampformatString)
+    var dateFormatString = carbonLoadModel.getDateFormat
+    if (dateFormatString.isEmpty) {
+      dateFormatString = carbonLoadModel.getDefaultDateFormat
+    }
+    val dateFormat = new SimpleDateFormat(dateFormatString)
+    CarbonSession.threadSet(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT, dateFormatString)
+    CarbonSession.threadSet(
+      CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT,
+      timeStampformatString)
+    CarbonSession.threadSet(
+      CarbonLoadOptionConstants.CARBON_OPTIONS_SERIALIZATION_NULL_FORMAT,
+      serializationNullFormat)
+    CarbonSession.threadSet(
+      CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION,
+      badRecordAction)
+    CarbonSession.threadSet(
+      CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
+      carbonLoadModel.getIsEmptyDataBadRecord.split(",")(1))
+    try {
+      val query: LogicalPlan = if (dataFrame.isDefined) {
+        val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
+        val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
+        val attributes =
+          StructType(dataFrame.get.schema.fields.map(_.copy(dataType = StringType))).toAttributes
+        val len = attributes.length
+        val rdd = dataFrame.get.rdd.map { f =>
+          val data = new Array[Any](len)
+          var i = 0
+          while (i < len) {
+            data(i) =
+              UTF8String.fromString(
+                CarbonScalaUtil.getString(f.get(i),
+                  serializationNullFormat,
+                  delimiterLevel1,
+                  delimiterLevel2,
+                  timeStampFormat,
+                  dateFormat))
+            i = i + 1
+          }
+          InternalRow.fromSeq(data)
+        }
-        InternalRow.fromSeq(data)
-      }
-      if (updateModel.isDefined) {
-        sparkSession.sparkContext.setLocalProperty(EXECUTION_ID_KEY, null)
-        // In case of update, we don't need the segment id column in case of partitioning
-        val dropAttributes = attributes.dropRight(1)
-        val finalOutput = catalogTable.schema.map { attr =>
-          dropAttributes.find { d =>
-            val index = d.name.lastIndexOf("-updatedColumn")
-            if (index > 0) {
-              d.name.substring(0, index).equalsIgnoreCase(attr.name)
-            } else {
-              d.name.equalsIgnoreCase(attr.name)
-            }
-          }.get
+        if (updateModel.isDefined) {
--- End diff --

It is hard to split because the update scenario only needs a small amount of code added in the middle of the method to adjust the query plan. If we implemented two separate methods, a lot of code would have to be duplicated. So I have separated the update code into a new method for better readability.
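To make the extraction described above concrete, here is a minimal sketch of the kind of helper the update path could call, reusing the column-matching logic visible in the removed lines of the diff. It is only a sketch: the object and method names (UpdatePlanRewrite, rewritePlanForUpdate) and the simplified signature are assumptions for illustration, not the actual code added in PR #1729.

    import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, NamedExpression}
    import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}

    // Hypothetical sketch: keep the update-specific plan rewrite in one helper so the
    // main load path only needs a single call when updateModel is defined.
    object UpdatePlanRewrite {

      def rewritePlanForUpdate(child: LogicalPlan, tableColumnNames: Seq[String]): LogicalPlan = {
        // the last attribute carries the segment id, which the partitioned update does not need
        val usable: Seq[Attribute] = child.output.dropRight(1)
        // map each table column to its "<name>-updatedColumn" attribute, falling back to an
        // exact (case-insensitive) name match, then rename it back to the table column name
        val projectList: Seq[NamedExpression] = tableColumnNames.map { name =>
          val attr = usable
            .find { a =>
              val idx = a.name.lastIndexOf("-updatedColumn")
              if (idx > 0) a.name.substring(0, idx).equalsIgnoreCase(name)
              else a.name.equalsIgnoreCase(name)
            }
            .getOrElse(throw new IllegalArgumentException(s"No attribute found for column $name"))
          Alias(attr, name)()
        }
        Project(projectList, child)
      }
    }

Keeping this rewrite in one place leaves the common plan-building code shared, and the update flow adds only the conditional call, which is the readability argument made above.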
---