http://git-wip-us.apache.org/repos/asf/carbondata/blob/758d03e7/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala
index 4806f9f..9ea58a9 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala
@@ -17,6 +17,8 @@
 package org.apache.carbondata.spark.rdd
 
+import java.util
+
 import scala.collection.JavaConverters._
 
 import org.apache.spark.{Partition, SparkContext, TaskContext}
@@ -59,10 +61,12 @@ class CarbonDropPartitionRDD(
     val iter = new Iterator[String] {
       val split = theSplit.asInstanceOf[CarbonDropPartition]
       logInfo("Dropping partition information from : " + split.segmentPath)
-
+      partitions.toList.asJava
+      val partitionList = new util.ArrayList[util.List[String]]()
+      partitionList.add(partitions.toList.asJava)
       new PartitionMapFileStore().dropPartitions(
         split.segmentPath,
-        partitions.toList.asJava,
+        partitionList,
         uniqueId,
         partialMatch)
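[Editor's note] In the hunk above, PartitionMapFileStore.dropPartitions now takes a nested util.List[util.List[String]] (a list of partition-value groups) instead of a flat list, so the RDD wraps its partition names before the call. A minimal, self-contained sketch of that wrapping, assuming only the imports already shown in the hunk; the object and method names here are illustrative, not part of the commit:

import java.util
import scala.collection.JavaConverters._

object PartitionListWrapping {
  // Wrap a Scala sequence of "column=value" partition names into the nested
  // Java list shape that the new dropPartitions signature expects.
  def wrap(partitions: Seq[String]): util.List[util.List[String]] = {
    val partitionList = new util.ArrayList[util.List[String]]()
    partitionList.add(partitions.toList.asJava)
    partitionList
  }

  def main(args: Array[String]): Unit = {
    // e.g. prints [[country=US, year=2017]]
    println(wrap(Seq("country=US", "year=2017")))
  }
}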
http://git-wip-us.apache.org/repos/asf/carbondata/blob/758d03e7/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala index 748945d..73be3c8 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala @@ -26,14 +26,18 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.execution.command.DataTypeInfo import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String import org.apache.carbondata.common.constants.LoggerAction +import org.apache.carbondata.core.cache.{Cache, CacheProvider, CacheType} +import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueIdentifier} import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants} +import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory import org.apache.carbondata.core.metadata.datatype.{DataType => CarbonDataType, DataTypes => CarbonDataTypes, StructField => CarbonStructField} +import org.apache.carbondata.core.metadata.encoder.Encoding import org.apache.carbondata.core.metadata.schema.table.CarbonTable -import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn -import org.apache.carbondata.core.util.{ByteUtil, CarbonSessionInfo} -import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat +import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, ColumnSchema} +import org.apache.carbondata.core.util.{CarbonSessionInfo, DataTypeUtil} object CarbonScalaUtil { def convertSparkToCarbonDataType(dataType: DataType): CarbonDataType = { @@ -196,21 +200,85 @@ object CarbonScalaUtil { /** * Converts incoming value to String after converting data as per the data type. 
* @param value Input value to convert - * @param dataType Datatype to convert and then convert to String - * @param timeStampFormat Timestamp format to convert in case of timestamp datatypes - * @param dateFormat DataFormat to convert in case of DateType datatype + * @param column column which it value belongs to * @return converted String */ - def convertToCarbonFormat(value: String, - dataType: DataType, - timeStampFormat: SimpleDateFormat, - dateFormat: SimpleDateFormat): String = { + def convertToCarbonFormat( + value: String, + column: CarbonColumn, + forwardDictionaryCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary], + table: CarbonTable): String = { + if (column.hasEncoding(Encoding.DICTIONARY)) { + if (column.hasEncoding(Encoding.DIRECT_DICTIONARY)) { + if (column.getDataType.equals(CarbonDataTypes.TIMESTAMP)) { + val time = DirectDictionaryKeyGeneratorFactory.getDirectDictionaryGenerator( + column.getDataType, + CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT + ).getValueFromSurrogate(value.toInt).toString + return DateTimeUtils.timestampToString(time.toLong * 1000) + } else if (column.getDataType.equals(CarbonDataTypes.DATE)) { + val date = DirectDictionaryKeyGeneratorFactory.getDirectDictionaryGenerator( + column.getDataType, + CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT + ).getValueFromSurrogate(value.toInt).toString + return DateTimeUtils.dateToString(date.toInt) + } + } + val dictionaryPath = + table.getTableInfo.getFactTable.getTableProperties.get( + CarbonCommonConstants.DICTIONARY_PATH) + val dictionaryColumnUniqueIdentifier = new DictionaryColumnUniqueIdentifier( + table.getAbsoluteTableIdentifier, + column.getColumnIdentifier, column.getDataType, + dictionaryPath) + return forwardDictionaryCache.get( + dictionaryColumnUniqueIdentifier).getDictionaryValueForKey(value.toInt) + } try { - dataType match { - case TimestampType => - timeStampFormat.format(DateTimeUtils.stringToTime(value)) - case DateType => - dateFormat.format(DateTimeUtils.stringToTime(value)) + column.getDataType match { + case CarbonDataTypes.TIMESTAMP => + DateTimeUtils.timestampToString(value.toLong * 1000) + case CarbonDataTypes.DATE => + DateTimeUtils.dateToString(DateTimeUtils.millisToDays(value.toLong)) + case _ => value + } + } catch { + case e: Exception => + value + } + } + + /** + * Converts incoming value to String after converting data as per the data type. 
+ * @param value Input value to convert + * @param column column which it value belongs to + * @return converted String + */ + def convertStaticPartitions( + value: String, + column: ColumnSchema, + table: CarbonTable): String = { + try { + if (column.hasEncoding(Encoding.DIRECT_DICTIONARY)) { + if (column.getDataType.equals(CarbonDataTypes.TIMESTAMP)) { + return DirectDictionaryKeyGeneratorFactory.getDirectDictionaryGenerator( + column.getDataType, + CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT + ).generateDirectSurrogateKey(value).toString + } else if (column.getDataType.equals(CarbonDataTypes.DATE)) { + return DirectDictionaryKeyGeneratorFactory.getDirectDictionaryGenerator( + column.getDataType, + CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT + ).generateDirectSurrogateKey(value).toString + } + } + column.getDataType match { + case CarbonDataTypes.TIMESTAMP => + DataTypeUtil.getDataDataTypeForNoDictionaryColumn(value, + column.getDataType, + CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT).toString + case CarbonDataTypes.DATE => + DateTimeUtils.stringToDate(UTF8String.fromString(value)).get.toString case _ => value } } catch { @@ -229,11 +297,11 @@ object CarbonScalaUtil { partitionSpec: Map[String, String], table: CarbonTable, timeFormat: SimpleDateFormat, - dateFormat: SimpleDateFormat, - serializationNullFormat: String, - badRecordAction: String, - isEmptyBadRecord: Boolean): Map[String, String] = { + dateFormat: SimpleDateFormat): Map[String, String] = { val hivedefaultpartition = "__HIVE_DEFAULT_PARTITION__" + val cacheProvider: CacheProvider = CacheProvider.getInstance + val forwardDictionaryCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary] = + cacheProvider.createCache(CacheType.FORWARD_DICTIONARY) partitionSpec.map{ case (col, pvalue) => // replace special string with empty value. val value = if (pvalue == null) { @@ -246,17 +314,15 @@ object CarbonScalaUtil { val carbonColumn = table.getColumnByName(table.getTableName, col.toLowerCase) val dataType = CarbonScalaUtil.convertCarbonToSparkDataType(carbonColumn.getDataType) try { - if (isEmptyBadRecord && value.length == 0 && - badRecordAction.equalsIgnoreCase(LoggerAction.IGNORE.toString) && - dataType != StringType) { - (col, hiveignorepartition) - } else if (!isEmptyBadRecord && value.length == 0 && dataType != StringType) { - (col, hivedefaultpartition) - } else if (value.equals(hivedefaultpartition)) { + if (value.equals(hivedefaultpartition)) { (col, value) } else { - val convertedString = CarbonScalaUtil.convertToString( - value, dataType, timeFormat, dateFormat, serializationNullFormat) + val convertedString = + CarbonScalaUtil.convertToCarbonFormat( + value, + carbonColumn, + forwardDictionaryCache, + table) if (convertedString == null) { (col, hivedefaultpartition) } else { @@ -265,13 +331,7 @@ object CarbonScalaUtil { } } catch { case e: Exception => - // If it is bad record ignore case then add with special string so that it will be - // filtered after this. 
- if (badRecordAction.equalsIgnoreCase(LoggerAction.IGNORE.toString)) { - (col, hiveignorepartition) - } else { - (col, hivedefaultpartition) - } + (col, value) } } } @@ -306,10 +366,7 @@ object CarbonScalaUtil { f.spec, table, timeFormat, - dateFormat, - serializeFormat, - badRecordAction, - isEmptyBadRecord) + dateFormat) f.copy(spec = changedSpec) }.filterNot{ p => // Filter the special bad record ignore case string http://git-wip-us.apache.org/repos/asf/carbondata/blob/758d03e7/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala index 7d49c11..9bdaddb 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala @@ -23,6 +23,7 @@ import java.util.UUID import scala.collection.JavaConverters._ import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer import org.apache.commons.lang3.StringUtils import org.apache.hadoop.conf.Configuration @@ -32,15 +33,15 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, UnresolvedAttribute} import org.apache.spark.sql.catalyst.catalog.CatalogTable -import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, GenericInternalRow} -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} +import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, Expression, GenericInternalRow, SortOrder} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Sort} import org.apache.spark.sql.execution.LogicalRDD import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataLoadTableFileMapping, UpdateTableModel} import org.apache.spark.sql.execution.datasources.{CarbonFileFormat, CatalogFileIndex, FindDataSourceTable, HadoopFsRelation, LogicalRelation} import org.apache.spark.sql.hive.CarbonRelation import org.apache.spark.sql.optimizer.CarbonFilters -import org.apache.spark.sql.types.{StringType, StructField, StructType} +import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.{CarbonReflectionUtils, CausedBy, FileUtils} @@ -55,7 +56,7 @@ import org.apache.carbondata.core.metadata.encoder.Encoding import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo} import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum} import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager} -import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} +import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUtil} import org.apache.carbondata.core.util.path.CarbonStorePath import org.apache.carbondata.events.{OperationContext, OperationListenerBus} import org.apache.carbondata.events.exception.PreEventException @@ -65,12 +66,14 @@ import org.apache.carbondata.processing.loading.TableProcessingOperations import 
org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePreExecutionEvent} import org.apache.carbondata.processing.loading.exception.NoRetryException import org.apache.carbondata.processing.loading.model.CarbonLoadModel -import org.apache.carbondata.processing.util.CarbonLoaderUtil +import org.apache.carbondata.processing.loading.sort.SortScopeOptions +import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil} import org.apache.carbondata.spark.dictionary.provider.SecureDictionaryServiceProvider import org.apache.carbondata.spark.dictionary.server.SecureDictionaryServer import org.apache.carbondata.spark.exception.MalformedCarbonCommandException +import org.apache.carbondata.spark.load.DataLoadProcessorStepOnSpark import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, CarbonDropPartitionCommitRDD, CarbonDropPartitionRDD} -import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataLoadingUtil, GlobalDictionaryUtil} +import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataLoadingUtil, GlobalDictionaryUtil, SparkDataTypeConverterImpl} case class CarbonLoadDataCommand( databaseNameOp: Option[String], @@ -505,7 +508,7 @@ case class CarbonLoadDataCommand( carbonLoadModel: CarbonLoadModel, hadoopConf: Configuration, dataFrame: Option[DataFrame], - operationContext: OperationContext) = { + operationContext: OperationContext): Unit = { val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable val identifier = TableIdentifier(table.getTableName, Some(table.getDatabaseName)) val catalogTable: CatalogTable = logicalPartitionRelation.catalogTable.get @@ -544,17 +547,76 @@ case class CarbonLoadDataCommand( CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD, isEmptyBadRecord) CarbonSession.threadSet("partition.operationcontext", operationContext) + // input data from csv files. 
Convert to logical plan + val allCols = new ArrayBuffer[String]() + allCols ++= table.getAllDimensions.asScala.map(_.getColName) + allCols ++= table.getAllMeasures.asScala.map(_.getColName) + var attributes = + StructType(allCols.map(StructField(_, StringType))).toAttributes + + var partitionsLen = 0 + val sortScope = CarbonDataProcessorUtil.getSortScope(carbonLoadModel.getSortScope) + def transformQuery(rdd: RDD[Row], isDataFrame: Boolean) = { + val updatedRdd = convertData(rdd, sparkSession, carbonLoadModel, isDataFrame) + val catalogAttributes = catalogTable.schema.toAttributes + attributes = attributes.map(a => { + catalogAttributes.find(_.name.equalsIgnoreCase(a.name)).get + }) + attributes = attributes.map { attr => + val column = table.getColumnByName(table.getTableName, attr.name) + if (column.hasEncoding(Encoding.DICTIONARY)) { + AttributeReference( + attr.name, + IntegerType, + attr.nullable, + attr.metadata)(attr.exprId, attr.qualifier, attr.isGenerated) + } else if (attr.dataType == TimestampType || attr.dataType == DateType) { + AttributeReference( + attr.name, + LongType, + attr.nullable, + attr.metadata)(attr.exprId, attr.qualifier, attr.isGenerated) + } else { + attr + } + } + // Only select the required columns + val output = if (partition.nonEmpty) { + val lowerCasePartition = partition.map { case (key, value) => (key.toLowerCase, value) } + catalogTable.schema.map { attr => + attributes.find(_.name.equalsIgnoreCase(attr.name)).get + }.filter(attr => lowerCasePartition.getOrElse(attr.name.toLowerCase, None).isEmpty) + } else { + catalogTable.schema.map(f => attributes.find(_.name.equalsIgnoreCase(f.name)).get) + } + partitionsLen = rdd.partitions.length + val child = Project(output, LogicalRDD(attributes, updatedRdd)(sparkSession)) + if (sortScope == SortScopeOptions.SortScope.GLOBAL_SORT) { + val sortColumns = table.getSortColumns(table.getTableName) + Sort(output.filter(f => sortColumns.contains(f.name)).map(SortOrder(_, Ascending)), + true, + child) + } else { + child + } + } + try { val query: LogicalPlan = if (dataFrame.isDefined) { val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1 val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2 - val attributes = + val dfAttributes = StructType(dataFrame.get.schema.fields.map(_.copy(dataType = StringType))).toAttributes - val len = attributes.length + val partitionValues = if (partition.nonEmpty) { + partition.values.filter(_.nonEmpty).map(_.get).toArray + } else { + Array[String]() + } + val len = dfAttributes.length val rdd = dataFrame.get.rdd.map { f => val data = new Array[Any](len) var i = 0 - while (i < len) { + while (i < f.length) { data(i) = UTF8String.fromString( CarbonScalaUtil.getString(f.get(i), @@ -565,20 +627,32 @@ case class CarbonLoadDataCommand( dateFormat)) i = i + 1 } - InternalRow.fromSeq(data) + if (partitionValues.length > 0) { + var j = 0 + while (i < len) { + data(i) = UTF8String.fromString(partitionValues(j)) + j = j + 1 + i = i + 1 + } + } + Row.fromSeq(data) } - if (updateModel.isDefined) { + val transRdd = if (updateModel.isDefined) { // Get the updated query plan in case of update scenario - getLogicalQueryForUpdate(sparkSession, catalogTable, attributes, rdd) + Dataset.ofRows( + sparkSession, + getLogicalQueryForUpdate( + sparkSession, + catalogTable, + dfAttributes, + rdd.map(row => InternalRow.fromSeq(row.toSeq)), + carbonLoadModel)).rdd } else { - LogicalRDD(attributes, rdd)(sparkSession) + rdd } - + transformQuery(transRdd, true) } else { - // input data from csv 
files. Convert to logical plan - val attributes = - StructType(carbonLoadModel.getCsvHeaderColumns.map( - StructField(_, StringType))).toAttributes + val rowDataTypes = attributes.map { attribute => catalogTable.schema.find(_.name.equalsIgnoreCase(attribute.name)) match { case Some(attr) => attr.dataType @@ -592,41 +666,12 @@ case class CarbonLoadDataCommand( case _ => false } } - val len = rowDataTypes.length - var rdd = - DataLoadingUtil.csvFileScanRDD( - sparkSession, - model = carbonLoadModel, - hadoopConf) - .map { row => - val data = new Array[Any](len) - var i = 0 - val input = row.asInstanceOf[GenericInternalRow].values.asInstanceOf[Array[String]] - val inputLen = Math.min(input.length, len) - while (i < inputLen) { - data(i) = UTF8String.fromString(input(i)) - // If partition column then update empty value with special string otherwise spark - // makes it as null so we cannot internally handle badrecords. - if (partitionColumns(i)) { - if (input(i) != null && input(i).isEmpty) { - data(i) = UTF8String.fromString(CarbonCommonConstants.MEMBER_DEFAULT_VAL) - } - } - i = i + 1 - } - InternalRow.fromSeq(data) - - } - // Only select the required columns - val output = if (partition.nonEmpty) { - val lowerCasePartition = partition.map{case(key, value) => (key.toLowerCase, value)} - catalogTable.schema.map { attr => - attributes.find(_.name.equalsIgnoreCase(attr.name)).get - }.filter(attr => lowerCasePartition.getOrElse(attr.name.toLowerCase, None).isEmpty) - } else { - catalogTable.schema.map(f => attributes.find(_.name.equalsIgnoreCase(f.name)).get) - } - Project(output, LogicalRDD(attributes, rdd)(sparkSession)) + val columnCount = carbonLoadModel.getCsvHeaderColumns.length + var rdd = DataLoadingUtil.csvFileScanRDD( + sparkSession, + model = carbonLoadModel, + hadoopConf).map(DataLoadProcessorStepOnSpark.toStringArrayRow(_, columnCount)) + transformQuery(rdd.asInstanceOf[RDD[Row]], false) } val convertRelation = convertToLogicalRelation( catalogTable, @@ -635,24 +680,29 @@ case class CarbonLoadDataCommand( carbonLoadModel, sparkSession, operationContext) + val logicalPlan = if (sortScope == SortScopeOptions.SortScope.GLOBAL_SORT) { + var numPartitions = + CarbonDataProcessorUtil.getGlobalSortPartitions(carbonLoadModel.getGlobalSortPartitions) + if (numPartitions <= 0) { + numPartitions = partitionsLen + } + if (numPartitions > 0) { + Dataset.ofRows(sparkSession, query).repartition(numPartitions).logicalPlan + } else { + query + } + } else { + query + } + val convertedPlan = CarbonReflectionUtils.getInsertIntoCommand( table = convertRelation, partition = partition, - query = query, + query = logicalPlan, overwrite = false, ifPartitionNotExists = false) - if (isOverwriteTable && partition.nonEmpty) { - overwritePartition( - sparkSession, - table, - convertedPlan, - serializationNullFormat, - badRecordAction, - isEmptyBadRecord.toBoolean) - } else { - Dataset.ofRows(sparkSession, convertedPlan) - } + Dataset.ofRows(sparkSession, convertedPlan) } finally { CarbonSession.threadUnset(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT) CarbonSession.threadUnset(CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT) @@ -660,6 +710,14 @@ case class CarbonLoadDataCommand( CarbonSession.threadUnset(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION) CarbonSession.threadUnset(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD) CarbonSession.threadUnset("partition.operationcontext") + if (isOverwriteTable) { + 
DataMapStoreManager.getInstance().clearDataMaps(table.getAbsoluteTableIdentifier) + // Clean the overwriting segments if any. + new PartitionMapFileStore().cleanSegments( + table, + CarbonFilters.getPartitions(Seq.empty, sparkSession, identifier).asJava, + false) + } } try { // Trigger auto compaction @@ -676,6 +734,48 @@ case class CarbonLoadDataCommand( } } + private def convertData( + originRDD: RDD[Row], + sparkSession: SparkSession, + model: CarbonLoadModel, + isDataFrame: Boolean): RDD[InternalRow] = { + model.setPartitionId("0") + val sc = sparkSession.sparkContext + val modelBroadcast = sc.broadcast(model) + val partialSuccessAccum = sc.accumulator(0, "Partial Success Accumulator") + + val inputStepRowCounter = sc.accumulator(0, "Input Processor Accumulator") + // 1. Input + var convertRDD = + if (isDataFrame) { + originRDD.mapPartitions{rows => + DataLoadProcessorStepOnSpark.toRDDIterator(rows, modelBroadcast) + } + } else { + originRDD.map{row => + val array = new Array[AnyRef](row.length) + var i = 0 + while (i < array.length) { + array(i) = row.get(i).asInstanceOf[AnyRef] + i = i + 1 + } + array + } + } + val finalRDD = convertRDD.mapPartitionsWithIndex { case (index, rows) => + DataTypeUtil.setDataTypeConverter(new SparkDataTypeConverterImpl) + DataLoadProcessorStepOnSpark.inputAndconvertFunc( + rows, + index, + modelBroadcast, + partialSuccessAccum, + inputStepRowCounter, + keepActualData = true) + }.filter(_ != null).map(row => InternalRow.fromSeq(row.getData)) + + finalRDD + } + /** * Create the logical plan for update scenario. Here we should drop the segmentid column from the * input rdd. @@ -684,7 +784,8 @@ case class CarbonLoadDataCommand( sparkSession: SparkSession, catalogTable: CatalogTable, attributes: Seq[AttributeReference], - rdd: RDD[InternalRow]): LogicalPlan = { + rdd: RDD[InternalRow], + carbonLoadModel: CarbonLoadModel): LogicalPlan = { sparkSession.sparkContext.setLocalProperty(EXECUTION_ID_KEY, null) // In case of update, we don't need the segmrntid column in case of partitioning val dropAttributes = attributes.dropRight(1) @@ -698,6 +799,8 @@ case class CarbonLoadDataCommand( } }.get } + carbonLoadModel.setCsvHeader(catalogTable.schema.map(_.name.toLowerCase).mkString(",")) + carbonLoadModel.setCsvHeaderColumns(carbonLoadModel.getCsvHeader.split(",")) Project(finalOutput, LogicalRDD(attributes, rdd)(sparkSession)) } @@ -709,7 +812,16 @@ case class CarbonLoadDataCommand( sparkSession: SparkSession, operationContext: OperationContext): LogicalRelation = { val table = loadModel.getCarbonDataLoadSchema.getCarbonTable - val metastoreSchema = StructType(catalogTable.schema.fields.map(_.copy(dataType = StringType))) + val metastoreSchema = StructType(catalogTable.schema.fields.map{f => + val column = table.getColumnByName(table.getTableName, f.name) + if (column.hasEncoding(Encoding.DICTIONARY)) { + f.copy(dataType = IntegerType) + } else if (f.dataType == TimestampType || f.dataType == DateType) { + f.copy(dataType = LongType) + } else { + f + } + }) val lazyPruningEnabled = sparkSession.sqlContext.conf.manageFilesourcePartitions val catalog = new CatalogFileIndex( sparkSession, catalogTable, sizeInBytes) @@ -718,20 +830,18 @@ case class CarbonLoadDataCommand( } else { catalog.filterPartitions(Nil) // materialize all the partitions in memory } - val partitionSchema = + var partitionSchema = StructType(table.getPartitionInfo(table.getTableName).getColumnSchemaList.asScala.map(field => 
metastoreSchema.fields.find(_.name.equalsIgnoreCase(field.getColumnName))).map(_.get)) - val overWriteLocal = if (overWrite && partition.nonEmpty) { - false - } else { - overWrite - } val dataSchema = StructType(metastoreSchema - .filterNot(field => partitionSchema.contains(field.name))) + .filterNot(field => partitionSchema.contains(field))) + if (partition.nonEmpty) { + partitionSchema = StructType(partitionSchema.fields.map(_.copy(dataType = StringType))) + } val options = new mutable.HashMap[String, String]() options ++= catalogTable.storage.properties - options += (("overwrite", overWriteLocal.toString)) + options += (("overwrite", overWrite.toString)) options += (("onepass", loadModel.getUseOnePass.toString)) options += (("dicthost", loadModel.getDictionaryServerHost)) options += (("dictport", loadModel.getDictionaryServerPort.toString)) @@ -761,108 +871,6 @@ case class CarbonLoadDataCommand( Some(catalogTable)) } - /** - * Overwrite the partition data if static partitions are specified. - * @param sparkSession - * @param table - * @param logicalPlan - */ - private def overwritePartition( - sparkSession: SparkSession, - table: CarbonTable, - logicalPlan: LogicalPlan, - serializationNullFormat: String, - badRecordAction: String, - isEmptyBadRecord: Boolean): Unit = { - val identifier = TableIdentifier(table.getTableName, Some(table.getDatabaseName)) - - // Update the partitions as per the datatype expect for time and datetype as we - // expect user provides the format in standard spark/hive formats. - val updatedPartitions = CarbonScalaUtil.updatePartitions( - partition.filter(_._2.isDefined).map(f => (f._1, f._2.get)), - table, - timeFormat = null, - dateFormat = null, - serializationNullFormat, - badRecordAction, - isEmptyBadRecord) - val existingPartitions = sparkSession.sessionState.catalog.listPartitions( - identifier, - Some(updatedPartitions)) - val partitionNames = existingPartitions.toList.flatMap { partition => - partition.spec.seq.map{case (column, value) => column + "=" + value} - }.toSet - val uniqueId = System.currentTimeMillis().toString - val segments = new SegmentStatusManager( - table.getAbsoluteTableIdentifier).getValidAndInvalidSegments.getValidSegments - // If any existing partitions need to be overwritten then drop from partitionmap - if (partitionNames.nonEmpty) { - try { - // First drop the partitions from partition mapper files of each segment - new CarbonDropPartitionRDD( - sparkSession.sparkContext, - table.getTablePath, - segments.asScala, - partitionNames.toSeq, - uniqueId, - partialMatch = false).collect() - } catch { - case e: Exception => - // roll back the drop partitions from carbon store - new CarbonDropPartitionCommitRDD( - sparkSession.sparkContext, - table.getTablePath, - segments.asScala, - success = false, - uniqueId, - partitionNames.toSeq).collect() - throw e - } - - try { - Dataset.ofRows(sparkSession, logicalPlan) - } catch { - case e: Exception => - // roll back the drop partitions from carbon store - new CarbonDropPartitionCommitRDD( - sparkSession.sparkContext, - table.getTablePath, - segments.asScala, - success = false, - uniqueId, - partitionNames.toSeq).collect() - throw e - } - // Commit the removed partitions in carbon store. 
- new CarbonDropPartitionCommitRDD( - sparkSession.sparkContext, - table.getTablePath, - segments.asScala, - success = true, - uniqueId, - partitionNames.toSeq).collect() - // get valid segments - val validsegments = - new SegmentStatusManager( - table.getAbsoluteTableIdentifier).getValidAndInvalidSegments.getValidSegments - // Update the loadstatus with update time to clear cache from driver. - CarbonUpdateUtil.updateTableMetadataStatus( - new util.HashSet[String](validsegments), - table, - uniqueId, - true, - new util.ArrayList[String]) - DataMapStoreManager.getInstance().clearDataMaps(table.getAbsoluteTableIdentifier) - // Clean the overwriting segments if any. - new PartitionMapFileStore().cleanSegments( - table, - CarbonFilters.getPartitions(Seq.empty, sparkSession, identifier).asJava, - false) - } else { - // Otherwise its a normal load - Dataset.ofRows(sparkSession, logicalPlan) - } - } def getDataFrameWithTupleID(): DataFrame = { val fields = dataFrame.get.schema.fields http://git-wip-us.apache.org/repos/asf/carbondata/blob/758d03e7/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala index 17749c8..d2c691b 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala @@ -36,14 +36,17 @@ import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.DataSourceRegister -import org.apache.spark.sql.types.{DataType, StructType} +import org.apache.spark.sql.types._ import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants} import org.apache.carbondata.core.metadata.PartitionMapFileStore -import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.core.metadata.datatype.DataTypes +import org.apache.carbondata.core.metadata.encoder.Encoding +import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil} import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.hadoop.api.{CarbonOutputCommitter, CarbonTableOutputFormat} import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat.CarbonRecordWriter +import org.apache.carbondata.hadoop.internal.ObjectArrayWritable import org.apache.carbondata.hadoop.util.ObjectSerializationUtil import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable import org.apache.carbondata.processing.loading.model.CarbonLoadModel @@ -110,6 +113,7 @@ with Serializable { model.setDictionaryServerHost(options.getOrElse("dicthost", null)) model.setDictionaryServerPort(options.getOrElse("dictport", "-1").toInt) CarbonTableOutputFormat.setOverwrite(conf, options("overwrite").toBoolean) + model.setPartitionLoad(true) // Set the update timestamp if user sets in case of update query. 
It needs to be updated // in load status update time val updateTimeStamp = options.get("updatetimestamp") @@ -231,7 +235,9 @@ private class CarbonOutputWriter(path: String, fieldTypes: Seq[DataType], taskNo : String) extends OutputWriter with AbstractCarbonOutputWriter { - val partitions = getPartitionsFromPath(path, context).map(ExternalCatalogUtils.unescapePathName) + val model = CarbonTableOutputFormat.getLoadModel(context.getConfiguration) + val partitions = + getPartitionsFromPath(path, context, model).map(ExternalCatalogUtils.unescapePathName) val staticPartition: util.HashMap[String, Boolean] = { val staticPart = context.getConfiguration.get("carbon.staticpartition") if (staticPart != null) { @@ -272,24 +278,42 @@ private class CarbonOutputWriter(path: String, val formattedPartitions = updatedPartitions.map {case (col, value) => // Only convert the static partitions to the carbon format and use it while loading data // to carbon. - if (staticPartition.asScala.getOrElse(col, false)) { - (col, CarbonScalaUtil.convertToCarbonFormat(value, - CarbonScalaUtil.convertCarbonToSparkDataType( - table.getColumnByName(table.getTableName, col).getDataType), - timeFormat, - dateFormat)) - } else { - (col, value) - } + (col, value) } - (formattedPartitions, formattedPartitions.map(_._2)) + (formattedPartitions, updatePartitions(formattedPartitions.map(_._2))) } else { - (updatedPartitions, updatedPartitions.map(_._2)) + (updatedPartitions, updatePartitions(updatedPartitions.map(_._2))) } } else { (Map.empty[String, String].toArray, Array.empty) } - val writable = new StringArrayWritable() + + val writable = new ObjectArrayWritable + + private def updatePartitions(partitionData: Seq[String]): Array[AnyRef] = { + model.getCarbonDataLoadSchema.getCarbonTable.getTableInfo.getFactTable.getPartitionInfo + .getColumnSchemaList.asScala.zipWithIndex.map { case (col, index) => + + val dataType = if (col.hasEncoding(Encoding.DICTIONARY)) { + DataTypes.INT + } else if (col.getDataType.equals(DataTypes.TIMESTAMP) || + col.getDataType.equals(DataTypes.DATE)) { + DataTypes.LONG + } else { + col.getDataType + } + if (staticPartition != null) { + DataTypeUtil.getDataBasedOnDataType( + CarbonScalaUtil.convertStaticPartitions( + partitionData(index), + col, + model.getCarbonDataLoadSchema.getCarbonTable), + dataType) + } else { + DataTypeUtil.getDataBasedOnDataType(partitionData(index), dataType) + } + }.toArray + } private val recordWriter: CarbonRecordWriter = { context.getConfiguration.set("carbon.outputformat.taskno", taskNo) @@ -302,11 +326,18 @@ private class CarbonOutputWriter(path: String, // TODO Implement writesupport interface to support writing Row directly to recordwriter def writeCarbon(row: InternalRow): Unit = { - val data = new Array[String](fieldTypes.length + partitionData.length) + val data = new Array[AnyRef](fieldTypes.length + partitionData.length) var i = 0 while (i < fieldTypes.length) { if (!row.isNullAt(i)) { - data(i) = row.getString(i) + fieldTypes(i) match { + case StringType => + data(i) = row.getString(i) + case d: DecimalType => + data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal + case other => + data(i) = row.get(i, other) + } } i += 1 } @@ -349,10 +380,7 @@ private class CarbonOutputWriter(path: String, updatedPartitions.toMap, table, timeFormat, - dateFormat, - serializeFormat, - badRecordAction, - isEmptyBadRecord) + dateFormat) formattedPartitions.foreach(p => partitonList.add(p._1 + "=" + p._2)) new PartitionMapFileStore().writePartitionMapFile( segmentPath, 
@@ -360,10 +388,13 @@ private class CarbonOutputWriter(path: String, partitonList) } - def getPartitionsFromPath(path: String, attemptContext: TaskAttemptContext): Array[String] = { + def getPartitionsFromPath( + path: String, + attemptContext: TaskAttemptContext, + model: CarbonLoadModel): Array[String] = { var attemptId = attemptContext.getTaskAttemptID.toString + "/" if (path.indexOf(attemptId) <= 0) { - val model = CarbonTableOutputFormat.getLoadModel(attemptContext.getConfiguration) + attemptId = model.getTableName + "/" } val str = path.substring(path.indexOf(attemptId) + attemptId.length, path.lastIndexOf("/")) http://git-wip-us.apache.org/repos/asf/carbondata/blob/758d03e7/processing/src/main/java/org/apache/carbondata/processing/loading/DataField.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataField.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataField.java index fb78deb..dc2fbbb 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataField.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataField.java @@ -37,6 +37,8 @@ public class DataField implements Serializable { private String timestampFormat; + private boolean useActualData; + public boolean hasDictionaryEncoding() { return column.hasEncoding(Encoding.DICTIONARY); } @@ -60,4 +62,12 @@ public class DataField implements Serializable { public void setTimestampFormat(String timestampFormat) { this.timestampFormat = timestampFormat; } + + public boolean isUseActualData() { + return useActualData; + } + + public void setUseActualData(boolean useActualData) { + this.useActualData = useActualData; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/758d03e7/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java index f7eff81..ba24d41 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java @@ -41,6 +41,7 @@ import org.apache.carbondata.processing.loading.steps.DataConverterProcessorStep import org.apache.carbondata.processing.loading.steps.DataConverterProcessorWithBucketingStepImpl; import org.apache.carbondata.processing.loading.steps.DataWriterBatchProcessorStepImpl; import org.apache.carbondata.processing.loading.steps.DataWriterProcessorStepImpl; +import org.apache.carbondata.processing.loading.steps.InputProcessorStepForPartitionImpl; import org.apache.carbondata.processing.loading.steps.InputProcessorStepImpl; import org.apache.carbondata.processing.loading.steps.SortProcessorStepImpl; import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; @@ -62,6 +63,8 @@ public final class DataLoadProcessBuilder { return buildInternalForBucketing(inputIterators, configuration); } else if (sortScope.equals(SortScopeOptions.SortScope.BATCH_SORT)) { return buildInternalForBatchSort(inputIterators, configuration); + } else if (loadModel.isPartitionLoad()) { + return buildInternalForPartitionLoad(inputIterators, configuration, sortScope); } else { return 
buildInternal(inputIterators, configuration); } @@ -96,6 +99,32 @@ public final class DataLoadProcessBuilder { return new CarbonRowDataWriterProcessorStepImpl(configuration, converterProcessorStep); } + /** + * Build pipe line for partition load + */ + private AbstractDataLoadProcessorStep buildInternalForPartitionLoad( + CarbonIterator[] inputIterators, CarbonDataLoadConfiguration configuration, + SortScopeOptions.SortScope sortScope) { + // Wraps with dummy processor. + AbstractDataLoadProcessorStep inputProcessorStep = + new InputProcessorStepForPartitionImpl(configuration, inputIterators); + if (sortScope.equals(SortScopeOptions.SortScope.LOCAL_SORT)) { + AbstractDataLoadProcessorStep sortProcessorStep = + new SortProcessorStepImpl(configuration, inputProcessorStep); + // Writes the sorted data in carbondata format. + return new DataWriterProcessorStepImpl(configuration, sortProcessorStep); + } else if (sortScope.equals(SortScopeOptions.SortScope.BATCH_SORT)) { + // Sorts the data by SortColumn or not + AbstractDataLoadProcessorStep sortProcessorStep = + new SortProcessorStepImpl(configuration, inputProcessorStep); + // Writes the sorted data in carbondata format. + return new DataWriterBatchProcessorStepImpl(configuration, sortProcessorStep); + } else { + // In all other cases like global sort and no sort uses this step + return new CarbonRowDataWriterProcessorStepImpl(configuration, inputProcessorStep); + } + } + private AbstractDataLoadProcessorStep buildInternalForBatchSort(CarbonIterator[] inputIterators, CarbonDataLoadConfiguration configuration) { // 1. Reads the data input iterators and parses the data. http://git-wip-us.apache.org/repos/asf/carbondata/blob/758d03e7/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java index 2d70f03..85eb19b 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java @@ -47,6 +47,8 @@ public class MeasureFieldConverterImpl implements FieldConverter { private boolean isEmptyBadRecord; + private DataField dataField; + public MeasureFieldConverterImpl(DataField dataField, String nullformat, int index, boolean isEmptyBadRecord) { this.dataType = dataField.getColumn().getDataType(); @@ -54,6 +56,7 @@ public class MeasureFieldConverterImpl implements FieldConverter { this.nullformat = nullformat; this.index = index; this.isEmptyBadRecord = isEmptyBadRecord; + this.dataField = dataField; } @Override @@ -85,7 +88,11 @@ public class MeasureFieldConverterImpl implements FieldConverter { row.update(null, index); } else { try { - output = DataTypeUtil.getMeasureValueBasedOnDataType(value, dataType, measure); + if (dataField.isUseActualData()) { + output = DataTypeUtil.getConvertedMeasureValueBasedOnDataType(value, dataType, measure); + } else { + output = DataTypeUtil.getMeasureValueBasedOnDataType(value, dataType, measure); + } row.update(output, index); } catch (NumberFormatException e) { LOGGER.warn( 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/758d03e7/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/NonDictionaryFieldConverterImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/NonDictionaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/NonDictionaryFieldConverterImpl.java index ced37dd..9cf7fe4 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/NonDictionaryFieldConverterImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/NonDictionaryFieldConverterImpl.java @@ -68,14 +68,25 @@ public class NonDictionaryFieldConverterImpl implements FieldConverter { dateFormat = dataField.getTimestampFormat(); } try { - byte[] value = DataTypeUtil - .getBytesBasedOnDataTypeForNoDictionaryColumn(dimensionValue, dataType, dateFormat); - if (dataType == DataTypes.STRING - && value.length > CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT) { - throw new CarbonDataLoadingException("Dataload failed, String size cannot exceed " - + CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT + " bytes"); + if (!dataField.isUseActualData()) { + byte[] value = DataTypeUtil + .getBytesBasedOnDataTypeForNoDictionaryColumn(dimensionValue, dataType, dateFormat); + if (dataType == DataTypes.STRING + && value.length > CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT) { + throw new CarbonDataLoadingException("Dataload failed, String size cannot exceed " + + CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT + " bytes"); + } + row.update(value, index); + } else { + Object value = DataTypeUtil + .getDataDataTypeForNoDictionaryColumn(dimensionValue, dataType, dateFormat); + if (dataType == DataTypes.STRING + && value.toString().length() > CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT) { + throw new CarbonDataLoadingException("Dataload failed, String size cannot exceed " + + CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT + " bytes"); + } + row.update(value, index); } - row.update(value, index); } catch (CarbonDataLoadingException e) { throw e; } catch (Throwable ex) { @@ -99,7 +110,9 @@ public class NonDictionaryFieldConverterImpl implements FieldConverter { } private void updateWithNullValue(CarbonRow row) { - if (dataType == DataTypes.STRING) { + if (dataField.isUseActualData()) { + row.update(null, index); + } else if (dataType == DataTypes.STRING) { row.update(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, index); } else { row.update(CarbonCommonConstants.EMPTY_BYTE_ARRAY, index); http://git-wip-us.apache.org/repos/asf/carbondata/blob/758d03e7/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java b/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java index 66943c8..9229598 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java @@ -29,7 +29,7 @@ import org.apache.commons.logging.LogFactory; * It is wrapper class to hold the rows in 
batches when record writer writes the data and allows * to iterate on it during data load. It uses blocking queue to coordinate between read and write. */ -public class CarbonOutputIteratorWrapper extends CarbonIterator<String[]> { +public class CarbonOutputIteratorWrapper extends CarbonIterator<Object[]> { private static final Log LOG = LogFactory.getLog(CarbonOutputIteratorWrapper.class); @@ -46,7 +46,7 @@ public class CarbonOutputIteratorWrapper extends CarbonIterator<String[]> { private ArrayBlockingQueue<RowBatch> queue = new ArrayBlockingQueue<>(10); - public void write(String[] row) throws InterruptedException { + public void write(Object[] row) throws InterruptedException { if (!loadBatch.addRow(row)) { loadBatch.readyRead(); queue.put(loadBatch); @@ -78,7 +78,7 @@ public class CarbonOutputIteratorWrapper extends CarbonIterator<String[]> { } @Override - public String[] next() { + public Object[] next() { return readBatch.next(); } @@ -100,16 +100,16 @@ public class CarbonOutputIteratorWrapper extends CarbonIterator<String[]> { } } - private static class RowBatch extends CarbonIterator<String[]> { + private static class RowBatch extends CarbonIterator<Object[]> { private int counter; - private String[][] batch; + private Object[][] batch; private int size; private RowBatch(int size) { - batch = new String[size][]; + batch = new Object[size][]; this.size = size; } @@ -118,7 +118,7 @@ public class CarbonOutputIteratorWrapper extends CarbonIterator<String[]> { * @param row * @return false if the row cannot be added as batch is full. */ - public boolean addRow(String[] row) { + public boolean addRow(Object[] row) { batch[counter++] = row; return counter < size; } @@ -134,7 +134,7 @@ public class CarbonOutputIteratorWrapper extends CarbonIterator<String[]> { } @Override - public String[] next() { + public Object[] next() { assert (counter < size); return batch[counter++]; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/758d03e7/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java index d41455f..4c536ea 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java @@ -188,6 +188,11 @@ public class CarbonLoadModel implements Serializable { private boolean isAggLoadRequest; + /** + * It directly writes data directly to nosort processor bypassing all other processors. 
+ */ + private boolean isPartitionLoad; + public boolean isAggLoadRequest() { return isAggLoadRequest; } @@ -401,6 +406,7 @@ public class CarbonLoadModel implements Serializable { copy.batchSortSizeInMb = batchSortSizeInMb; copy.badRecordsLocation = badRecordsLocation; copy.isAggLoadRequest = isAggLoadRequest; + copy.isPartitionLoad = isPartitionLoad; return copy; } @@ -454,6 +460,7 @@ public class CarbonLoadModel implements Serializable { copy.batchSortSizeInMb = batchSortSizeInMb; copy.isAggLoadRequest = isAggLoadRequest; copy.badRecordsLocation = badRecordsLocation; + copy.isPartitionLoad = isPartitionLoad; return copy; } @@ -855,4 +862,12 @@ public class CarbonLoadModel implements Serializable { public void setSkipEmptyLine(String skipEmptyLine) { this.skipEmptyLine = skipEmptyLine; } + + public boolean isPartitionLoad() { + return isPartitionLoad; + } + + public void setPartitionLoad(boolean partitionLoad) { + isPartitionLoad = partitionLoad; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/758d03e7/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepForPartitionImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepForPartitionImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepForPartitionImpl.java new file mode 100644 index 0000000..1dc9b27 --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepForPartitionImpl.java @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.carbondata.processing.loading.steps; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.carbondata.common.CarbonIterator; +import org.apache.carbondata.core.datastore.row.CarbonRow; +import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.metadata.encoder.Encoding; +import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.carbondata.core.util.DataTypeUtil; +import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep; +import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration; +import org.apache.carbondata.processing.loading.DataField; +import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl; +import org.apache.carbondata.processing.loading.row.CarbonRowBatch; +import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; + +/** + * It reads data from record reader and sends data to next step. + */ +public class InputProcessorStepForPartitionImpl extends AbstractDataLoadProcessorStep { + + private CarbonIterator<Object[]>[] inputIterators; + + private boolean[] noDictionaryMapping; + + private DataType[] dataTypes; + + private int[] orderOfData; + + public InputProcessorStepForPartitionImpl(CarbonDataLoadConfiguration configuration, + CarbonIterator<Object[]>[] inputIterators) { + super(configuration, null); + this.inputIterators = inputIterators; + } + + @Override public DataField[] getOutput() { + return configuration.getDataFields(); + } + + @Override public void initialize() throws IOException { + super.initialize(); + // if logger is enabled then raw data will be required. + RowConverterImpl rowConverter = + new RowConverterImpl(configuration.getDataFields(), configuration, null); + rowConverter.initialize(); + configuration.setCardinalityFinder(rowConverter); + noDictionaryMapping = + CarbonDataProcessorUtil.getNoDictionaryMapping(configuration.getDataFields()); + dataTypes = new DataType[configuration.getDataFields().length]; + for (int i = 0; i < dataTypes.length; i++) { + if (configuration.getDataFields()[i].getColumn().hasEncoding(Encoding.DICTIONARY)) { + dataTypes[i] = DataTypes.INT; + } else { + dataTypes[i] = configuration.getDataFields()[i].getColumn().getDataType(); + } + } + orderOfData = arrangeData(configuration.getDataFields(), configuration.getHeader()); + } + + private int[] arrangeData(DataField[] dataFields, String[] header) { + int[] data = new int[dataFields.length]; + for (int i = 0; i < dataFields.length; i++) { + for (int j = 0; j < header.length; j++) { + if (dataFields[i].getColumn().getColName().equalsIgnoreCase(header[j])) { + data[i] = j; + break; + } + } + } + return data; + } + + @Override public Iterator<CarbonRowBatch>[] execute() { + int batchSize = CarbonProperties.getInstance().getBatchSize(); + List<CarbonIterator<Object[]>>[] readerIterators = partitionInputReaderIterators(); + Iterator<CarbonRowBatch>[] outIterators = new Iterator[readerIterators.length]; + for (int i = 0; i < outIterators.length; i++) { + outIterators[i] = + new InputProcessorIterator(readerIterators[i], batchSize, configuration.isPreFetch(), + rowCounter, orderOfData, noDictionaryMapping, dataTypes); + } + return outIterators; + } + + /** + * Partition input iterators equally as per the number of threads. 
+ * + * @return + */ + private List<CarbonIterator<Object[]>>[] partitionInputReaderIterators() { + // Get the number of cores configured in property. + int numberOfCores = CarbonProperties.getInstance().getNumberOfCores(); + // Get the minimum of number of cores and iterators size to get the number of parallel threads + // to be launched. + int parallelThreadNumber = Math.min(inputIterators.length, numberOfCores); + + List<CarbonIterator<Object[]>>[] iterators = new List[parallelThreadNumber]; + for (int i = 0; i < parallelThreadNumber; i++) { + iterators[i] = new ArrayList<>(); + } + // Equally partition the iterators as per number of threads + for (int i = 0; i < inputIterators.length; i++) { + iterators[i % parallelThreadNumber].add(inputIterators[i]); + } + return iterators; + } + + @Override protected CarbonRow processRow(CarbonRow row) { + return null; + } + + @Override public void close() { + if (!closed) { + super.close(); + for (CarbonIterator inputIterator : inputIterators) { + inputIterator.close(); + } + } + } + + @Override protected String getStepName() { + return "Input Processor"; + } + + /** + * This iterator wraps the list of iterators and it starts iterating the each + * iterator of the list one by one. It also parse the data while iterating it. + */ + private static class InputProcessorIterator extends CarbonIterator<CarbonRowBatch> { + + private List<CarbonIterator<Object[]>> inputIterators; + + private CarbonIterator<Object[]> currentIterator; + + private int counter; + + private int batchSize; + + private boolean nextBatch; + + private boolean firstTime; + + private AtomicLong rowCounter; + + private boolean[] noDictionaryMapping; + + private DataType[] dataTypes; + + private int[] orderOfData; + + public InputProcessorIterator(List<CarbonIterator<Object[]>> inputIterators, int batchSize, + boolean preFetch, AtomicLong rowCounter, int[] orderOfData, boolean[] noDictionaryMapping, + DataType[] dataTypes) { + this.inputIterators = inputIterators; + this.batchSize = batchSize; + this.counter = 0; + // Get the first iterator from the list. + currentIterator = inputIterators.get(counter++); + this.rowCounter = rowCounter; + this.nextBatch = false; + this.firstTime = true; + this.noDictionaryMapping = noDictionaryMapping; + this.dataTypes = dataTypes; + this.orderOfData = orderOfData; + } + + @Override public boolean hasNext() { + return nextBatch || internalHasNext(); + } + + private boolean internalHasNext() { + if (firstTime) { + firstTime = false; + currentIterator.initialize(); + } + boolean hasNext = currentIterator.hasNext(); + // If iterator is finished then check for next iterator. + if (!hasNext) { + currentIterator.close(); + // Check next iterator is available in the list. + if (counter < inputIterators.size()) { + // Get the next iterator from the list. + currentIterator = inputIterators.get(counter++); + // Initialize the new iterator + currentIterator.initialize(); + hasNext = internalHasNext(); + } + } + return hasNext; + } + + @Override public CarbonRowBatch next() { + return getBatch(); + } + + private CarbonRowBatch getBatch() { + // Create batch and fill it. 
+ CarbonRowBatch carbonRowBatch = new CarbonRowBatch(batchSize); + int count = 0; + while (internalHasNext() && count < batchSize) { + carbonRowBatch.addRow(new CarbonRow(convertToNoDictionaryToBytes(currentIterator.next()))); + count++; + } + rowCounter.getAndAdd(carbonRowBatch.getSize()); + return carbonRowBatch; + } + + private Object[] convertToNoDictionaryToBytes(Object[] data) { + Object[] newData = new Object[data.length]; + for (int i = 0; i < noDictionaryMapping.length; i++) { + if (noDictionaryMapping[i]) { + newData[i] = DataTypeUtil + .getBytesDataDataTypeForNoDictionaryColumn(data[orderOfData[i]], dataTypes[i]); + } else { + newData[i] = data[orderOfData[i]]; + } + } + if (newData.length > noDictionaryMapping.length) { + for (int i = noDictionaryMapping.length; i < newData.length; i++) { + newData[i] = data[orderOfData[i]]; + } + } + // System.out.println(Arrays.toString(data)); + return newData; + } + + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/758d03e7/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java index 06522a4..b795696 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java @@ -558,6 +558,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { @Override public Void call() throws Exception { try { TablePage tablePage = processDataRows(dataRows); + dataRows = null; tablePage.setIsLastPage(isLastPage); // insert the object in array according to sequence number int indexInNodeHolderArray = (pageId - 1) % numberOfCores; http://git-wip-us.apache.org/repos/asf/carbondata/blob/758d03e7/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java index 2a4cc00..376a546 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java @@ -482,22 +482,19 @@ public final class CarbonDataProcessorUtil { /** * Get the number of partitions in global sort - * @param configuration + * @param globalSortPartitions * @return the number of partitions */ - public static int getGlobalSortPartitions(CarbonDataLoadConfiguration configuration) { + public static int getGlobalSortPartitions(Object globalSortPartitions) { int numPartitions; try { // First try to get the number from ddl, otherwise get it from carbon properties. 
- if (configuration.getDataLoadProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS) - == null) { + if (globalSortPartitions == null) { numPartitions = Integer.parseInt(CarbonProperties.getInstance() .getProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS, CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT)); } else { - numPartitions = Integer.parseInt( - configuration.getDataLoadProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS) - .toString()); + numPartitions = Integer.parseInt(globalSortPartitions.toString()); } } catch (Exception e) { numPartitions = 0;
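[Editor's note] The final hunk changes CarbonDataProcessorUtil.getGlobalSortPartitions to accept the raw DDL value (an Object) instead of the whole CarbonDataLoadConfiguration, which is what lets CarbonLoadDataCommand pass carbonLoadModel.getGlobalSortPartitions directly. A minimal Scala sketch of the same fallback behaviour, with a hypothetical propertyDefault parameter standing in for the CarbonProperties lookup; this is an illustration, not the committed code:

object GlobalSortPartitions {
  // Prefer the value supplied in the DDL; otherwise fall back to the
  // property default; on any parse failure return 0 (same as the Java method).
  def resolve(fromDdl: AnyRef, propertyDefault: String = "0"): Int = {
    try {
      if (fromDdl == null) propertyDefault.toInt else fromDdl.toString.toInt
    } catch {
      case _: Exception => 0
    }
  }
}

// e.g. resolve("4") == 4, resolve(null) == 0, resolve("abc") == 0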