[GitHub] carbondata pull request #1729: [CARBONDATA-1936][PARTITION] Corrected bad re...
Github user asfgit closed the pull request at: https://github.com/apache/carbondata/pull/1729 ---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1729#discussion_r159468842

--- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala ---
@@ -156,54 +158,156 @@ object CarbonScalaUtil {
   }

   /**
-   * Converts incoming value to UTF8String after converting data as per the data type.
+   * Converts incoming value to String after converting data as per the data type.
    * @param value Input value to convert
-   * @param dataType Datatype to convert and then convert to UTF8String
+   * @param dataType Datatype to convert and then convert to String
    * @param timeStampFormat Timestamp format to convert in case of timestamp datatypes
-   * @param dateFormat DataFormat to convert incase of DateType datatype
+   * @param dateFormat DataFormat to convert in case of DateType datatype
    * @param serializationNullFormat if this encounters in input data then data will
    *                                be treated as null
-   * @param fail If it is true then any conversion error will trhow error otherwise it will be
-   *             filled with ull value
-   * @return converted UTF8String
+   * @return converted String
    */
-  def convertToUTF8String(value: String,
+  def convertToString(value: String,

--- End diff --

ok

---
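The utility being renamed above parses each raw field according to the column's datatype and renders it back as a plain String, honoring the configured timestamp/date formats and the serialization null format. A minimal Java sketch of that idea (class name, datatype tags, and signature are illustrative, not the real CarbonScalaUtil API):

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;

// Hypothetical mirror of a convertToString-style utility: parse the raw field
// according to the column's datatype, then render it back as a plain String.
public class ConvertToStringSketch {

  public static String convertToString(String value,
      String dataType,
      SimpleDateFormat timeStampFormat,
      SimpleDateFormat dateFormat,
      String serializationNullFormat) {
    // A field equal to the serialization null format is treated as null.
    if (value == null || value.equals(serializationNullFormat)) {
      return null;
    }
    try {
      switch (dataType) {
        case "TIMESTAMP":
          // Round-trip through the configured format to validate the value.
          return timeStampFormat.format(timeStampFormat.parse(value));
        case "DATE":
          return dateFormat.format(dateFormat.parse(value));
        case "INT":
          return Integer.toString(Integer.parseInt(value));
        default:
          return value;
      }
    } catch (ParseException | NumberFormatException e) {
      // A bad-records action (FAIL/IGNORE/...) would decide what happens here.
      throw new IllegalArgumentException(
          "cannot convert '" + value + "' to " + dataType, e);
    }
  }

  public static void main(String[] args) {
    SimpleDateFormat ts = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    SimpleDateFormat dt = new SimpleDateFormat("yyyy-MM-dd");
    System.out.println(convertToString("2018-01-02", "DATE", ts, dt, "\\N"));  // 2018-01-02
    System.out.println(convertToString("\\N", "INT", ts, dt, "\\N"));          // null
  }
}
```

Returning a String rather than a UTF8String keeps the conversion usable from plain Spark/Hive paths; the caller can still wrap the result with UTF8String.fromString where an InternalRow is built.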
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1729#discussion_r159468830

--- Diff: integration/spark2/src/main/spark2.1/CarbonSessionState.scala ---
@@ -135,6 +136,25 @@ class CarbonSessionCatalog(
     sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive
   }

+  override def createPartitions(tableName: TableIdentifier,

--- End diff --

ok

---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1729#discussion_r159468640

--- Diff: integration/spark2/src/main/spark2.2/CarbonSessionState.scala ---
@@ -142,6 +143,25 @@ class CarbonSessionCatalog(
       .asInstanceOf[HiveExternalCatalog].client
   }

+  override def createPartitions(tableName: TableIdentifier,

--- End diff --

ok

---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1729#discussion_r159468449

--- Diff: core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java ---
@@ -99,6 +99,15 @@
   public static final String CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS =
       "carbon.options.global.sort.partitions";

+  /**
+   * specify serialization null format

--- End diff --

ok

---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1729#discussion_r159369856

--- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala ---
@@ -156,54 +158,156 @@ object CarbonScalaUtil {
-  def convertToUTF8String(value: String,
+  def convertToString(value: String,

--- End diff --

move parameter to next line

---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1729#discussion_r159369485

--- Diff: integration/spark2/src/main/spark2.1/CarbonSessionState.scala ---
@@ -135,6 +136,25 @@ class CarbonSessionCatalog(
     sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive
   }

+  override def createPartitions(tableName: TableIdentifier,

--- End diff --

move tableName to next line

---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1729#discussion_r159369458

--- Diff: integration/spark2/src/main/spark2.2/CarbonSessionState.scala ---
@@ -142,6 +143,25 @@ class CarbonSessionCatalog(
       .asInstanceOf[HiveExternalCatalog].client
   }

+  override def createPartitions(tableName: TableIdentifier,

--- End diff --

move tableName to next line

---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1729#discussion_r159369206

--- Diff: core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java ---
@@ -99,6 +99,15 @@
   public static final String CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS =
       "carbon.options.global.sort.partitions";

+  /**
+   * specify serialization null format

--- End diff --

Please describe what is the purpose of this property

---
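For context on the question above: a serialization null format names the token in serialized input data (Hive's default is `\N`) that should be loaded as SQL NULL rather than as a literal string. A small Java sketch of that behavior (constant name copied from the diff, helper names illustrative):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Sketch of how a serialization-null-format load option is applied: any field
// equal to the configured token is replaced with null during load.
public class NullFormatSketch {

  // Constant name as introduced in the diff; the value is an assumption here.
  public static final String CARBON_OPTIONS_SERIALIZATION_NULL_FORMAT =
      "carbon.options.serialization.null.format";

  // Replace every field matching the configured null token with null.
  public static List<String> applyNullFormat(List<String> row, String nullFormat) {
    return row.stream()
        .map(v -> nullFormat.equals(v) ? null : v)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String> row = Arrays.asList("bob", "\\N", "42");
    // The middle field matches the "\N" token and becomes null.
    System.out.println(applyNullFormat(row, "\\N"));  // [bob, null, 42]
  }
}
```

Making the token configurable matters for partitioned loads: Spark/Hive must agree with CarbonData on which string means NULL, or null partition values get materialized as literal text.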
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1729#discussion_r159204374

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---
@@ -484,161 +485,147 @@ case class CarbonLoadDataCommand(
     // converted to hive standard fomat to let spark understand the data to partition.
     val serializationNullFormat =
       carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
-    val failAction =
-      carbonLoadModel.getBadRecordsAction.split(",")(1).equalsIgnoreCase(
-        CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
-    val ignoreAction =
-      carbonLoadModel.getBadRecordsAction.split(",")(1).equalsIgnoreCase("ignore")
-    val query: LogicalPlan = if (dataFrame.isDefined) {
-      var timeStampformatString = CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT
-      val timeStampFormat = new SimpleDateFormat(timeStampformatString)
-      var dateFormatString = CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT
-      val dateFormat = new SimpleDateFormat(dateFormatString)
-      val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
-      val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
-      val serializationNullFormat =
-        carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
-      val attributes =
-        StructType(dataFrame.get.schema.fields.map(_.copy(dataType = StringType))).toAttributes
-      val len = attributes.length
-      val rdd = dataFrame.get.rdd.map { f =>
-        val data = new Array[Any](len)
-        var i = 0
-        while (i < len) {
-          data(i) =
-            UTF8String.fromString(
-              CarbonScalaUtil.getString(f.get(i),
-                serializationNullFormat,
-                delimiterLevel1,
-                delimiterLevel2,
-                timeStampFormat,
-                dateFormat))
-          i = i + 1
+    val badRecordAction =
+      carbonLoadModel.getBadRecordsAction.split(",")(1)
+    var timeStampformatString = carbonLoadModel.getTimestampformat
+    if (timeStampformatString.isEmpty) {
+      timeStampformatString = carbonLoadModel.getDefaultTimestampFormat
+    }
+    val timeStampFormat = new SimpleDateFormat(timeStampformatString)
+    var dateFormatString = carbonLoadModel.getDateFormat
+    if (dateFormatString.isEmpty) {
+      dateFormatString = carbonLoadModel.getDefaultDateFormat
+    }
+    val dateFormat = new SimpleDateFormat(dateFormatString)
+    CarbonSession.threadSet(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT, dateFormatString)
+    CarbonSession.threadSet(
+      CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT,
+      timeStampformatString)
+    CarbonSession.threadSet(
+      CarbonLoadOptionConstants.CARBON_OPTIONS_SERIALIZATION_NULL_FORMAT,
+      serializationNullFormat)
+    CarbonSession.threadSet(
+      CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION,
+      badRecordAction)
+    CarbonSession.threadSet(
+      CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
+      carbonLoadModel.getIsEmptyDataBadRecord.split(",")(1))
+    try {
+      val query: LogicalPlan = if (dataFrame.isDefined) {
+        val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
+        val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
+        val attributes =
+          StructType(dataFrame.get.schema.fields.map(_.copy(dataType = StringType))).toAttributes
+        val len = attributes.length
+        val rdd = dataFrame.get.rdd.map { f =>
+          val data = new Array[Any](len)
+          var i = 0
+          while (i < len) {
+            data(i) =
+              UTF8String.fromString(
+                CarbonScalaUtil.getString(f.get(i),
+                  serializationNullFormat,
+                  delimiterLevel1,
+                  delimiterLevel2,
+                  timeStampFormat,
+                  dateFormat))
+            i = i + 1
+          }
+          InternalRow.fromSeq(data)
         }
-        InternalRow.fromSeq(data)
-      }
-      if (updateModel.isDefined) {
-        sparkSession.sparkContext.setLocalProperty(EXECUTION_ID_KEY, null)
-        // In case of update, we don't need the segmrntid column in case of partitioning
-        val dropAttributes = attributes.dropRight(1)
-        val finalOutput = catalogTable.schema.map { attr =>
-          dropAttributes.find { d =>
-            val index = d.name.lastIndexOf("-updatedColumn")
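The diff above publishes the load options as thread-local session properties (via CarbonSession.threadSet) before running the load, wrapped in a try block so they can be restored afterwards. A minimal Java sketch of that pattern (the class and method names here are illustrative stand-ins, not the real CarbonSession API):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the threadSet pattern: load options are published as thread-local
// properties before the load and cleared in a finally block, so concurrent
// loads running on other threads are unaffected.
public class ThreadSetSketch {

  private static final ThreadLocal<Map<String, String>> PROPS =
      ThreadLocal.withInitial(HashMap::new);

  public static void threadSet(String key, String value) {
    PROPS.get().put(key, value);
  }

  public static String threadGet(String key) {
    return PROPS.get().get(key);
  }

  public static void threadUnset(String key) {
    PROPS.get().remove(key);
  }

  public static void main(String[] args) {
    threadSet("carbon.options.dateformat", "yyyy-MM-dd");
    try {
      // ... run the load; conversion code on this thread reads the property.
      System.out.println(threadGet("carbon.options.dateformat"));  // yyyy-MM-dd
    } finally {
      // Always clear, even if the load throws, to avoid leaking options
      // into the next command executed on this thread.
      threadUnset("carbon.options.dateformat");
    }
    System.out.println(threadGet("carbon.options.dateformat"));  // null
  }
}
```

Thread-locals fit here because a session may run several concurrent loads, each with its own date/timestamp format and bad-records action; plain session-wide properties would race.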
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1729#discussion_r159204180

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---
@@ -484,161 +485,147 @@ case class CarbonLoadDataCommand(
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1729#discussion_r158954662

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---
@@ -484,161 +485,147 @@ case class CarbonLoadDataCommand(
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1729#discussion_r158954562

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---
@@ -484,161 +485,147 @@ case class CarbonLoadDataCommand(