http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala new file mode 100644 index 0000000..1d8d6b2 --- /dev/null +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala @@ -0,0 +1,598 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.rdd + +import java.lang.Long +import java.text.SimpleDateFormat +import java.util +import java.util.UUID + +import scala.collection.JavaConverters._ +import scala.util.Random + +import org.apache.spark.{Partition, SerializableWritable, SparkContext, SparkEnv, TaskContext} +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionWrap, RDD} +import org.apache.spark.sql.Row +import org.apache.spark.util.SparkUtil + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.common.logging.impl.StandardLogService +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.load.{BlockDetails, LoadMetadataDetails} +import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory} +import org.apache.carbondata.processing.constants.DataProcessorConstants +import org.apache.carbondata.processing.csvreaderstep.JavaRddIterator +import org.apache.carbondata.processing.csvreaderstep.RddInputUtils +import org.apache.carbondata.processing.etl.DataLoadingException +import org.apache.carbondata.processing.graphgenerator.GraphGenerator +import org.apache.carbondata.processing.model.CarbonLoadModel +import org.apache.carbondata.spark.DataLoadResult +import org.apache.carbondata.spark.load._ +import org.apache.carbondata.spark.splits.TableSplit +import org.apache.carbondata.spark.util.CarbonQueryUtil +import org.apache.carbondata.spark.util.CarbonScalaUtil + +/** + * This partition class use to split by TableSplit + * + */ +class CarbonTableSplitPartition(rddId: Int, val idx: Int, @transient val tableSplit: TableSplit, + val blocksDetails: Array[BlockDetails]) + extends Partition { + + override val index: Int = idx + val serializableHadoopSplit = new SerializableWritable[TableSplit](tableSplit) + val partitionBlocksDetail = blocksDetails + + override def hashCode(): Int = 41 * (41 + rddId) + idx +} + +/** + * This partition class use to split by Host + * + */ +class CarbonNodePartition(rddId: Int, val idx: 
Int, host: String, + val blocksDetails: Array[BlockDetails]) + extends Partition { + + override val index: Int = idx + val serializableHadoopSplit = host + val nodeBlocksDetail = blocksDetails + + override def hashCode(): Int = 41 * (41 + rddId) + idx +} + +class SparkPartitionLoader(model: CarbonLoadModel, + splitIndex: Int, + storePath: String, + kettleHomePath: String, + loadCount: Int, + loadMetadataDetails: LoadMetadataDetails) { + private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + + var storeLocation: String = "" + + def initialize(): Unit = { + val carbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null) + if (null == carbonPropertiesFilePath) { + System.setProperty("carbon.properties.filepath", + System.getProperty("user.dir") + '/' + "conf" + '/' + "carbon.properties") + } + CarbonTimeStatisticsFactory.getLoadStatisticsInstance.initPartitonInfo(model.getPartitionId) + CarbonProperties.getInstance().addProperty("carbon.is.columnar.storage", "true") + CarbonProperties.getInstance().addProperty("carbon.dimension.split.value.in.columnar", "1") + CarbonProperties.getInstance().addProperty("carbon.is.fullyfilled.bits", "true") + CarbonProperties.getInstance().addProperty("is.int.based.indexer", "true") + CarbonProperties.getInstance().addProperty("aggregate.columnar.keyblock", "true") + CarbonProperties.getInstance().addProperty("high.cardinality.value", "100000") + CarbonProperties.getInstance().addProperty("is.compressed.keyblock", "false") + CarbonProperties.getInstance().addProperty("carbon.leaf.node.size", "120000") + + // this property is used to determine whether temp location for carbon is inside + // container temp dir or is yarn application directory. + val carbonUseLocalDir = CarbonProperties.getInstance() + .getProperty("carbon.use.local.dir", "false") + if (carbonUseLocalDir.equalsIgnoreCase("true")) { + val storeLocations = CarbonLoaderUtil.getConfiguredLocalDirs(SparkEnv.get.conf) + if (null != storeLocations && storeLocations.nonEmpty) { + storeLocation = storeLocations(Random.nextInt(storeLocations.length)) + } + if (storeLocation == null) { + storeLocation = System.getProperty("java.io.tmpdir") + } + } else { + storeLocation = System.getProperty("java.io.tmpdir") + } + storeLocation = storeLocation + '/' + System.nanoTime() + '/' + splitIndex + } + + def run(): Unit = { + try { + CarbonLoaderUtil.executeGraph(model, storeLocation, storePath, + kettleHomePath) + loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS) + } catch { + case e: DataLoadingException => if (e.getErrorCode == + DataProcessorConstants.BAD_REC_FOUND) { + loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS) + LOGGER.info("Bad Record Found") + } else { + throw e + } + case e: Exception => + throw e + } finally { + // delete temp location data + try { + val isCompaction = false + CarbonLoaderUtil.deleteLocalDataLoadFolderLocation(model, isCompaction) + } catch { + case e: Exception => + LOGGER.error(e, "Failed to delete local data") + } + if (!CarbonCommonConstants.STORE_LOADSTATUS_FAILURE.equals( + loadMetadataDetails.getLoadStatus)) { + if (CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS + .equals(loadMetadataDetails.getLoadStatus)) { + LOGGER.info("DataLoad complete") + LOGGER.info("Data Load partially successful with LoadCount:" + loadCount) + } else { + LOGGER.info("DataLoad complete") + LOGGER.info("Data Loaded successfully with LoadCount:" + loadCount) + 
CarbonTimeStatisticsFactory.getLoadStatisticsInstance.printStatisticsInfo( + model.getPartitionId) + } + } + } + } +} + +/** + * Use this RDD class to load csv data file + * + * @param sc The SparkContext to associate the RDD with. + * @param result Output result + * @param carbonLoadModel Carbon load model which contain the load info + * @param storePath The store location + * @param kettleHomePath The kettle home path + * @param columinar whether it is columinar + * @param loadCount Current load count + * @param tableCreationTime Time of creating table + * @param schemaLastUpdatedTime Time of last schema update + * @param blocksGroupBy Blocks Array which is group by partition or host + * @param isTableSplitPartition Whether using table split partition + * @tparam K Class of the key associated with the Result. + * @tparam V Class of the value associated with the Result. + */ +class DataFileLoaderRDD[K, V]( + sc: SparkContext, + result: DataLoadResult[K, V], + carbonLoadModel: CarbonLoadModel, + storePath: String, + kettleHomePath: String, + columinar: Boolean, + loadCount: Integer, + tableCreationTime: Long, + schemaLastUpdatedTime: Long, + blocksGroupBy: Array[(String, Array[BlockDetails])], + isTableSplitPartition: Boolean) extends RDD[(K, V)](sc, Nil) { + + sc.setLocalProperty("spark.scheduler.pool", "DDL") + + override def getPartitions: Array[Partition] = { + if (isTableSplitPartition) { + // for table split partition + var splits = Array[TableSplit]() + if (carbonLoadModel.isDirectLoad) { + splits = CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath) + } else { + splits = CarbonQueryUtil.getTableSplits(carbonLoadModel.getDatabaseName, + carbonLoadModel.getTableName, null) + } + + splits.zipWithIndex.map { case (split, index) => + // filter the same partition unique id, because only one will match, so get 0 element + val blocksDetails: Array[BlockDetails] = blocksGroupBy.filter { case (uniqueId, _) => + uniqueId == split.getPartition.getUniqueID + }(0)._2 + new CarbonTableSplitPartition(id, index, split, blocksDetails) + } + } else { + // for node partition + blocksGroupBy.zipWithIndex.map { case ((uniqueId, blockDetails), index) => + new CarbonNodePartition(id, index, uniqueId, blockDetails) + } + } + } + + override def checkpoint() { + // Do nothing. Hadoop RDD should not be checkpointed. 
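+    // This mirrors Spark's own HadoopRDD: each partition can be recomputed
+    // directly from the block details it carries and the source files on
+    // stable storage, so checkpointing would only re-persist input data.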
+ } + + override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = { + val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + val iter = new Iterator[(K, V)] { + var partitionID = "0" + val loadMetadataDetails = new LoadMetadataDetails() + var model: CarbonLoadModel = _ + var uniqueLoadStatusId = carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE + + theSplit.index + try { + loadMetadataDetails.setPartitionCount(partitionID) + carbonLoadModel.setSegmentId(String.valueOf(loadCount)) + setModelAndBlocksInfo() + val loader = new SparkPartitionLoader(model, theSplit.index, storePath, + kettleHomePath, loadCount, loadMetadataDetails) + loader.initialize + if (model.isRetentionRequest) { + recreateAggregationTableForRetention + } else if (model.isAggLoadRequest) { + loadMetadataDetails.setLoadStatus(createManualAggregateTable) + } else { + loader.run() + } + } catch { + case e: Exception => + logInfo("DataLoad failure") + LOGGER.error(e) + throw e + } + + def setModelAndBlocksInfo(): Unit = { + if (isTableSplitPartition) { + // for table split partition + val split = theSplit.asInstanceOf[CarbonTableSplitPartition] + logInfo("Input split: " + split.serializableHadoopSplit.value) + val blocksID = gernerateBlocksID + carbonLoadModel.setBlocksID(blocksID) + carbonLoadModel.setTaskNo(String.valueOf(theSplit.index)) + if (carbonLoadModel.isDirectLoad) { + model = carbonLoadModel.getCopyWithPartition( + split.serializableHadoopSplit.value.getPartition.getUniqueID, + split.serializableHadoopSplit.value.getPartition.getFilesPath, + carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter) + } else { + model = carbonLoadModel.getCopyWithPartition( + split.serializableHadoopSplit.value.getPartition.getUniqueID) + } + partitionID = split.serializableHadoopSplit.value.getPartition.getUniqueID + // get this partition data blocks and put it to global static map + GraphGenerator.blockInfo.put(blocksID, split.partitionBlocksDetail) + StandardLogService.setThreadName(partitionID, null) + CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordPartitionBlockMap( + partitionID, split.partitionBlocksDetail.length) + } else { + // for node partition + val split = theSplit.asInstanceOf[CarbonNodePartition] + logInfo("Input split: " + split.serializableHadoopSplit) + logInfo("The Block Count in this node: " + split.nodeBlocksDetail.length) + CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordHostBlockMap( + split.serializableHadoopSplit, split.nodeBlocksDetail.length) + val blocksID = gernerateBlocksID + carbonLoadModel.setBlocksID(blocksID) + carbonLoadModel.setTaskNo(String.valueOf(theSplit.index)) + // set this node blocks info to global static map + GraphGenerator.blockInfo.put(blocksID, split.nodeBlocksDetail) + if (carbonLoadModel.isDirectLoad) { + val filelist: java.util.List[String] = new java.util.ArrayList[String]( + CarbonCommonConstants.CONSTANT_SIZE_TEN) + CarbonQueryUtil.splitFilePath(carbonLoadModel.getFactFilePath, filelist, ",") + model = carbonLoadModel.getCopyWithPartition(partitionID, filelist, + carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter) + } else { + model = carbonLoadModel.getCopyWithPartition(partitionID) + } + StandardLogService.setThreadName(blocksID, null) + } + } + + /** + * generate blocks id + * + * @return + */ + def gernerateBlocksID: String = { + if (isTableSplitPartition) { + carbonLoadModel.getDatabaseName + "_" + carbonLoadModel.getTableName + "_" + + 
theSplit.asInstanceOf[CarbonTableSplitPartition].serializableHadoopSplit.value
+            .getPartition.getUniqueID + "_" + UUID.randomUUID()
+        } else {
+          carbonLoadModel.getDatabaseName + "_" + carbonLoadModel.getTableName + "_" +
+            UUID.randomUUID()
+        }
+      }
+
+      def checkAndLoadAggregationTable: String = {
+        val schema = model.getCarbonDataLoadSchema
+        val aggTables = schema.getCarbonTable.getAggregateTablesName
+        if (null != aggTables && !aggTables.isEmpty) {
+          val details = model.getLoadMetadataDetails.asScala.toArray
+          val newSlice = CarbonCommonConstants.LOAD_FOLDER + loadCount
+          var listOfLoadFolders = CarbonLoaderUtil.getListOfValidSlices(details)
+          listOfLoadFolders = CarbonLoaderUtil.addNewSliceNameToList(newSlice, listOfLoadFolders)
+          val listOfUpdatedLoadFolders = CarbonLoaderUtil.getListOfUpdatedSlices(details)
+          var listOfAllLoadFolders = CarbonQueryUtil.getListOfSlices(details)
+          listOfAllLoadFolders = CarbonLoaderUtil
+            .addNewSliceNameToList(newSlice, listOfAllLoadFolders)
+          val copyListOfLoadFolders = listOfLoadFolders.asScala.toList
+          val copyListOfUpdatedLoadFolders = listOfUpdatedLoadFolders.asScala.toList
+          loadTableSlices(listOfAllLoadFolders, details)
+          val loadFolders = Array[String]()
+          loadMetadataDetails.setLoadStatus(iterateOverAggTables(aggTables,
+            copyListOfLoadFolders.asJava, copyListOfUpdatedLoadFolders.asJava, loadFolders))
+          if (CarbonCommonConstants.STORE_LOADSTATUS_FAILURE.equals(
+            loadMetadataDetails.getLoadStatus)) {
+            // remove the current slice from memory, not the table
+            CarbonLoaderUtil
+              .removeSliceFromMemory(model.getDatabaseName, model.getTableName, newSlice)
+            logInfo(s"Aggregate table creation failed")
+          } else {
+            logInfo("Aggregate tables creation successful")
+          }
+        }
+        loadMetadataDetails.getLoadStatus
+      }
+
+      def loadTableSlices(listOfAllLoadFolders: java.util.List[String],
+          loadMetadataDetails: Array[LoadMetadataDetails]) = {
+        CarbonProperties.getInstance().addProperty("carbon.cache.used", "false")
+        // TODO: Implement it
+      }
+
+      def createManualAggregateTable: String = {
+        val details = model.getLoadMetadataDetails.asScala.toArray
+        val listOfAllLoadFolders = CarbonQueryUtil.getListOfSlices(details)
+        val listOfLoadFolders = CarbonLoaderUtil.getListOfValidSlices(details)
+        val listOfUpdatedLoadFolders = CarbonLoaderUtil.getListOfUpdatedSlices(details)
+        loadTableSlices(listOfAllLoadFolders, details)
+        val loadFolders = Array[String]()
+        val aggTable = model.getAggTableName
+        loadMetadataDetails.setLoadStatus(loadAggregationTable(listOfLoadFolders,
+          listOfUpdatedLoadFolders, loadFolders))
+        if (CarbonCommonConstants.STORE_LOADSTATUS_FAILURE.equals(
+          loadMetadataDetails.getLoadStatus)) {
+          logInfo(s"Aggregate table creation failed :: $aggTable")
+        } else {
+          logInfo(s"Aggregate table creation successful :: $aggTable")
+        }
+        loadMetadataDetails.getLoadStatus
+      }
+
+      def recreateAggregationTableForRetention = {
+        val schema = model.getCarbonDataLoadSchema
+        val aggTables = schema.getCarbonTable.getAggregateTablesName
+        if (null != aggTables && !aggTables.isEmpty) {
+          val details = model.getLoadMetadataDetails.asScala.toArray
+          val listOfLoadFolders = CarbonLoaderUtil.getListOfValidSlices(details)
+          val listOfUpdatedLoadFolders = CarbonLoaderUtil.getListOfUpdatedSlices(details)
+          val listOfAllLoadFolder = CarbonQueryUtil.getListOfSlices(details)
+          loadTableSlices(listOfAllLoadFolder, details)
+          val loadFolders = Array[String]()
+          iterateOverAggTables(aggTables, listOfLoadFolders, listOfUpdatedLoadFolders, loadFolders)
+        }
+      }
+
+      // TODO Aggregate table needs to be handled
+      def iterateOverAggTables(aggTables: java.util.List[String],
+          listOfLoadFolders: java.util.List[String],
+          listOfUpdatedLoadFolders: java.util.List[String],
+          loadFolders: Array[String]): String = {
+        model.setAggLoadRequest(true)
+        aggTables.asScala.foreach { aggTable =>
+          model.setAggTableName(aggTable)
+          loadMetadataDetails.setLoadStatus(loadAggregationTable(listOfLoadFolders,
+            listOfUpdatedLoadFolders, loadFolders))
+          if (CarbonCommonConstants.STORE_LOADSTATUS_FAILURE.equals(
+            loadMetadataDetails.getLoadStatus)) {
+            logInfo(s"Aggregate table creation failed :: $aggTable")
+            return loadMetadataDetails.getLoadStatus
+          }
+        }
+        loadMetadataDetails.getLoadStatus
+      }
+
+      def loadAggregationTable(listOfLoadFolders: java.util.List[String],
+          listOfUpdatedLoadFolders: java.util.List[String],
+          loadFolders: Array[String]): String = {
+        // TODO: Implement it
+        loadMetadataDetails.getLoadStatus
+      }
+
+      var finished = false
+
+      override def hasNext: Boolean = {
+        !finished
+      }
+
+      override def next(): (K, V) = {
+        finished = true
+        result.getKey(uniqueLoadStatusId, loadMetadataDetails)
+      }
+    }
+    iter
+  }
+
+  override def getPreferredLocations(split: Partition): Seq[String] = {
+    if (isTableSplitPartition) {
+      // for table split partition
+      val theSplit = split.asInstanceOf[CarbonTableSplitPartition]
+      val location = theSplit.serializableHadoopSplit.value.getLocations.asScala
+      location
+    } else {
+      // for node partition
+      val theSplit = split.asInstanceOf[CarbonNodePartition]
+      val firstOptionLocation: Seq[String] = List(theSplit.serializableHadoopSplit)
+      logInfo("Preferred Location for split: " + firstOptionLocation.head)
+      val blockMap = new util.LinkedHashMap[String, Integer]()
+      val tableBlocks = theSplit.blocksDetails
+      tableBlocks.foreach { tableBlock =>
+        tableBlock.getLocations.foreach { location =>
+          if (!firstOptionLocation.exists(location.equalsIgnoreCase)) {
+            val currentCount = blockMap.get(location)
+            if (currentCount == null) {
+              blockMap.put(location, 1)
+            } else {
+              blockMap.put(location, currentCount + 1)
+            }
+          }
+        }
+      }
+
+      val sortedList = blockMap.entrySet().asScala.toSeq.sortWith((nodeCount1, nodeCount2) => {
+        nodeCount1.getValue > nodeCount2.getValue
+      }
+      )
+
+      val sortedNodesList = sortedList.map(nodeCount => nodeCount.getKey).take(2)
+      firstOptionLocation ++ sortedNodesList
+    }
+  }
+
+}
+
+/**
+ * Use this RDD class to load a DataFrame (coalesced into an RDD of Row) into Carbon
+ *
+ * @param sc The SparkContext to associate the RDD with.
+ * @param result Output result
+ * @param carbonLoadModel Carbon load model which contains the load info
+ * @param storePath The store location
+ * @param kettleHomePath The kettle home path
+ * @param columinar whether the storage is columnar
+ * @param loadCount Current load count
+ * @param tableCreationTime Time of creating table
+ * @param schemaLastUpdatedTime Time of last schema update
+ * @param prev The coalesced input RDD of rows
+ * @tparam K Class of the key associated with the Result.
+ * @tparam V Class of the value associated with the Result.
+ */
+class DataFrameLoaderRDD[K, V](
+    sc: SparkContext,
+    result: DataLoadResult[K, V],
+    carbonLoadModel: CarbonLoadModel,
+    storePath: String,
+    kettleHomePath: String,
+    columinar: Boolean,
+    loadCount: Integer,
+    tableCreationTime: Long,
+    schemaLastUpdatedTime: Long,
+    prev: DataLoadCoalescedRDD[Row]) extends RDD[(K, V)](prev) {
+
+  sc.setLocalProperty("spark.scheduler.pool", "DDL")
+
+  @DeveloperApi
+  override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+    val resultIter = new Iterator[(K, V)] {
+      var partitionID = "0"
+      val loadMetadataDetails = new LoadMetadataDetails()
+      var uniqueLoadStatusId = carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE +
+        theSplit.index
+      try {
+        loadMetadataDetails.setPartitionCount(partitionID)
+        carbonLoadModel.setPartitionId(partitionID)
+        carbonLoadModel.setSegmentId(String.valueOf(loadCount))
+        carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
+        val loader = new SparkPartitionLoader(carbonLoadModel, theSplit.index, storePath,
+          kettleHomePath, loadCount, loadMetadataDetails)
+        loader.initialize
+        val rddIteratorKey = UUID.randomUUID().toString
+        try {
+          RddInputUtils.put(rddIteratorKey,
+            new PartitionIterator(
+              firstParent[DataLoadPartitionWrap[Row]].iterator(theSplit, context),
+              carbonLoadModel,
+              context))
+          carbonLoadModel.setRddIteratorKey(rddIteratorKey)
+          loader.run()
+        } finally {
+          RddInputUtils.remove(rddIteratorKey)
+        }
+      } catch {
+        case e: Exception =>
+          logInfo("DataLoad failure")
+          LOGGER.error(e)
+          throw e
+      }
+
+      var finished = false
+
+      override def hasNext: Boolean = !finished
+
+      override def next(): (K, V) = {
+        finished = true
+        result.getKey(uniqueLoadStatusId, loadMetadataDetails)
+      }
+    }
+    resultIter
+  }
+
+  override protected def getPartitions: Array[Partition] = firstParent[Row].partitions
+}
+
+class PartitionIterator(partitionIter: Iterator[DataLoadPartitionWrap[Row]],
+    carbonLoadModel: CarbonLoadModel,
+    context: TaskContext) extends JavaRddIterator[JavaRddIterator[Array[String]]] {
+  def hasNext: Boolean = partitionIter.hasNext
+  def next: JavaRddIterator[Array[String]] = {
+    val value = partitionIter.next
+    new RddIterator(value.rdd.iterator(value.partition, context),
+      carbonLoadModel,
+      context)
+  }
+  def initialize: Unit = {
+    SparkUtil.setTaskContext(context)
+  }
+}
+
+/**
+ * This class wraps a Scala Iterator as a Java Iterator.
+ * It also converts every column to a string so that the CSV data loading flow
+ * can be reused.
+ *
+ * @param rddIter the Scala iterator over the partition's rows
+ * @param carbonLoadModel carbon load model carrying delimiter and null-format settings
+ * @param context task context of the running task
+ */
+class RddIterator(rddIter: Iterator[Row],
+    carbonLoadModel: CarbonLoadModel,
+    context: TaskContext) extends JavaRddIterator[Array[String]] {
+
+  val formatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
+    .CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+  val format = new SimpleDateFormat(formatString)
+  val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
+  val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
+  val serializationNullFormat =
+    carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
+
+  def hasNext: Boolean = rddIter.hasNext
+
+  def next: Array[String] = {
+    val row = rddIter.next()
+    val columns = new Array[String](row.length)
+    for (i <- 0 until columns.length) {
+      columns(i) = CarbonScalaUtil.getString(row.get(i), serializationNullFormat,
+        delimiterLevel1, delimiterLevel2, format)
+    }
+    columns
+  }
+
+  def initialize: Unit = {
+    SparkUtil.setTaskContext(context)
+  }
+
+}
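Both loader RDDs in this file follow the same contract: each partition performs one load, and its iterator yields exactly one (key, value) pair carrying that partition's load status. A driver-side caller can therefore collect the pairs and derive the overall segment status. A minimal sketch of that contract, assuming a DataLoadResult[String, LoadMetadataDetails] implementation and driver-side values (dataLoadResult, loadModel, storePath, kettleHomePath, loadCount, tableCreationTime, schemaLastUpdatedTime, blocksGroupBy are illustrative names, not part of this patch):

    // Run the load; one status pair comes back per partition.
    val statusRDD = new DataFileLoaderRDD[String, LoadMetadataDetails](
      sc, dataLoadResult, loadModel, storePath, kettleHomePath,
      true /* columinar */, loadCount, tableCreationTime,
      schemaLastUpdatedTime, blocksGroupBy, false /* isTableSplitPartition */)
    // Mark the segment failed if any partition reported a failure status.
    val anyFailure = statusRDD.collect().exists { case (_, details) =>
      CarbonCommonConstants.STORE_LOADSTATUS_FAILURE.equals(details.getLoadStatus)
    }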
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala new file mode 100644 index 0000000..0534def --- /dev/null +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.rdd + +import scala.collection.JavaConverters._ + +import org.apache.spark.{Partition, SparkContext, TaskContext} +import org.apache.spark.rdd.RDD + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.load.LoadMetadataDetails +import org.apache.carbondata.spark.DeletedLoadResult +import org.apache.carbondata.spark.load.DeletedLoadMetadata +import org.apache.carbondata.spark.util.CarbonQueryUtil + +class CarbonDeleteLoadByDateRDD[K, V]( + sc: SparkContext, + result: DeletedLoadResult[K, V], + databaseName: String, + tableName: String, + dateField: String, + dateFieldActualName: String, + dateValue: String, + factTableName: String, + dimTableName: String, + storePath: String, + loadMetadataDetails: List[LoadMetadataDetails]) + extends RDD[(K, V)](sc, Nil) { + + sc.setLocalProperty("spark.scheduler.pool", "DDL") + + override def getPartitions: Array[Partition] = { + val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null) + splits.zipWithIndex.map {s => + new CarbonLoadPartition(id, s._2, s._1) + } + } + + override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = { + new Iterator[(K, V)] { + val deletedMetaData = new DeletedLoadMetadata() + val split = theSplit.asInstanceOf[CarbonLoadPartition] + logInfo("Input split: " + split.serializableHadoopSplit.value) + + logInfo("Input split: " + split.serializableHadoopSplit.value) + val partitionID = split.serializableHadoopSplit.value.getPartition.getUniqueID + + // TODO call CARBON delete API + logInfo("Applying data retention as per date value " + dateValue) + var dateFormat = "" + try { + dateFormat = CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT + } catch { + case e: Exception => logInfo("Unable to parse with default time format " + dateValue) + } + // TODO: Implement it + var finished = false + + override def hasNext: Boolean = { + finished + } + + override def next(): (K, V) = { + result.getKey(null, null) + } + } + } + + override def getPreferredLocations(split: 
Partition): Seq[String] = { + val theSplit = split.asInstanceOf[CarbonLoadPartition] + val s = theSplit.serializableHadoopSplit.value.getLocations.asScala + logInfo("Host Name: " + s.head + s.length) + s + } +} + http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala new file mode 100644 index 0000000..26e1abc --- /dev/null +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.rdd + +import scala.collection.JavaConverters._ +import scala.reflect.ClassTag + +import org.apache.spark.{Partition, SparkContext, TaskContext} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.execution.command.Partitioner + +import org.apache.carbondata.spark.Value +import org.apache.carbondata.spark.util.CarbonQueryUtil + +class CarbonDeleteLoadRDD[V: ClassTag]( + sc: SparkContext, + valueClass: Value[V], + loadId: Int, + databaseName: String, + tableName: String, + partitioner: Partitioner) + extends RDD[V](sc, Nil) { + sc.setLocalProperty("spark.scheduler.pool", "DDL") + + override def getPartitions: Array[Partition] = { + val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null) + splits.zipWithIndex.map {f => + new CarbonLoadPartition(id, f._2, f._1) + } + } + + override def compute(theSplit: Partition, context: TaskContext): Iterator[V] = { + val iter = new Iterator[V] { + val split = theSplit.asInstanceOf[CarbonLoadPartition] + logInfo("Input split: " + split.serializableHadoopSplit.value) + // TODO call CARBON delete API + + var havePair = false + var finished = false + + override def hasNext: Boolean = { + if (!finished && !havePair) { + finished = true + havePair = !finished + } + !finished + } + + override def next(): V = { + if (!hasNext) { + throw new java.util.NoSuchElementException("End of stream") + } + havePair = false + valueClass.getValue(null) + } + + } + logInfo("********Deleting***************") + iter + } + + override def getPreferredLocations(split: Partition): Seq[String] = { + val theSplit = split.asInstanceOf[CarbonLoadPartition] + val s = theSplit.serializableHadoopSplit.value.getLocations.asScala + logInfo("Host Name: " + s.head + s.length) + s + } +} + 
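CarbonDeleteLoadRDD above and CarbonDropTableRDD in the next file both drive one side effect per partition and then emit a single value through the same hand-rolled finished/havePair iterator. The idiom boils down to a one-shot iterator; a compact equivalent is sketched below for clarity (an illustration, not code from this patch):

    // One-shot iterator: hasNext is true exactly once, then the stream ends.
    def singleValue[V](value: => V): Iterator[V] = new Iterator[V] {
      private var consumed = false
      override def hasNext: Boolean = !consumed
      override def next(): V = {
        if (!hasNext) throw new java.util.NoSuchElementException("End of stream")
        consumed = true
        value
      }
    }

With such a helper, compute could simply return singleValue(valueClass.getValue(null)) after performing the delete or drop.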
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala new file mode 100644 index 0000000..dc63098 --- /dev/null +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.rdd + +import scala.reflect.ClassTag + +import org.apache.spark.{Partition, SparkContext, TaskContext} +import org.apache.spark.rdd.RDD + +import org.apache.carbondata.spark.Value +import org.apache.carbondata.spark.util.CarbonQueryUtil + +class CarbonDropTableRDD[V: ClassTag]( + sc: SparkContext, + valueClass: Value[V], + databaseName: String, + tableName: String) + extends RDD[V](sc, Nil) { + + sc.setLocalProperty("spark.scheduler.pool", "DDL") + + override def getPartitions: Array[Partition] = { + val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null) + splits.zipWithIndex.map { s => + new CarbonLoadPartition(id, s._2, s._1) + } + } + + override def compute(theSplit: Partition, context: TaskContext): Iterator[V] = { + + val iter = new Iterator[V] { + // TODO: Clear Btree from memory + + var havePair = false + var finished = false + + override def hasNext: Boolean = { + if (!finished && !havePair) { + finished = true + havePair = !finished + } + !finished + } + + override def next(): V = { + if (!hasNext) { + throw new java.util.NoSuchElementException("End of stream") + } + havePair = false + valueClass.getValue(null) + } + } + iter + } +} + http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala new file mode 100644 index 0000000..3c15818 --- /dev/null +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala @@ -0,0 +1,557 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.rdd + +import java.io.{DataInputStream, InputStreamReader} +import java.nio.charset.Charset +import java.text.SimpleDateFormat +import java.util.regex.Pattern + +import scala.collection.mutable +import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} +import scala.util.control.Breaks.{break, breakable} + +import au.com.bytecode.opencsv.CSVReader +import org.apache.commons.lang3.{ArrayUtils, StringUtils} +import org.apache.spark._ +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.Row + +import org.apache.carbondata.common.factory.CarbonCommonFactory +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.carbon.{CarbonTableIdentifier, ColumnIdentifier} +import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastorage.store.impl.FileFactory +import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, CarbonUtil} +import org.apache.carbondata.lcm.locks.{CarbonLockFactory, LockUsage} +import org.apache.carbondata.processing.model.CarbonLoadModel +import org.apache.carbondata.spark.load.CarbonLoaderUtil +import org.apache.carbondata.spark.tasks.{DictionaryWriterTask, SortIndexWriterTask} +import org.apache.carbondata.spark.util.CarbonScalaUtil +import org.apache.carbondata.spark.util.GlobalDictionaryUtil + +/** + * A partitioner partition by column. 
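+ * Keys are expected to be column indices (Int): getPartition simply returns
+ * the key, so each column's distinct values land in a partition of their own.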
+ * + * @constructor create a partitioner + * @param numParts the number of partitions + */ +class ColumnPartitioner(numParts: Int) extends Partitioner { + override def numPartitions: Int = numParts + + override def getPartition(key: Any): Int = key.asInstanceOf[Int] +} + +trait GenericParser { + val dimension: CarbonDimension + + def addChild(child: GenericParser): Unit + + def parseString(input: String): Unit +} + +case class DictionaryStats(distinctValues: java.util.List[String], + dictWriteTime: Long, sortIndexWriteTime: Long) + +case class PrimitiveParser(dimension: CarbonDimension, + setOpt: Option[HashSet[String]]) extends GenericParser { + val (hasDictEncoding, set: HashSet[String]) = setOpt match { + case None => (false, new HashSet[String]) + case Some(x) => (true, x) + } + + def addChild(child: GenericParser): Unit = { + } + + def parseString(input: String): Unit = { + if (hasDictEncoding && input != null) { + set.add(input) + } + } +} + +case class ArrayParser(dimension: CarbonDimension, format: DataFormat) extends GenericParser { + var children: GenericParser = _ + + def addChild(child: GenericParser): Unit = { + children = child + } + + def parseString(input: String): Unit = { + if (StringUtils.isNotEmpty(input)) { + val splits = format.getSplits(input) + if (ArrayUtils.isNotEmpty(splits)) { + splits.foreach { s => + children.parseString(s) + } + } + } + } +} + +case class StructParser(dimension: CarbonDimension, + format: DataFormat) extends GenericParser { + val children = new ArrayBuffer[GenericParser] + + def addChild(child: GenericParser): Unit = { + children += child + } + + def parseString(input: String): Unit = { + if (StringUtils.isNotEmpty(input)) { + val splits = format.getSplits(input) + val len = Math.min(children.length, splits.length) + for (i <- 0 until len) { + children(i).parseString(splits(i)) + } + } + } +} + +case class DataFormat(delimiters: Array[String], + var delimiterIndex: Int, + patterns: Array[Pattern]) extends Serializable { + self => + def getSplits(input: String): Array[String] = { + // -1 in case after splitting the last column is empty, the surrogate key ahs to be generated + // for empty value too + patterns(delimiterIndex).split(input, -1) + } + + def cloneAndIncreaseIndex: DataFormat = { + DataFormat(delimiters, Math.min(delimiterIndex + 1, delimiters.length - 1), patterns) + } +} + +/** + * a case class to package some attributes + */ +case class DictionaryLoadModel(table: CarbonTableIdentifier, + dimensions: Array[CarbonDimension], + hdfsLocation: String, + dictfolderPath: String, + dictFilePaths: Array[String], + dictFileExists: Array[Boolean], + isComplexes: Array[Boolean], + primDimensions: Array[CarbonDimension], + delimiters: Array[String], + highCardIdentifyEnable: Boolean, + highCardThreshold: Int, + rowCountPercentage: Double, + columnIdentifier: Array[ColumnIdentifier], + isFirstLoad: Boolean, + hdfsTempLocation: String, + lockType: String, + zooKeeperUrl: String, + serializationNullFormat: String) extends Serializable + +case class ColumnDistinctValues(values: Array[String], rowCount: Long) extends Serializable + +/** + * A RDD to combine all dictionary distinct values. 
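+ * In the all-dictionary flow every column is dictionary encoded, so the row
+ * count used for high-cardinality detection is not needed and stays at 0.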
+ * + * @constructor create a RDD with RDD[(String, Iterable[String])] + * @param prev the input RDD[(String, Iterable[String])] + * @param model a model package load info + */ +class CarbonAllDictionaryCombineRDD( + prev: RDD[(String, Iterable[String])], + model: DictionaryLoadModel) + extends RDD[(Int, ColumnDistinctValues)](prev) { + + override def getPartitions: Array[Partition] = { + firstParent[(String, Iterable[String])].partitions + } + + override def compute(split: Partition, context: TaskContext + ): Iterator[(Int, ColumnDistinctValues)] = { + val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + + val distinctValuesList = new ArrayBuffer[(Int, HashSet[String])] + /* + * for all dictionary, all columns need to encoding and checking + * isHighCardinalityColumn, so no need to calculate rowcount + */ + val rowCount = 0L + try { + val dimensionParsers = + GlobalDictionaryUtil.createDimensionParsers(model, distinctValuesList) + val dimNum = model.dimensions.length + // Map[dimColName -> dimColNameIndex] + val columnIndexMap = new HashMap[String, Int]() + for (j <- 0 until dimNum) { + columnIndexMap.put(model.dimensions(j).getColName, j) + } + + var row: (String, Iterable[String]) = null + val rddIter = firstParent[(String, Iterable[String])].iterator(split, context) + // generate block distinct value set + while (rddIter.hasNext) { + row = rddIter.next() + if (row != null) { + columnIndexMap.get(row._1) match { + case Some(index) => + for (record <- row._2) { + dimensionParsers(index).parseString(record) + } + case None => + } + } + } + } catch { + case ex: Exception => + LOGGER.error(ex) + throw ex + } + + distinctValuesList.map { iter => + val valueList = iter._2.toArray + (iter._1, ColumnDistinctValues(valueList, rowCount)) + }.iterator + } +} + +/** + * A RDD to combine distinct values in block. 
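+ * Every row of the input partition is pushed through all dimension parsers
+ * and counted, so the distinct-value/row-count ratio can later flag
+ * high-cardinality columns.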
+ * + * @constructor create a RDD with RDD[Row] + * @param prev the input RDD[Row] + * @param model a model package load info + */ +class CarbonBlockDistinctValuesCombineRDD( + prev: RDD[Row], + model: DictionaryLoadModel) + extends RDD[(Int, ColumnDistinctValues)](prev) { + + override def getPartitions: Array[Partition] = firstParent[Row].partitions + + override def compute(split: Partition, + context: TaskContext): Iterator[(Int, ColumnDistinctValues)] = { + val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordLoadCsvfilesToDfTime() + val distinctValuesList = new ArrayBuffer[(Int, HashSet[String])] + var rowCount = 0L + try { + val dimensionParsers = + GlobalDictionaryUtil.createDimensionParsers(model, distinctValuesList) + val dimNum = model.dimensions.length + var row: Row = null + val rddIter = firstParent[Row].iterator(split, context) + val formatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants + .CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT) + val format = new SimpleDateFormat(formatString) + // generate block distinct value set + while (rddIter.hasNext) { + row = rddIter.next() + if (row != null) { + rowCount += 1 + for (i <- 0 until dimNum) { + dimensionParsers(i).parseString(CarbonScalaUtil.getString(row.get(i), + model.serializationNullFormat, model.delimiters(0), model.delimiters(1), format)) + } + } + } + CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordLoadCsvfilesToDfTime() + } catch { + case ex: Exception => + LOGGER.error(ex) + throw ex + } + + distinctValuesList.map { iter => + val valueList = iter._2.toArray + (iter._1, ColumnDistinctValues(valueList, rowCount)) + }.iterator + } +} + +/** + * A RDD to generate dictionary file for each column + * + * @constructor create a RDD with RDD[Row] + * @param prev the input RDD[Row] + * @param model a model package load info + */ +class CarbonGlobalDictionaryGenerateRDD( + prev: RDD[(Int, ColumnDistinctValues)], + model: DictionaryLoadModel) + extends RDD[(Int, String, Boolean)](prev) { + + override def getPartitions: Array[Partition] = firstParent[(Int, ColumnDistinctValues)].partitions + + override def compute(split: Partition, context: TaskContext): Iterator[(Int, String, Boolean)] = { + val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + var status = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS + var isHighCardinalityColumn = false + val iter = new Iterator[(Int, String, Boolean)] { + var dictionaryForDistinctValueLookUp: + org.apache.carbondata.core.cache.dictionary.Dictionary = _ + var dictionaryForSortIndexWriting: org.apache.carbondata.core.cache.dictionary.Dictionary = _ + var dictionaryForDistinctValueLookUpCleared: Boolean = false + val pathService = CarbonCommonFactory.getPathService + val carbonTablePath = pathService.getCarbonTablePath(model.hdfsLocation, model.table) + if (StringUtils.isNotBlank(model.hdfsTempLocation )) { + CarbonProperties.getInstance.addProperty(CarbonCommonConstants.HDFS_TEMP_LOCATION, + model.hdfsTempLocation) + } + if (StringUtils.isNotBlank(model.lockType)) { + CarbonProperties.getInstance.addProperty(CarbonCommonConstants.LOCK_TYPE, + model.lockType) + } + if (StringUtils.isNotBlank(model.zooKeeperUrl)) { + CarbonProperties.getInstance.addProperty(CarbonCommonConstants.ZOOKEEPER_URL, + model.zooKeeperUrl) + } + val dictLock = CarbonLockFactory + .getCarbonLockObj(carbonTablePath.getRelativeDictionaryDirectory, + 
model.columnIdentifier(split.index).getColumnId + LockUsage.LOCK) + var isDictionaryLocked = false + // generate distinct value list + try { + val t1 = System.currentTimeMillis + val valuesBuffer = new mutable.HashSet[String] + val rddIter = firstParent[(Int, ColumnDistinctValues)].iterator(split, context) + var rowCount = 0L + CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordDicShuffleAndWriteTime() + breakable { + while (rddIter.hasNext) { + val distinctValueList = rddIter.next()._2 + valuesBuffer ++= distinctValueList.values + rowCount += distinctValueList.rowCount + // check high cardinality + if (model.isFirstLoad && model.highCardIdentifyEnable + && !model.isComplexes(split.index) + && model.dimensions(split.index).isColumnar) { + isHighCardinalityColumn = GlobalDictionaryUtil.isHighCardinalityColumn( + valuesBuffer.size, rowCount, model) + if (isHighCardinalityColumn) { + break + } + } + } + } + val combineListTime = System.currentTimeMillis() - t1 + if (isHighCardinalityColumn) { + LOGGER.info(s"column ${ model.table.getTableUniqueName }." + + s"${ + model.primDimensions(split.index) + .getColName + } is high cardinality column") + } else { + isDictionaryLocked = dictLock.lockWithRetries() + if (isDictionaryLocked) { + logInfo(s"Successfully able to get the dictionary lock for ${ + model.primDimensions(split.index).getColName + }") + } else { + sys + .error(s"Dictionary file ${ + model.primDimensions(split.index).getColName + } is locked for updation. Please try after some time") + } + val t2 = System.currentTimeMillis + val fileType = FileFactory.getFileType(model.dictFilePaths(split.index)) + model.dictFileExists(split.index) = FileFactory + .isFileExist(model.dictFilePaths(split.index), fileType) + dictionaryForDistinctValueLookUp = if (model.dictFileExists(split.index)) { + CarbonLoaderUtil.getDictionary(model.table, + model.columnIdentifier(split.index), + model.hdfsLocation, + model.primDimensions(split.index).getDataType + ) + } else { + null + } + val dictCacheTime = System.currentTimeMillis - t2 + val t3 = System.currentTimeMillis() + val dictWriteTask = new DictionaryWriterTask(valuesBuffer, + dictionaryForDistinctValueLookUp, + model, + split.index) + // execute dictionary writer task to get distinct values + val distinctValues = dictWriteTask.execute() + val dictWriteTime = System.currentTimeMillis() - t3 + val t4 = System.currentTimeMillis() + // if new data came than rewrite sort index file + if (distinctValues.size() > 0) { + val sortIndexWriteTask = new SortIndexWriterTask(model, + split.index, + dictionaryForDistinctValueLookUp, + distinctValues) + sortIndexWriteTask.execute() + } + val sortIndexWriteTime = System.currentTimeMillis() - t4 + CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordDicShuffleAndWriteTime() + // After sortIndex writing, update dictionaryMeta + dictWriteTask.updateMetaData() + // clear the value buffer after writing dictionary data + valuesBuffer.clear + CarbonUtil.clearDictionaryCache(dictionaryForDistinctValueLookUp) + dictionaryForDistinctValueLookUpCleared = true + LOGGER.info(s"\n columnName: ${ model.primDimensions(split.index).getColName }" + + s"\n columnId: ${ model.primDimensions(split.index).getColumnId }" + + s"\n new distinct values count: ${ distinctValues.size() }" + + s"\n combine lists: $combineListTime" + + s"\n create dictionary cache: $dictCacheTime" + + s"\n sort list, distinct and write: $dictWriteTime" + + s"\n write sort info: $sortIndexWriteTime") + } + } catch { + case ex: Exception => + 
LOGGER.error(ex)
+          throw ex
+      } finally {
+        if (!dictionaryForDistinctValueLookUpCleared) {
+          CarbonUtil.clearDictionaryCache(dictionaryForDistinctValueLookUp)
+        }
+        CarbonUtil.clearDictionaryCache(dictionaryForSortIndexWriting)
+        if (dictLock != null && isDictionaryLocked) {
+          if (dictLock.unlock()) {
+            logInfo(s"Dictionary ${
+              model.primDimensions(split.index).getColName
+            } Unlocked Successfully.")
+          } else {
+            logError(s"Unable to unlock Dictionary ${
+              model.primDimensions(split.index).getColName
+            }")
+          }
+        }
+      }
+      var finished = false
+
+      // Single-shot iterator: the first hasNext call flips finished and
+      // returns true, every later call returns false, so exactly one status
+      // tuple is emitted per column partition.
+      override def hasNext: Boolean = {
+        if (!finished) {
+          finished = true
+          finished
+        } else {
+          !finished
+        }
+      }
+
+      override def next(): (Int, String, Boolean) = {
+        (split.index, status, isHighCardinalityColumn)
+      }
+    }
+
+    iter
+  }
+
+}
+
+/**
+ * Column dictionary partition format: one partition per dimension that has a
+ * predefined dictionary
+ *
+ * @param id partition id
+ * @param dimension current carbon dimension
+ */
+class CarbonColumnDictPatition(id: Int, dimension: CarbonDimension)
+  extends Partition {
+  override val index: Int = id
+  val preDefDictDimension = dimension
+}
+
+
+/**
+ * Use external column dict to generate global dictionary
+ *
+ * @param carbonLoadModel carbon load model
+ * @param sparkContext spark context
+ * @param table carbon table identifier
+ * @param dimensions carbon dimensions having predefined dict
+ * @param hdfsLocation carbon base store path
+ * @param dictFolderPath path of dictionary folder
+ */
+class CarbonColumnDictGenerateRDD(carbonLoadModel: CarbonLoadModel,
+    dictionaryLoadModel: DictionaryLoadModel,
+    sparkContext: SparkContext,
+    table: CarbonTableIdentifier,
+    dimensions: Array[CarbonDimension],
+    hdfsLocation: String,
+    dictFolderPath: String)
+  extends RDD[(Int, ColumnDistinctValues)](sparkContext, Nil) {
+
+  override def getPartitions: Array[Partition] = {
+    val primDimensions = dictionaryLoadModel.primDimensions
+    val primDimLength = primDimensions.length
+    val result = new Array[Partition](primDimLength)
+    for (i <- 0 until primDimLength) {
+      result(i) = new CarbonColumnDictPatition(i, primDimensions(i))
+    }
+    result
+  }
+
+  override def compute(split: Partition, context: TaskContext)
+  : Iterator[(Int, ColumnDistinctValues)] = {
+    val theSplit = split.asInstanceOf[CarbonColumnDictPatition]
+    val primDimension = theSplit.preDefDictDimension
+    // read the column dict data
+    val preDefDictFilePath = carbonLoadModel.getPredefDictFilePath(primDimension)
+    var csvReader: CSVReader = null
+    var inputStream: DataInputStream = null
+    var colDictData: java.util.Iterator[Array[String]] = null
+    try {
+      inputStream = FileFactory.getDataInputStream(preDefDictFilePath,
+        FileFactory.getFileType(preDefDictFilePath))
+      csvReader = new CSVReader(new InputStreamReader(inputStream, Charset.defaultCharset),
+        carbonLoadModel.getCsvDelimiter.charAt(0))
+      // read the column data to list iterator
+      colDictData = csvReader.readAll.iterator
+    } catch {
+      case ex: Exception =>
+        logError(s"Error in reading pre-defined " +
+                 s"dictionary file:${ ex.getMessage }")
+        throw ex
+    } finally {
+      if (csvReader != null) {
+        try {
+          csvReader.close()
+        } catch {
+          case ex: Exception =>
+            logError(s"Error in closing csvReader of " +
+                     s"pre-defined dictionary file:${ ex.getMessage }")
+        }
+      }
+      if (inputStream != null) {
+        try {
+          inputStream.close()
+        } catch {
+          case ex: Exception =>
+            logError(s"Error in closing inputStream of " +
+                     s"pre-defined dictionary file:${ ex.getMessage }")
+        }
+      }
+    }
+    val mapIdWithSet = new HashMap[String, HashSet[String]]
+    val columnValues = new
HashSet[String] + val distinctValues = (theSplit.index, columnValues) + mapIdWithSet.put(primDimension.getColumnId, columnValues) + // use parser to generate new dict value + val dimensionParser = GlobalDictionaryUtil.generateParserForDimension( + Some(primDimension), + GlobalDictionaryUtil.createDataFormat(carbonLoadModel.getDelimiters), + mapIdWithSet).get + // parse the column data + while (colDictData.hasNext) { + dimensionParser.parseString(colDictData.next()(0)) + } + Array((distinctValues._1, + ColumnDistinctValues(distinctValues._2.toArray, 0L))).iterator + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala new file mode 100644 index 0000000..c9e3b6b --- /dev/null +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala @@ -0,0 +1,342 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.spark.rdd + +import java.util +import java.util.{Collections, List} + +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.util.Random + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.mapred.JobConf +import org.apache.hadoop.mapreduce.Job +import org.apache.spark._ +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.execution.command.{CarbonMergerMapping, NodeInfo} +import org.apache.spark.sql.hive.DistributionUtil + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.carbon.{AbsoluteTableIdentifier, CarbonTableIdentifier} +import org.apache.carbondata.core.carbon.datastore.block.{Distributable, SegmentProperties, TaskBlockInfo} +import org.apache.carbondata.core.carbon.metadata.blocklet.DataFileFooter +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, CarbonUtilException} +import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonMultiBlockSplit} +import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil +import org.apache.carbondata.processing.model.CarbonLoadModel +import org.apache.carbondata.processing.util.CarbonDataProcessorUtil +import org.apache.carbondata.scan.result.iterator.RawResultIterator +import org.apache.carbondata.spark.MergeResult +import org.apache.carbondata.spark.load.CarbonLoaderUtil +import org.apache.carbondata.spark.merger.{CarbonCompactionExecutor, CarbonCompactionUtil, RowResultMerger} +import org.apache.carbondata.spark.splits.TableSplit + +class CarbonMergerRDD[K, V]( + sc: SparkContext, + result: MergeResult[K, V], + carbonLoadModel: CarbonLoadModel, + carbonMergerMapping: CarbonMergerMapping, + confExecutorsTemp: String) + extends RDD[(K, V)](sc, Nil) { + + sc.setLocalProperty("spark.scheduler.pool", "DDL") + sc.setLocalProperty("spark.job.interruptOnCancel", "true") + + var storeLocation: String = null + val storePath = carbonMergerMapping.storePath + val metadataFilePath = carbonMergerMapping.metadataFilePath + val mergedLoadName = carbonMergerMapping.mergedLoadName + val databaseName = carbonMergerMapping.databaseName + val factTableName = carbonMergerMapping.factTableName + val tableId = carbonMergerMapping.tableId + + override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = { + val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + val iter = new Iterator[(K, V)] { + + carbonLoadModel.setTaskNo(String.valueOf(theSplit.index)) + val tempLocationKey: String = CarbonCommonConstants + .COMPACTION_KEY_WORD + '_' + carbonLoadModel + .getDatabaseName + '_' + carbonLoadModel + .getTableName + '_' + carbonLoadModel.getTaskNo + + // this property is used to determine whether temp location for carbon is inside + // container temp dir or is yarn application directory. 
+ val carbonUseLocalDir = CarbonProperties.getInstance() + .getProperty("carbon.use.local.dir", "false") + + if (carbonUseLocalDir.equalsIgnoreCase("true")) { + + val storeLocations = CarbonLoaderUtil.getConfiguredLocalDirs(SparkEnv.get.conf) + if (null != storeLocations && storeLocations.nonEmpty) { + storeLocation = storeLocations(Random.nextInt(storeLocations.length)) + } + if (storeLocation == null) { + storeLocation = System.getProperty("java.io.tmpdir") + } + } else { + storeLocation = System.getProperty("java.io.tmpdir") + } + storeLocation = storeLocation + '/' + System.nanoTime() + '/' + theSplit.index + CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation) + LOGGER.info(s"Temp storeLocation taken is $storeLocation") + var mergeStatus = false + var mergeNumber = "" + var exec: CarbonCompactionExecutor = null + try { + val carbonSparkPartition = theSplit.asInstanceOf[CarbonSparkPartition] + + // get destination segment properties as sent from driver which is of last segment. + + val segmentProperties = new SegmentProperties( + carbonMergerMapping.maxSegmentColumnSchemaList.asJava, + carbonMergerMapping.maxSegmentColCardinality) + + // sorting the table block info List. + val splitList = carbonSparkPartition.split.value.getAllSplits + val tableBlockInfoList = CarbonInputSplit.createBlocks(splitList) + + Collections.sort(tableBlockInfoList) + + val segmentMapping: java.util.Map[String, TaskBlockInfo] = + CarbonCompactionUtil.createMappingForSegments(tableBlockInfoList) + + val dataFileMetadataSegMapping: java.util.Map[String, List[DataFileFooter]] = + CarbonCompactionUtil.createDataFileFooterMappingForSegments(tableBlockInfoList) + + carbonLoadModel.setStorePath(storePath) + + exec = new CarbonCompactionExecutor(segmentMapping, segmentProperties, databaseName, + factTableName, storePath, carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable, + dataFileMetadataSegMapping + ) + + // fire a query and get the results. 
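+      // Each RawResultIterator streams the raw rows of one task's blocks;
+      // the RowResultMerger below merges these per-segment streams into the
+      // single compacted segment.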
+        var result2: util.List[RawResultIterator] = null
+        try {
+          result2 = exec.processTableBlocks()
+        } catch {
+          case e: Throwable =>
+            if (null != exec) {
+              exec.finish()
+            }
+            LOGGER.error(e)
+            if (null != e.getMessage) {
+              sys.error(s"Exception occurred in query execution :: ${ e.getMessage }")
+            } else {
+              sys.error("Exception occurred in query execution. Please check logs.")
+            }
+        }
+        mergeNumber = mergedLoadName.substring(
+          mergedLoadName.lastIndexOf(CarbonCommonConstants.LOAD_FOLDER) +
+            CarbonCommonConstants.LOAD_FOLDER.length(), mergedLoadName.length())
+
+        val tempStoreLoc = CarbonDataProcessorUtil.getLocalDataFolderLocation(databaseName,
+          factTableName,
+          carbonLoadModel.getTaskNo,
+          "0",
+          mergeNumber,
+          true)
+
+        carbonLoadModel.setSegmentId(mergeNumber)
+        carbonLoadModel.setPartitionId("0")
+        val merger = new RowResultMerger(result2,
+          databaseName,
+          factTableName,
+          segmentProperties,
+          tempStoreLoc,
+          carbonLoadModel,
+          carbonMergerMapping.maxSegmentColCardinality)
+        mergeStatus = merger.mergerSlice()
+
+      } catch {
+        case e: Exception =>
+          LOGGER.error(e)
+          throw e
+      } finally {
+        // delete temp location data
+        val newSlice = CarbonCommonConstants.LOAD_FOLDER + mergeNumber
+        try {
+          val isCompactionFlow = true
+          CarbonLoaderUtil.deleteLocalDataLoadFolderLocation(carbonLoadModel, isCompactionFlow)
+        } catch {
+          case e: Exception =>
+            LOGGER.error(e)
+        }
+        if (null != exec) {
+          exec.finish()
+        }
+      }
+
+      var finished = false
+
+      // one-shot iterator: hasNext reports true exactly once, so each task
+      // emits a single (key, mergeStatus) pair.
+      override def hasNext: Boolean = {
+        if (!finished) {
+          finished = true
+          finished
+        } else {
+          !finished
+        }
+      }
+
+      override def next(): (K, V) = {
+        finished = true
+        result.getKey(0, mergeStatus)
+      }
+    }
+    iter
+  }
+
+  override def getPreferredLocations(split: Partition): Seq[String] = {
+    val theSplit = split.asInstanceOf[CarbonSparkPartition]
+    theSplit.split.value.getLocations.filter(_ != "localhost")
+  }
+
+  override def getPartitions: Array[Partition] = {
+    val startTime = System.currentTimeMillis()
+    val absoluteTableIdentifier: AbsoluteTableIdentifier = new AbsoluteTableIdentifier(
+      storePath, new CarbonTableIdentifier(databaseName, factTableName, tableId))
+    val jobConf: JobConf = new JobConf(new Configuration)
+    val job: Job = new Job(jobConf)
+    val format = CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job)
+    var defaultParallelism = sparkContext.defaultParallelism
+    val result = new util.ArrayList[Partition](defaultParallelism)
+
+    // mapping of the node and block list.
+    var nodeBlockMapping: util.Map[String, util.List[Distributable]] =
+      new util.HashMap[String, util.List[Distributable]]
+
+    var carbonInputSplits = mutable.Seq[CarbonInputSplit]()
+
+    // for each valid segment.
+    for (eachSeg <- carbonMergerMapping.validSegments) {
+      // map for keeping the relation of a task and its blocks.
+      job.getConfiguration.set(CarbonInputFormat.INPUT_SEGMENT_NUMBERS, eachSeg)
+
+      // get splits
+      val splits = format.getSplits(job)
+      carbonInputSplits ++:= splits.asScala.map(_.asInstanceOf[CarbonInputSplit])
+    }
+
+    // prepare the details required to extract the segment properties using the last segment.
+    if (null != carbonInputSplits && carbonInputSplits.nonEmpty) {
+      val carbonInputSplit = carbonInputSplits.last
+      var dataFileFooter: DataFileFooter = null
+
+      try {
+        dataFileFooter = CarbonUtil.readMetadatFile(carbonInputSplit.getPath.toString(),
+          carbonInputSplit.getStart, carbonInputSplit.getLength)
+      } catch {
+        case e: CarbonUtilException =>
+          logError("Exception in preparing the data file footer for compaction: " + e.getMessage)
+          throw e
+      }
+
+      carbonMergerMapping.maxSegmentColCardinality =
+        dataFileFooter.getSegmentInfo.getColumnCardinality
+      carbonMergerMapping.maxSegmentColumnSchemaList =
+        dataFileFooter.getColumnInTable.asScala.toList
+    }
+    // send the complete list of blocks to the mapping util.
+    nodeBlockMapping = CarbonLoaderUtil.nodeBlockMapping(
+      carbonInputSplits.map(_.asInstanceOf[Distributable]).asJava, -1)
+
+    val confExecutors = confExecutorsTemp.toInt
+    val requiredExecutors = if (nodeBlockMapping.size > confExecutors) {
+      confExecutors
+    } else {
+      nodeBlockMapping.size()
+    }
+    DistributionUtil.ensureExecutors(sparkContext, requiredExecutors)
+    logInfo("No. of executors required = " + requiredExecutors +
+      ", spark.executor.instances = " + confExecutors +
+      ", no. of nodes where data is present = " + nodeBlockMapping.size())
+    var nodes = DistributionUtil.getNodeList(sparkContext)
+    var maxTimes = 30
+    while (nodes.length < requiredExecutors && maxTimes > 0) {
+      Thread.sleep(500)
+      nodes = DistributionUtil.getNodeList(sparkContext)
+      maxTimes = maxTimes - 1
+    }
+    logInfo("Time taken to wait for executor allocation is " + ((30 - maxTimes) * 500) + " millis")
+    defaultParallelism = sparkContext.defaultParallelism
+    var i = 0
+
+    val nodeTaskBlocksMap = new util.HashMap[String, util.List[NodeInfo]]()
+
+    // create a Spark partition for each task and assign its blocks.
+    nodeBlockMapping.asScala.foreach { case (nodeName, blockList) =>
+      val taskBlockList = new util.ArrayList[NodeInfo](0)
+      nodeTaskBlocksMap.put(nodeName, taskBlockList)
+      var blockletCount = 0
+      blockList.asScala.foreach { taskInfo =>
+        val blocksPerNode = taskInfo.asInstanceOf[CarbonInputSplit]
+        blockletCount = blockletCount + blocksPerNode.getNumberOfBlocklets
+        taskBlockList.add(NodeInfo(blocksPerNode.taskId, blocksPerNode.getNumberOfBlocklets))
+      }
+      if (blockletCount != 0) {
+        val multiBlockSplit = new CarbonMultiBlockSplit(absoluteTableIdentifier,
+          carbonInputSplits.asJava, nodeName)
+        result.add(new CarbonSparkPartition(id, i, multiBlockSplit))
+        i += 1
+      }
+    }
+
+    // log the node info along with each task and its number of blocks.
+    nodeTaskBlocksMap.asScala.foreach { case (nodeName, taskList) =>
+      logInfo(s"For the node $nodeName")
+      for (elem <- taskList.asScala) {
+        logInfo("Task ID is " + elem.TaskId + ", no. of blocks is " + elem.noOfBlocks)
+      }
+    }
+
+    val noOfBlocks = carbonInputSplits.size
+    val noOfNodes = nodes.length
+    val noOfTasks = result.size
+    logInfo(s"Identified no. of blocks: $noOfBlocks, " +
+      s"parallelism: $defaultParallelism, no. of nodes: $noOfNodes, no. of tasks: $noOfTasks")
+    logInfo("Time taken to identify blocks to scan: " +
+      (System.currentTimeMillis() - startTime) + " millis")
+    for (j <- 0 until result.size) {
+      val multiBlockSplit = result.get(j).asInstanceOf[CarbonSparkPartition].split.value
+      val splitList = multiBlockSplit.getAllSplits
+      logInfo(s"Node: ${ multiBlockSplit.getLocations.mkString(",") }, No. of blocks: " +
+        s"${ CarbonInputSplit.createBlocks(splitList).size }")
+    }
+    result.toArray(new Array[Partition](result.size))
+  }
+
+}
+
+class CarbonLoadPartition(rddId: Int, val idx: Int, @transient val tableSplit: TableSplit)
+  extends Partition {
+
+  override val index: Int = idx
+  val serializableHadoopSplit = new SerializableWritable[TableSplit](tableSplit)
+
+  override def hashCode(): Int = 41 * (41 + rddId) + idx
+}
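CarbonMergerRDD.compute above emits exactly one (key, mergeStatus) pair per task through a hand-rolled one-shot iterator. The pattern can be distilled into the following self-contained sketch (plain Scala with no CarbonData dependencies; SingleResultIterator is an illustrative name, not part of this change):

    // Yields exactly one lazily computed element, mirroring the
    // hasNext/next bookkeeping in CarbonMergerRDD.compute.
    class SingleResultIterator[T](value: => T) extends Iterator[T] {
      private var finished = false
      override def hasNext: Boolean = !finished
      override def next(): T = {
        finished = true
        value
      }
    }

    object SingleResultIteratorExample extends App {
      val it = new SingleResultIterator((0, true))
      println(it.next())  // prints (0,true)
      println(it.hasNext) // prints false
    }

Collecting such an RDD therefore returns one status tuple per merge task, which is what Compactor (further below) folds into a single success flag.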
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonSparkPartition.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonSparkPartition.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonSparkPartition.scala
new file mode 100644
index 0000000..82a471f
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonSparkPartition.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import org.apache.spark.{Partition, SerializableWritable}
+
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit
+
+class CarbonSparkPartition(
+    val rddId: Int,
+    val idx: Int,
+    @transient val multiBlockSplit: CarbonMultiBlockSplit)
+  extends Partition {
+
+  val split = new SerializableWritable[CarbonMultiBlockSplit](multiBlockSplit)
+
+  override val index: Int = idx
+
+  override def hashCode(): Int = 41 * (41 + rddId) + idx
+}
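CarbonSparkPartition wraps the Hadoop-side CarbonMultiBlockSplit in a SerializableWritable because Writable-based splits are not java.io.Serializable, yet Spark must ship every Partition to the executors with its task. A minimal sketch of the same idiom, assuming only Spark 1.x-era spark-core (where SerializableWritable is public) and hadoop-common on the classpath; WritablePartition is an illustrative name:

    import org.apache.hadoop.io.Text
    import org.apache.spark.{Partition, SerializableWritable}

    // Wrap a non-serializable Writable so the partition itself can be
    // serialized with the task; keep the raw field @transient.
    class WritablePartition(rddId: Int, val idx: Int, @transient writable: Text)
      extends Partition {

      val value = new SerializableWritable[Text](writable)

      override val index: Int = idx
      override def hashCode(): Int = 41 * (41 + rddId) + idx
    }

The 41 * (41 + rddId) + idx hash follows the same convention as the other partition classes in this change.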
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
new file mode 100644
index 0000000..fe805fe
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.execution.command.{CarbonMergerMapping, CompactionCallableModel}
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.lcm.status.SegmentStatusManager
+import org.apache.carbondata.spark.MergeResultImpl
+import org.apache.carbondata.spark.load.CarbonLoaderUtil
+import org.apache.carbondata.spark.merger.CarbonDataMergerUtil
+
+/**
+ * Compactor which handles the compaction cases.
+ */
+object Compactor {
+
+  val logger = LogServiceFactory.getLogService(Compactor.getClass.getName)
+
+  def triggerCompaction(compactionCallableModel: CompactionCallableModel): Unit = {
+
+    val storePath = compactionCallableModel.storePath
+    val storeLocation = compactionCallableModel.storeLocation
+    val carbonTable = compactionCallableModel.carbonTable
+    val kettleHomePath = compactionCallableModel.kettleHomePath
+    val cubeCreationTime = compactionCallableModel.cubeCreationTime
+    val loadsToMerge = compactionCallableModel.loadsToMerge
+    val sc = compactionCallableModel.sqlContext
+    val carbonLoadModel = compactionCallableModel.carbonLoadModel
+    val compactionType = compactionCallableModel.compactionType
+
+    val startTime = System.nanoTime()
+    val mergedLoadName = CarbonDataMergerUtil.getMergedLoadName(loadsToMerge)
+    var finalMergeStatus = false
+    val schemaName: String = carbonLoadModel.getDatabaseName
+    val factTableName = carbonLoadModel.getTableName
+    val validSegments: Array[String] = CarbonDataMergerUtil
+      .getValidSegments(loadsToMerge).split(',')
+    val mergeLoadStartTime = CarbonLoaderUtil.readCurrentTime()
+    val carbonMergerMapping = CarbonMergerMapping(storeLocation,
+      storePath,
+      carbonTable.getMetaDataFilepath,
+      mergedLoadName,
+      kettleHomePath,
+      cubeCreationTime,
+      schemaName,
+      factTableName,
+      validSegments,
+      carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId,
+      maxSegmentColCardinality = null,
+      maxSegmentColumnSchemaList = null)
+    carbonLoadModel.setStorePath(carbonMergerMapping.storePath)
+    carbonLoadModel.setLoadMetadataDetails(
+      SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath).toList.asJava)
+    var execInstance = "1"
+    // in case of non-dynamic executor allocation, the number of executors is fixed.
+    if (sc.sparkContext.getConf.contains("spark.executor.instances")) {
+      execInstance = sc.sparkContext.getConf.get("spark.executor.instances")
+      logger.info(s"spark.executor.instances property is set to $execInstance")
+    } else if (sc.sparkContext.getConf.contains("spark.dynamicAllocation.enabled")) {
+      // in case of dynamic executor allocation, take the configured maximum of executors.
+      if (sc.sparkContext.getConf.get("spark.dynamicAllocation.enabled").trim
+        .equalsIgnoreCase("true")) {
+        execInstance = sc.sparkContext.getConf.get("spark.dynamicAllocation.maxExecutors")
+        logger.info(s"spark.dynamicAllocation.maxExecutors property is set to $execInstance")
+      }
+    }
+
+    val mergeStatus = new CarbonMergerRDD(
+      sc.sparkContext,
+      new MergeResultImpl(),
+      carbonLoadModel,
+      carbonMergerMapping,
+      execInstance
+    ).collect
+
+    if (mergeStatus.length == 0) {
+      finalMergeStatus = false
+    } else {
+      finalMergeStatus = mergeStatus.forall(_._2)
+    }
+
+    if (finalMergeStatus) {
+      val endTime = System.nanoTime()
+      logger.info(s"time taken to merge $mergedLoadName is ${ endTime - startTime }")
+      if (!CarbonDataMergerUtil
+        .updateLoadMetadataWithMergeStatus(loadsToMerge, carbonTable.getMetaDataFilepath,
+          mergedLoadName, carbonLoadModel, mergeLoadStartTime, compactionType)) {
+        logger.audit(s"Compaction request failed for table " +
+          s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+        logger.error(s"Compaction request failed for table " +
+          s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+        throw new Exception(s"Compaction failed to update metadata for table " +
+          s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+ + s"${ carbonLoadModel.getTableName }") + } else { + logger.audit(s"Compaction request completed for table " + + s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") + logger.info("Compaction request completed for table ${ carbonLoadModel.getDatabaseName } " + + s".${ carbonLoadModel.getTableName }") + } + } else { + logger.audit("Compaction request failed for table ${ carbonLoadModel.getDatabaseName } " + + s".${ carbonLoadModel.getTableName }" + ) + logger.error("Compaction request failed for table ${ carbonLoadModel.getDatabaseName } " + + s".${ carbonLoadModel.getTableName }") + throw new Exception("Compaction Failure in Merger Rdd.") + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadCoalescedRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadCoalescedRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadCoalescedRDD.scala new file mode 100644 index 0000000..7395e43 --- /dev/null +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadCoalescedRDD.scala @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.rdd + +import scala.reflect.ClassTag + +import org.apache.spark._ + +case class DataLoadPartitionWrap[T: ClassTag](rdd: RDD[T], partition: Partition) + +class DataLoadCoalescedRDD[T: ClassTag]( + @transient var prev: RDD[T], + nodeList: Array[String]) + extends RDD[DataLoadPartitionWrap[T]](prev.context, Nil) { + + override def getPartitions: Array[Partition] = { + new DataLoadPartitionCoalescer(prev, nodeList).run + } + + override def compute(split: Partition, + context: TaskContext): Iterator[DataLoadPartitionWrap[T]] = { + + new Iterator[DataLoadPartitionWrap[T]] { + val iter = split.asInstanceOf[CoalescedRDDPartition].parents.iterator + def hasNext = iter.hasNext + def next: DataLoadPartitionWrap[T] = { + DataLoadPartitionWrap(firstParent[T], iter.next()) + } + } + } + + override def getDependencies: Seq[Dependency[_]] = { + Seq(new NarrowDependency(prev) { + def getParents(id: Int): Seq[Int] = + partitions(id).asInstanceOf[CoalescedRDDPartition].parentsIndices + }) + } + + override def clearDependencies() { + super.clearDependencies() + prev = null + } + + /** + * Returns the preferred machine for the partition. If split is of type CoalescedRDDPartition, + * then the preferred machine will be one which most parent splits prefer too. 
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadCoalescedRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadCoalescedRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadCoalescedRDD.scala
new file mode 100644
index 0000000..7395e43
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadCoalescedRDD.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import scala.reflect.ClassTag
+
+import org.apache.spark._
+
+case class DataLoadPartitionWrap[T: ClassTag](rdd: RDD[T], partition: Partition)
+
+class DataLoadCoalescedRDD[T: ClassTag](
+    @transient var prev: RDD[T],
+    nodeList: Array[String])
+  extends RDD[DataLoadPartitionWrap[T]](prev.context, Nil) {
+
+  override def getPartitions: Array[Partition] = {
+    new DataLoadPartitionCoalescer(prev, nodeList).run
+  }
+
+  override def compute(split: Partition,
+      context: TaskContext): Iterator[DataLoadPartitionWrap[T]] = {
+
+    new Iterator[DataLoadPartitionWrap[T]] {
+      val iter = split.asInstanceOf[CoalescedRDDPartition].parents.iterator
+      def hasNext: Boolean = iter.hasNext
+      def next: DataLoadPartitionWrap[T] = {
+        DataLoadPartitionWrap(firstParent[T], iter.next())
+      }
+    }
+  }
+
+  override def getDependencies: Seq[Dependency[_]] = {
+    Seq(new NarrowDependency(prev) {
+      def getParents(id: Int): Seq[Int] =
+        partitions(id).asInstanceOf[CoalescedRDDPartition].parentsIndices
+    })
+  }
+
+  override def clearDependencies() {
+    super.clearDependencies()
+    prev = null
+  }
+
+  /**
+   * Returns the preferred machine for the partition. If the split is a CoalescedRDDPartition,
+   * the preferred machine is the one that most of its parent splits prefer.
+   * @param partition the partition whose preferred location is requested
+   * @return the machine most preferred by the split
+   */
+  override def getPreferredLocations(partition: Partition): Seq[String] = {
+    partition.asInstanceOf[CoalescedRDDPartition].preferredLocation.toSeq
+  }
+}
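A hypothetical driver-side usage of DataLoadCoalescedRDD, assuming the class above and its DataLoadPartitionCoalescer are on the classpath (the node names and partition counts are illustrative, and with a local master the node list is only a hint). Each task receives DataLoadPartitionWrap entries and can later pull rows via rdd.iterator(partition, context), which is what keeps the coalescing itself free of any data movement:

    import org.apache.spark.{SparkConf, SparkContext, TaskContext}
    import org.apache.spark.rdd.DataLoadCoalescedRDD

    object CoalescedLoadExample {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("coalesce-sketch").setMaster("local[2]"))
        val parent = sc.parallelize(1 to 1000, numSlices = 8)
        // target nodes for the load
        val coalesced = new DataLoadCoalescedRDD(parent, Array("host1", "host2"))

        // count rows by re-reading each wrapped parent partition
        val total = coalesced.mapPartitions { wraps =>
          val ctx = TaskContext.get()
          wraps.flatMap(w => w.rdd.iterator(w.partition, ctx))
        }.count()
        println(s"rows seen: $total") // 1000
        sc.stop()
      }
    }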