[GitHub] carbondata pull request #1729: [CARBONDATA-1936][PARTITION] Corrected bad re...

2018-01-04 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/carbondata/pull/1729


---


[GitHub] carbondata pull request #1729: [CARBONDATA-1936][PARTITION] Corrected bad re...

2018-01-03 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/1729#discussion_r159468842
  
--- Diff: 
integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
 ---
@@ -156,54 +158,156 @@ object CarbonScalaUtil {
   }
 
   /**
-   * Converts incoming value to UTF8String after converting data as per 
the data type.
+   * Converts incoming value to String after converting data as per the 
data type.
* @param value Input value to convert
-   * @param dataType Datatype to convert and then convert to UTF8String
+   * @param dataType Datatype to convert and then convert to String
* @param timeStampFormat Timestamp format to convert in case of 
timestamp datatypes
-   * @param dateFormat DataFormat to convert incase of DateType datatype
+   * @param dateFormat DataFormat to convert in case of DateType datatype
* @param serializationNullFormat if this encounters in input data then 
data will
*be treated as null
-   * @param fail If it is true then any conversion error will trhow error 
otherwise it will be
-   *   filled with ull value
-   * @return converted UTF8String
+   * @return converted String
*/
-  def convertToUTF8String(value: String,
+  def convertToString(value: String,
--- End diff --

ok


---


[GitHub] carbondata pull request #1729: [CARBONDATA-1936][PARTITION] Corrected bad re...

2018-01-03 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/1729#discussion_r159468830
  
--- Diff: integration/spark2/src/main/spark2.1/CarbonSessionState.scala ---
@@ -135,6 +136,25 @@ class CarbonSessionCatalog(
 sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive
   }
 
+  override def createPartitions(tableName: TableIdentifier,
--- End diff --

ok


---


[GitHub] carbondata pull request #1729: [CARBONDATA-1936][PARTITION] Corrected bad re...

2018-01-03 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/1729#discussion_r159468640
  
--- Diff: integration/spark2/src/main/spark2.2/CarbonSessionState.scala ---
@@ -142,6 +143,25 @@ class CarbonSessionCatalog(
   .asInstanceOf[HiveExternalCatalog].client
   }
 
+  override def createPartitions(tableName: TableIdentifier,
--- End diff --

ok


---


[GitHub] carbondata pull request #1729: [CARBONDATA-1936][PARTITION] Corrected bad re...

2018-01-03 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/1729#discussion_r159468449
  
--- Diff: 
core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
 ---
@@ -99,6 +99,15 @@
   public static final String CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS =
   "carbon.options.global.sort.partitions";
 
+  /**
+   * specify serialization null format
--- End diff --

ok


---


[GitHub] carbondata pull request #1729: [CARBONDATA-1936][PARTITION] Corrected bad re...

2018-01-02 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/1729#discussion_r159369856
  
--- Diff: 
integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
 ---
@@ -156,54 +158,156 @@ object CarbonScalaUtil {
   }
 
   /**
-   * Converts incoming value to UTF8String after converting data as per 
the data type.
+   * Converts incoming value to String after converting data as per the 
data type.
* @param value Input value to convert
-   * @param dataType Datatype to convert and then convert to UTF8String
+   * @param dataType Datatype to convert and then convert to String
* @param timeStampFormat Timestamp format to convert in case of 
timestamp datatypes
-   * @param dateFormat DataFormat to convert incase of DateType datatype
+   * @param dateFormat DataFormat to convert in case of DateType datatype
* @param serializationNullFormat if this encounters in input data then 
data will
*be treated as null
-   * @param fail If it is true then any conversion error will trhow error 
otherwise it will be
-   *   filled with ull value
-   * @return converted UTF8String
+   * @return converted String
*/
-  def convertToUTF8String(value: String,
+  def convertToString(value: String,
--- End diff --

move parameter to next line


---


[GitHub] carbondata pull request #1729: [CARBONDATA-1936][PARTITION] Corrected bad re...

2018-01-02 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/1729#discussion_r159369485
  
--- Diff: integration/spark2/src/main/spark2.1/CarbonSessionState.scala ---
@@ -135,6 +136,25 @@ class CarbonSessionCatalog(
 sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive
   }
 
+  override def createPartitions(tableName: TableIdentifier,
--- End diff --

move tableName to next line


---


[GitHub] carbondata pull request #1729: [CARBONDATA-1936][PARTITION] Corrected bad re...

2018-01-02 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/1729#discussion_r159369458
  
--- Diff: integration/spark2/src/main/spark2.2/CarbonSessionState.scala ---
@@ -142,6 +143,25 @@ class CarbonSessionCatalog(
   .asInstanceOf[HiveExternalCatalog].client
   }
 
+  override def createPartitions(tableName: TableIdentifier,
--- End diff --

move tableName to next line


---


[GitHub] carbondata pull request #1729: [CARBONDATA-1936][PARTITION] Corrected bad re...

2018-01-02 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/1729#discussion_r159369206
  
--- Diff: 
core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
 ---
@@ -99,6 +99,15 @@
   public static final String CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS =
   "carbon.options.global.sort.partitions";
 
+  /**
+   * specify serialization null format
--- End diff --

Please describe what is the purpose of this property


---


[GitHub] carbondata pull request #1729: [CARBONDATA-1936][PARTITION] Corrected bad re...

2018-01-02 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/1729#discussion_r159204374
  
--- Diff: 
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
 ---
@@ -484,161 +485,147 @@ case class CarbonLoadDataCommand(
 // converted to hive standard fomat to let spark understand the data 
to partition.
 val serializationNullFormat =
   
carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 
2)(1)
-val failAction =
-  carbonLoadModel.getBadRecordsAction.split(",")(1).equalsIgnoreCase(
-CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
-val ignoreAction =
-  
carbonLoadModel.getBadRecordsAction.split(",")(1).equalsIgnoreCase("ignore")
-val query: LogicalPlan = if (dataFrame.isDefined) {
-  var timeStampformatString = 
CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT
-  val timeStampFormat = new SimpleDateFormat(timeStampformatString)
-  var dateFormatString = 
CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT
-  val dateFormat = new SimpleDateFormat(dateFormatString)
-  val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
-  val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
-  val serializationNullFormat =
-  
carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 
2)(1)
-  val attributes =
-StructType(dataFrame.get.schema.fields.map(_.copy(dataType = 
StringType))).toAttributes
-  val len = attributes.length
-  val rdd = dataFrame.get.rdd.map { f =>
-val data = new Array[Any](len)
-var i = 0
-while (i < len) {
-  data(i) =
-UTF8String.fromString(
-  CarbonScalaUtil.getString(f.get(i),
-serializationNullFormat,
-delimiterLevel1,
-delimiterLevel2,
-timeStampFormat,
-dateFormat))
-  i = i + 1
+val badRecordAction =
+  carbonLoadModel.getBadRecordsAction.split(",")(1)
+var timeStampformatString = carbonLoadModel.getTimestampformat
+if (timeStampformatString.isEmpty) {
+  timeStampformatString = carbonLoadModel.getDefaultTimestampFormat
+}
+val timeStampFormat = new SimpleDateFormat(timeStampformatString)
+var dateFormatString = carbonLoadModel.getDateFormat
+if (dateFormatString.isEmpty) {
+  dateFormatString = carbonLoadModel.getDefaultDateFormat
+}
+val dateFormat = new SimpleDateFormat(dateFormatString)
+
CarbonSession.threadSet(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT, 
dateFormatString)
+CarbonSession.threadSet(
+  CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT,
+  timeStampformatString)
+CarbonSession.threadSet(
+  CarbonLoadOptionConstants.CARBON_OPTIONS_SERIALIZATION_NULL_FORMAT,
+  serializationNullFormat)
+CarbonSession.threadSet(
+  CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION,
+  badRecordAction)
+CarbonSession.threadSet(
+  CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
+  carbonLoadModel.getIsEmptyDataBadRecord.split(",")(1))
+try {
+  val query: LogicalPlan = if (dataFrame.isDefined) {
+val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
+val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
+val attributes =
+  StructType(dataFrame.get.schema.fields.map(_.copy(dataType = 
StringType))).toAttributes
+val len = attributes.length
+val rdd = dataFrame.get.rdd.map { f =>
+  val data = new Array[Any](len)
+  var i = 0
+  while (i < len) {
+data(i) =
+  UTF8String.fromString(
+CarbonScalaUtil.getString(f.get(i),
+  serializationNullFormat,
+  delimiterLevel1,
+  delimiterLevel2,
+  timeStampFormat,
+  dateFormat))
+i = i + 1
+  }
+  InternalRow.fromSeq(data)
 }
-InternalRow.fromSeq(data)
-  }
-  if (updateModel.isDefined) {
-sparkSession.sparkContext.setLocalProperty(EXECUTION_ID_KEY, null)
-// In case of update, we don't need the segmrntid column in case 
of partitioning
-val dropAttributes = attributes.dropRight(1)
-val finalOutput = catalogTable.schema.map { attr =>
-  dropAttributes.find { d =>
-val index = d.name.lastIndexOf("-updatedColumn")
  

[GitHub] carbondata pull request #1729: [CARBONDATA-1936][PARTITION] Corrected bad re...

2018-01-02 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/1729#discussion_r159204180
  
--- Diff: 
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
 ---
@@ -484,161 +485,147 @@ case class CarbonLoadDataCommand(
 // converted to hive standard fomat to let spark understand the data 
to partition.
 val serializationNullFormat =
   
carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 
2)(1)
-val failAction =
-  carbonLoadModel.getBadRecordsAction.split(",")(1).equalsIgnoreCase(
-CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
-val ignoreAction =
-  
carbonLoadModel.getBadRecordsAction.split(",")(1).equalsIgnoreCase("ignore")
-val query: LogicalPlan = if (dataFrame.isDefined) {
-  var timeStampformatString = 
CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT
-  val timeStampFormat = new SimpleDateFormat(timeStampformatString)
-  var dateFormatString = 
CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT
-  val dateFormat = new SimpleDateFormat(dateFormatString)
-  val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
-  val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
-  val serializationNullFormat =
-  
carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 
2)(1)
-  val attributes =
-StructType(dataFrame.get.schema.fields.map(_.copy(dataType = 
StringType))).toAttributes
-  val len = attributes.length
-  val rdd = dataFrame.get.rdd.map { f =>
-val data = new Array[Any](len)
-var i = 0
-while (i < len) {
-  data(i) =
-UTF8String.fromString(
-  CarbonScalaUtil.getString(f.get(i),
-serializationNullFormat,
-delimiterLevel1,
-delimiterLevel2,
-timeStampFormat,
-dateFormat))
-  i = i + 1
+val badRecordAction =
+  carbonLoadModel.getBadRecordsAction.split(",")(1)
+var timeStampformatString = carbonLoadModel.getTimestampformat
+if (timeStampformatString.isEmpty) {
+  timeStampformatString = carbonLoadModel.getDefaultTimestampFormat
+}
+val timeStampFormat = new SimpleDateFormat(timeStampformatString)
+var dateFormatString = carbonLoadModel.getDateFormat
+if (dateFormatString.isEmpty) {
+  dateFormatString = carbonLoadModel.getDefaultDateFormat
+}
+val dateFormat = new SimpleDateFormat(dateFormatString)
+
CarbonSession.threadSet(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT, 
dateFormatString)
+CarbonSession.threadSet(
+  CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT,
+  timeStampformatString)
+CarbonSession.threadSet(
+  CarbonLoadOptionConstants.CARBON_OPTIONS_SERIALIZATION_NULL_FORMAT,
+  serializationNullFormat)
+CarbonSession.threadSet(
+  CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION,
+  badRecordAction)
+CarbonSession.threadSet(
+  CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
+  carbonLoadModel.getIsEmptyDataBadRecord.split(",")(1))
+try {
+  val query: LogicalPlan = if (dataFrame.isDefined) {
+val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
+val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
+val attributes =
+  StructType(dataFrame.get.schema.fields.map(_.copy(dataType = 
StringType))).toAttributes
+val len = attributes.length
+val rdd = dataFrame.get.rdd.map { f =>
+  val data = new Array[Any](len)
+  var i = 0
+  while (i < len) {
+data(i) =
+  UTF8String.fromString(
+CarbonScalaUtil.getString(f.get(i),
+  serializationNullFormat,
+  delimiterLevel1,
+  delimiterLevel2,
+  timeStampFormat,
+  dateFormat))
+i = i + 1
+  }
+  InternalRow.fromSeq(data)
 }
-InternalRow.fromSeq(data)
-  }
-  if (updateModel.isDefined) {
-sparkSession.sparkContext.setLocalProperty(EXECUTION_ID_KEY, null)
-// In case of update, we don't need the segmrntid column in case 
of partitioning
-val dropAttributes = attributes.dropRight(1)
-val finalOutput = catalogTable.schema.map { attr =>
-  dropAttributes.find { d =>
-val index = d.name.lastIndexOf("-updatedColumn")
  

[GitHub] carbondata pull request #1729: [CARBONDATA-1936][PARTITION] Corrected bad re...

2017-12-28 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/1729#discussion_r158954662
  
--- Diff: 
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
 ---
@@ -484,161 +485,147 @@ case class CarbonLoadDataCommand(
 // converted to hive standard fomat to let spark understand the data 
to partition.
 val serializationNullFormat =
   
carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 
2)(1)
-val failAction =
-  carbonLoadModel.getBadRecordsAction.split(",")(1).equalsIgnoreCase(
-CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
-val ignoreAction =
-  
carbonLoadModel.getBadRecordsAction.split(",")(1).equalsIgnoreCase("ignore")
-val query: LogicalPlan = if (dataFrame.isDefined) {
-  var timeStampformatString = 
CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT
-  val timeStampFormat = new SimpleDateFormat(timeStampformatString)
-  var dateFormatString = 
CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT
-  val dateFormat = new SimpleDateFormat(dateFormatString)
-  val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
-  val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
-  val serializationNullFormat =
-  
carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 
2)(1)
-  val attributes =
-StructType(dataFrame.get.schema.fields.map(_.copy(dataType = 
StringType))).toAttributes
-  val len = attributes.length
-  val rdd = dataFrame.get.rdd.map { f =>
-val data = new Array[Any](len)
-var i = 0
-while (i < len) {
-  data(i) =
-UTF8String.fromString(
-  CarbonScalaUtil.getString(f.get(i),
-serializationNullFormat,
-delimiterLevel1,
-delimiterLevel2,
-timeStampFormat,
-dateFormat))
-  i = i + 1
+val badRecordAction =
+  carbonLoadModel.getBadRecordsAction.split(",")(1)
+var timeStampformatString = carbonLoadModel.getTimestampformat
+if (timeStampformatString.isEmpty) {
+  timeStampformatString = carbonLoadModel.getDefaultTimestampFormat
+}
+val timeStampFormat = new SimpleDateFormat(timeStampformatString)
+var dateFormatString = carbonLoadModel.getDateFormat
+if (dateFormatString.isEmpty) {
+  dateFormatString = carbonLoadModel.getDefaultDateFormat
+}
+val dateFormat = new SimpleDateFormat(dateFormatString)
+
CarbonSession.threadSet(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT, 
dateFormatString)
+CarbonSession.threadSet(
+  CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT,
+  timeStampformatString)
+CarbonSession.threadSet(
+  CarbonLoadOptionConstants.CARBON_OPTIONS_SERIALIZATION_NULL_FORMAT,
+  serializationNullFormat)
+CarbonSession.threadSet(
+  CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION,
+  badRecordAction)
+CarbonSession.threadSet(
+  CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
+  carbonLoadModel.getIsEmptyDataBadRecord.split(",")(1))
+try {
+  val query: LogicalPlan = if (dataFrame.isDefined) {
+val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
+val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
+val attributes =
+  StructType(dataFrame.get.schema.fields.map(_.copy(dataType = 
StringType))).toAttributes
+val len = attributes.length
+val rdd = dataFrame.get.rdd.map { f =>
+  val data = new Array[Any](len)
+  var i = 0
+  while (i < len) {
+data(i) =
+  UTF8String.fromString(
+CarbonScalaUtil.getString(f.get(i),
+  serializationNullFormat,
+  delimiterLevel1,
+  delimiterLevel2,
+  timeStampFormat,
+  dateFormat))
+i = i + 1
+  }
+  InternalRow.fromSeq(data)
 }
-InternalRow.fromSeq(data)
-  }
-  if (updateModel.isDefined) {
-sparkSession.sparkContext.setLocalProperty(EXECUTION_ID_KEY, null)
-// In case of update, we don't need the segmrntid column in case 
of partitioning
-val dropAttributes = attributes.dropRight(1)
-val finalOutput = catalogTable.schema.map { attr =>
-  dropAttributes.find { d =>
-val index = d.name.lastIndexOf("-updatedColumn")

[GitHub] carbondata pull request #1729: [CARBONDATA-1936][PARTITION] Corrected bad re...

2017-12-28 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/1729#discussion_r158954562
  
--- Diff: 
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
 ---
@@ -484,161 +485,147 @@ case class CarbonLoadDataCommand(
 // converted to hive standard fomat to let spark understand the data 
to partition.
 val serializationNullFormat =
   
carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 
2)(1)
-val failAction =
-  carbonLoadModel.getBadRecordsAction.split(",")(1).equalsIgnoreCase(
-CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
-val ignoreAction =
-  
carbonLoadModel.getBadRecordsAction.split(",")(1).equalsIgnoreCase("ignore")
-val query: LogicalPlan = if (dataFrame.isDefined) {
-  var timeStampformatString = 
CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT
-  val timeStampFormat = new SimpleDateFormat(timeStampformatString)
-  var dateFormatString = 
CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT
-  val dateFormat = new SimpleDateFormat(dateFormatString)
-  val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
-  val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
-  val serializationNullFormat =
-  
carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 
2)(1)
-  val attributes =
-StructType(dataFrame.get.schema.fields.map(_.copy(dataType = 
StringType))).toAttributes
-  val len = attributes.length
-  val rdd = dataFrame.get.rdd.map { f =>
-val data = new Array[Any](len)
-var i = 0
-while (i < len) {
-  data(i) =
-UTF8String.fromString(
-  CarbonScalaUtil.getString(f.get(i),
-serializationNullFormat,
-delimiterLevel1,
-delimiterLevel2,
-timeStampFormat,
-dateFormat))
-  i = i + 1
+val badRecordAction =
+  carbonLoadModel.getBadRecordsAction.split(",")(1)
+var timeStampformatString = carbonLoadModel.getTimestampformat
+if (timeStampformatString.isEmpty) {
+  timeStampformatString = carbonLoadModel.getDefaultTimestampFormat
+}
+val timeStampFormat = new SimpleDateFormat(timeStampformatString)
+var dateFormatString = carbonLoadModel.getDateFormat
+if (dateFormatString.isEmpty) {
+  dateFormatString = carbonLoadModel.getDefaultDateFormat
+}
+val dateFormat = new SimpleDateFormat(dateFormatString)
+
CarbonSession.threadSet(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT, 
dateFormatString)
+CarbonSession.threadSet(
+  CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT,
+  timeStampformatString)
+CarbonSession.threadSet(
+  CarbonLoadOptionConstants.CARBON_OPTIONS_SERIALIZATION_NULL_FORMAT,
+  serializationNullFormat)
+CarbonSession.threadSet(
+  CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION,
+  badRecordAction)
+CarbonSession.threadSet(
+  CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
+  carbonLoadModel.getIsEmptyDataBadRecord.split(",")(1))
+try {
+  val query: LogicalPlan = if (dataFrame.isDefined) {
+val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
+val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
+val attributes =
+  StructType(dataFrame.get.schema.fields.map(_.copy(dataType = 
StringType))).toAttributes
+val len = attributes.length
+val rdd = dataFrame.get.rdd.map { f =>
+  val data = new Array[Any](len)
+  var i = 0
+  while (i < len) {
+data(i) =
+  UTF8String.fromString(
+CarbonScalaUtil.getString(f.get(i),
+  serializationNullFormat,
+  delimiterLevel1,
+  delimiterLevel2,
+  timeStampFormat,
+  dateFormat))
+i = i + 1
+  }
+  InternalRow.fromSeq(data)
 }
-InternalRow.fromSeq(data)
-  }
-  if (updateModel.isDefined) {
-sparkSession.sparkContext.setLocalProperty(EXECUTION_ID_KEY, null)
-// In case of update, we don't need the segmrntid column in case 
of partitioning
-val dropAttributes = attributes.dropRight(1)
-val finalOutput = catalogTable.schema.map { attr =>
-  dropAttributes.find { d =>
-val index = d.name.lastIndexOf("-updatedColumn")