http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala deleted file mode 100644 index 6eeeaf9..0000000 --- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala +++ /dev/null @@ -1,1088 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.spark.rdd - -import java.text.SimpleDateFormat -import java.util -import java.util.concurrent._ - -import scala.collection.JavaConverters._ -import scala.collection.mutable.ListBuffer -import scala.util.Random -import scala.util.control.Breaks._ - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path -import org.apache.hadoop.io.NullWritable -import org.apache.hadoop.mapreduce.Job -import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit} -import org.apache.spark.{SparkEnv, SparkException, TaskContext} -import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionCoalescer, NewHadoopRDD, RDD} -import org.apache.spark.sql.{CarbonEnv, DataFrame, Row, SQLContext} -import org.apache.spark.sql.execution.command.{AlterTableModel, CompactionModel, ExecutionErrors, UpdateTableModel} -import org.apache.spark.sql.hive.DistributionUtil -import org.apache.spark.util.SparkUtil - -import org.apache.carbondata.common.constants.LoggerAction -import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.datastore.block.{Distributable, TableBlockInfo} -import org.apache.carbondata.core.dictionary.server.DictionaryServer -import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage} -import org.apache.carbondata.core.metadata.{CarbonTableIdentifier, ColumnarFormatVersion} -import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes} -import org.apache.carbondata.core.metadata.schema.partition.PartitionType -import org.apache.carbondata.core.metadata.schema.table.CarbonTable -import org.apache.carbondata.core.mutate.CarbonUpdateUtil -import org.apache.carbondata.core.scan.partition.PartitionUtil -import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager} -import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties} -import org.apache.carbondata.core.util.path.CarbonStorePath -import org.apache.carbondata.processing.exception.DataLoadingException -import 
org.apache.carbondata.processing.loading.{DataLoadProcessBuilder, FailureCauses} -import org.apache.carbondata.processing.loading.csvinput.{BlockDetails, CSVInputFormat, StringArrayWritable} -import org.apache.carbondata.processing.loading.exception.{CarbonDataLoadingException, NoRetryException} -import org.apache.carbondata.processing.loading.model.CarbonLoadModel -import org.apache.carbondata.processing.loading.sort.SortScopeOptions -import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, CarbonDataMergerUtil, CompactionType} -import org.apache.carbondata.processing.splits.TableSplit -import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil, CarbonQueryUtil} -import org.apache.carbondata.spark._ -import org.apache.carbondata.spark.load.DataLoadProcessBuilderOnSpark -import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, Util} - -/** - * This is the factory class which can create different RDD depends on user needs. - * - */ -object CarbonDataRDDFactory { - - private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) - - def alterTableForCompaction(sqlContext: SQLContext, - alterTableModel: AlterTableModel, - carbonLoadModel: CarbonLoadModel, - storePath: String, - storeLocation: String): Unit = { - var compactionSize: Long = 0 - var compactionType: CompactionType = CompactionType.MINOR_COMPACTION - if (alterTableModel.compactionType.equalsIgnoreCase("major")) { - compactionSize = CarbonDataMergerUtil.getCompactionSize(CompactionType.MAJOR_COMPACTION) - compactionType = CompactionType.MAJOR_COMPACTION - } else if (alterTableModel.compactionType - .equalsIgnoreCase(CompactionType.IUD_UPDDEL_DELTA_COMPACTION.toString)) { - compactionType = CompactionType.IUD_UPDDEL_DELTA_COMPACTION - if (alterTableModel.segmentUpdateStatusManager.get != None) { - carbonLoadModel - .setSegmentUpdateStatusManager((alterTableModel.segmentUpdateStatusManager.get)) - carbonLoadModel - .setLoadMetadataDetails(alterTableModel.segmentUpdateStatusManager.get - .getLoadMetadataDetails.toList.asJava) - } - } - else { - compactionType = CompactionType.MINOR_COMPACTION - } - - LOGGER.audit(s"Compaction request received for table " + - s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") - val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable - - if (null == carbonLoadModel.getLoadMetadataDetails) { - CommonUtil.readLoadMetadataDetails(carbonLoadModel) - } - // reading the start time of data load. - val loadStartTime = CarbonUpdateUtil.readCurrentTime(); - carbonLoadModel.setFactTimeStamp(loadStartTime) - - val isCompactionTriggerByDDl = true - val compactionModel = CompactionModel(compactionSize, - compactionType, - carbonTable, - isCompactionTriggerByDDl - ) - - val isConcurrentCompactionAllowed = CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION, - CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION - ) - .equalsIgnoreCase("true") - - // if system level compaction is enabled then only one compaction can run in the system - // if any other request comes at this time then it will create a compaction request file. - // so that this will be taken up by the compaction process which is executing. 
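The comment above describes a hand-off rather than a failure: when system-level compaction serialization is enabled and the lock is busy, a new request is queued as a marker file that the already-running compaction thread later picks up and deletes. A minimal sketch of that hand-off, using the CarbonCompactionUtil calls that appear later in this diff; the metadata path, object name, and compaction type here are illustrative:

object CompactionQueueSketch {
  import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, CompactionType}

  // Hypothetical metadata path; in the deleted code this comes from
  // carbonTable.getMetaDataFilepath.
  val metadataPath = "/store/default/sample_table/Metadata"

  // Requesting side: the system-level lock is held by another compaction,
  // so leave a request marker instead of failing the request.
  def queueRequest(): Unit = {
    CarbonCompactionUtil.createCompactionRequiredFile(metadataPath, CompactionType.MINOR_COMPACTION)
  }

  // Running compaction thread: after finishing its own table, it reads the
  // queued request, compacts that table, then deletes the marker.
  def drainRequest(): Unit = {
    val queuedType = CarbonCompactionUtil.determineCompactionType(metadataPath)
    // ... execute compaction for the queued table ...
    CarbonCompactionUtil.deleteCompactionRequiredFile(metadataPath, queuedType)
  }
}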
- if (!isConcurrentCompactionAllowed) { - LOGGER.info("System level compaction lock is enabled.") - handleCompactionForSystemLocking(sqlContext, - carbonLoadModel, - storePath, - storeLocation, - compactionType, - carbonTable, - compactionModel - ) - } else { - // normal flow of compaction - val lock = CarbonLockFactory - .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier, - LockUsage.COMPACTION_LOCK - ) - - if (lock.lockWithRetries()) { - LOGGER.info("Acquired the compaction lock for table" + - s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") - try { - startCompactionThreads(sqlContext, - carbonLoadModel, - storePath, - storeLocation, - compactionModel, - lock - ) - } catch { - case e: Exception => - LOGGER.error(s"Exception in start compaction thread. ${ e.getMessage }") - lock.unlock() - } - } else { - LOGGER.audit("Not able to acquire the compaction lock for table " + - s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") - LOGGER.error(s"Not able to acquire the compaction lock for table" + - s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") - sys.error("Table is already locked for compaction. Please try after some time.") - } - } - } - - def handleCompactionForSystemLocking(sqlContext: SQLContext, - carbonLoadModel: CarbonLoadModel, - storePath: String, - storeLocation: String, - compactionType: CompactionType, - carbonTable: CarbonTable, - compactionModel: CompactionModel): Unit = { - val lock = CarbonLockFactory - .getCarbonLockObj(CarbonCommonConstants.SYSTEM_LEVEL_COMPACTION_LOCK_FOLDER, - LockUsage.SYSTEMLEVEL_COMPACTION_LOCK - ) - if (lock.lockWithRetries()) { - LOGGER.info(s"Acquired the compaction lock for table ${ carbonLoadModel.getDatabaseName }" + - s".${ carbonLoadModel.getTableName }") - try { - startCompactionThreads(sqlContext, - carbonLoadModel, - storePath, - storeLocation, - compactionModel, - lock - ) - } catch { - case e: Exception => - LOGGER.error(s"Exception in start compaction thread. ${ e.getMessage }") - lock.unlock() - // if the compaction is a blocking call then only need to throw the exception. - if (compactionModel.isDDLTrigger) { - throw e - } - } - } else { - LOGGER.audit("Not able to acquire the system level compaction lock for table " + - s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") - LOGGER.error("Not able to acquire the compaction lock for table " + - s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") - CarbonCompactionUtil - .createCompactionRequiredFile(carbonTable.getMetaDataFilepath, compactionType) - // do sys error only in case of DDL trigger. - if (compactionModel.isDDLTrigger) { - sys.error("Compaction is in progress, compaction request for table " + - s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }" + - " is in queue.") - } else { - LOGGER.error("Compaction is in progress, compaction request for table " + - s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }" + - " is in queue.") - } - } - } - - def startCompactionThreads(sqlContext: SQLContext, - carbonLoadModel: CarbonLoadModel, - storePath: String, - storeLocation: String, - compactionModel: CompactionModel, - compactionLock: ICarbonLock): Unit = { - val executor: ExecutorService = Executors.newFixedThreadPool(1) - // update the updated table status. 
- CommonUtil.readLoadMetadataDetails(carbonLoadModel) - val compactionThread = new Thread { - override def run(): Unit = { - - try { - // compaction status of the table which is triggered by the user. - var triggeredCompactionStatus = false - var exception: Exception = null - try { - DataManagementFunc.executeCompaction(carbonLoadModel: CarbonLoadModel, - compactionModel: CompactionModel, - executor, sqlContext, storeLocation - ) - triggeredCompactionStatus = true - } catch { - case e: Exception => - LOGGER.error(s"Exception in compaction thread ${ e.getMessage }") - exception = e - } - // continue in case of exception also, check for all the tables. - val isConcurrentCompactionAllowed = CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION, - CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION - ).equalsIgnoreCase("true") - - if (!isConcurrentCompactionAllowed) { - LOGGER.info("System level compaction lock is enabled.") - val skipCompactionTables = ListBuffer[CarbonTableIdentifier]() - var table: CarbonTable = CarbonCompactionUtil - .getNextTableToCompact(CarbonEnv.get.carbonMetastore.metadata. - tablesMeta.map(_.carbonTable).toArray, - skipCompactionTables.toList.asJava) - while (null != table) { - LOGGER.info("Compaction request has been identified for table " + - s"${ table.getDatabaseName }." + - s"${ table.getFactTableName }") - val metadataPath = table.getMetaDataFilepath - val compactionType = CarbonCompactionUtil.determineCompactionType(metadataPath) - - val newCarbonLoadModel = new CarbonLoadModel() - DataManagementFunc.prepareCarbonLoadModel(table, newCarbonLoadModel) - - val compactionSize = CarbonDataMergerUtil - .getCompactionSize(CompactionType.MAJOR_COMPACTION) - - val newcompactionModel = CompactionModel(compactionSize, - compactionType, - table, - compactionModel.isDDLTrigger - ) - // proceed for compaction - try { - DataManagementFunc.executeCompaction(newCarbonLoadModel, - newcompactionModel, - executor, sqlContext, storeLocation - ) - } catch { - case e: Exception => - LOGGER.error("Exception in compaction thread for table " + - s"${ table.getDatabaseName }." + - s"${ table.getFactTableName }") - // not handling the exception. only logging as this is not the table triggered - // by user. - } finally { - // delete the compaction required file in case of failure or success also. - if (!CarbonCompactionUtil - .deleteCompactionRequiredFile(metadataPath, compactionType)) { - // if the compaction request file is not been able to delete then - // add those tables details to the skip list so that it wont be considered next. - skipCompactionTables.+=:(table.getCarbonTableIdentifier) - LOGGER.error("Compaction request file can not be deleted for table " + - s"${ table.getDatabaseName }." + - s"${ table.getFactTableName }") - } - } - // ********* check again for all the tables. - table = CarbonCompactionUtil - .getNextTableToCompact(CarbonEnv.get.carbonMetastore.metadata - .tablesMeta.map(_.carbonTable).toArray, skipCompactionTables.asJava - ) - } - // giving the user his error for telling in the beeline if his triggered table - // compaction is failed. - if (!triggeredCompactionStatus) { - throw new Exception("Exception in compaction " + exception.getMessage) - } - } - } finally { - executor.shutdownNow() - DataManagementFunc.deletePartialLoadsInCompaction(carbonLoadModel) - compactionLock.unlock() - } - } - } - // calling the run method of a thread to make the call as blocking call. 
- // in the future we may make this as concurrent. - compactionThread.run() - } - - def loadCarbonData(sqlContext: SQLContext, - carbonLoadModel: CarbonLoadModel, - storePath: String, - columnar: Boolean, - partitionStatus: String = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS, - result: Option[DictionaryServer], - overwriteTable: Boolean, - dataFrame: Option[DataFrame] = None, - updateModel: Option[UpdateTableModel] = None): Unit = { - val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable - val isAgg = false - // for handling of the segment Merging. - def handleSegmentMerging(): Unit = { - LOGGER.info(s"compaction need status is" + - s" ${ CarbonDataMergerUtil.checkIfAutoLoadMergingRequired() }") - if (CarbonDataMergerUtil.checkIfAutoLoadMergingRequired()) { - LOGGER.audit(s"Compaction request received for table " + - s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") - val compactionSize = 0 - val isCompactionTriggerByDDl = false - val compactionModel = CompactionModel(compactionSize, - CompactionType.MINOR_COMPACTION, - carbonTable, - isCompactionTriggerByDDl - ) - var storeLocation = "" - val configuredStore = Util.getConfiguredLocalDirs(SparkEnv.get.conf) - if (null != configuredStore && configuredStore.nonEmpty) { - storeLocation = configuredStore(Random.nextInt(configuredStore.length)) - } - if (storeLocation == null) { - storeLocation = System.getProperty("java.io.tmpdir") - } - storeLocation = storeLocation + "/carbonstore/" + System.nanoTime() - - val isConcurrentCompactionAllowed = CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION, - CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION - ) - .equalsIgnoreCase("true") - - if (!isConcurrentCompactionAllowed) { - - handleCompactionForSystemLocking(sqlContext, - carbonLoadModel, - storePath, - storeLocation, - CompactionType.MINOR_COMPACTION, - carbonTable, - compactionModel - ) - } else { - val lock = CarbonLockFactory - .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier, - LockUsage.COMPACTION_LOCK - ) - - if (lock.lockWithRetries()) { - LOGGER.info("Acquired the compaction lock.") - try { - startCompactionThreads(sqlContext, - carbonLoadModel, - storePath, - storeLocation, - compactionModel, - lock - ) - } catch { - case e: Exception => - LOGGER.error(s"Exception in start compaction thread. ${ e.getMessage }") - lock.unlock() - throw e - } - } else { - LOGGER.audit("Not able to acquire the compaction lock for table " + - s"${ carbonLoadModel.getDatabaseName }.${ - carbonLoadModel - .getTableName - }") - LOGGER.error("Not able to acquire the compaction lock for table " + - s"${ carbonLoadModel.getDatabaseName }.${ - carbonLoadModel - .getTableName - }") - } - } - } - } - - def updateStatus(loadStatus: String, - stat: Array[(String, (LoadMetadataDetails, ExecutionErrors))]) = { - val metadataDetails = if (stat != null && stat(0) != null) { - stat(0)._2._1 - } else { - new LoadMetadataDetails - } - CarbonLoaderUtil - .populateNewLoadMetaEntry(metadataDetails, - loadStatus, - carbonLoadModel.getFactTimeStamp, - true) - val status = CarbonLoaderUtil.recordLoadMetadata(metadataDetails, - carbonLoadModel, false, overwriteTable) - if (!status) { - val errorMessage = "Dataload failed due to failure in table status updation." 
- LOGGER.audit("Data load is failed for " + - s"${ carbonLoadModel.getDatabaseName }.${ - carbonLoadModel - .getTableName - }") - LOGGER.error("Dataload failed due to failure in table status updation.") - throw new Exception(errorMessage) - } - } - - try { - LOGGER.audit(s"Data load request has been received for table" + - s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") - // Check if any load need to be deleted before loading new data - DataManagementFunc.deleteLoadsAndUpdateMetadata(carbonLoadModel.getDatabaseName, - carbonLoadModel.getTableName, storePath, false, carbonTable) - // get partition way from configuration - // val isTableSplitPartition = CarbonProperties.getInstance().getProperty( - // CarbonCommonConstants.TABLE_SPLIT_PARTITION, - // CarbonCommonConstants.TABLE_SPLIT_PARTITION_DEFAULT_VALUE).toBoolean - val isTableSplitPartition = false - var blocksGroupBy: Array[(String, Array[BlockDetails])] = null - var status: Array[(String, (LoadMetadataDetails, ExecutionErrors))] = null - var res: Array[List[(String, (LoadMetadataDetails, ExecutionErrors))]] = null - - def loadDataFile(): Unit = { - if (isTableSplitPartition) { - /* - * when data handle by table split partition - * 1) get partition files, direct load or not will get the different files path - * 2) get files blocks by using SplitUtils - * 3) output Array[(partitionID,Array[BlockDetails])] to blocksGroupBy - */ - var splits = Array[TableSplit]() - if (carbonLoadModel.isDirectLoad) { - // get all table Splits, this part means files were divide to different partitions - splits = CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath) - // get all partition blocks from file list - blocksGroupBy = splits.map { - split => - val pathBuilder = new StringBuilder() - for (path <- split.getPartition.getFilesPath.asScala) { - pathBuilder.append(path).append(",") - } - if (pathBuilder.nonEmpty) { - pathBuilder.substring(0, pathBuilder.size - 1) - } - (split.getPartition.getUniqueID, SparkUtil.getSplits(pathBuilder.toString(), - sqlContext.sparkContext - )) - } - } else { - // get all table Splits,when come to this, means data have been partition - splits = CarbonQueryUtil.getTableSplits(carbonLoadModel.getDatabaseName, - carbonLoadModel.getTableName, null) - // get all partition blocks from factFilePath/uniqueID/ - blocksGroupBy = splits.map { - split => - val pathBuilder = new StringBuilder() - pathBuilder.append(carbonLoadModel.getFactFilePath) - if (!carbonLoadModel.getFactFilePath.endsWith("/") - && !carbonLoadModel.getFactFilePath.endsWith("\\")) { - pathBuilder.append("/") - } - pathBuilder.append(split.getPartition.getUniqueID).append("/") - (split.getPartition.getUniqueID, - SparkUtil.getSplits(pathBuilder.toString, sqlContext.sparkContext)) - } - } - } else { - /* - * when data load handle by node partition - * 1)clone the hadoop configuration,and set the file path to the configuration - * 2)use org.apache.hadoop.mapreduce.lib.input.TextInputFormat to get splits,size info - * 3)use CarbonLoaderUtil.nodeBlockMapping to get mapping info of node and block, - * for locally writing carbondata files(one file one block) in nodes - * use NewCarbonDataLoadRDD to load data and write to carbondata files - */ - val hadoopConfiguration = new Configuration(sqlContext.sparkContext.hadoopConfiguration) - // FileUtils will skip file which is no csv, and return all file path which split by ',' - val filePaths = carbonLoadModel.getFactFilePath - 
hadoopConfiguration.set(FileInputFormat.INPUT_DIR, filePaths) - hadoopConfiguration.set(FileInputFormat.INPUT_DIR_RECURSIVE, "true") - hadoopConfiguration.set("io.compression.codecs", - """org.apache.hadoop.io.compress.GzipCodec, - org.apache.hadoop.io.compress.DefaultCodec, - org.apache.hadoop.io.compress.BZip2Codec""".stripMargin) - - CommonUtil.configSplitMaxSize(sqlContext.sparkContext, filePaths, hadoopConfiguration) - - val inputFormat = new org.apache.hadoop.mapreduce.lib.input.TextInputFormat - val jobContext = new Job(hadoopConfiguration) - val rawSplits = inputFormat.getSplits(jobContext).toArray - val blockList = rawSplits.map { inputSplit => - val fileSplit = inputSplit.asInstanceOf[FileSplit] - new TableBlockInfo(fileSplit.getPath.toString, - fileSplit.getStart, "1", - fileSplit.getLocations, fileSplit.getLength, ColumnarFormatVersion.V1, null - ).asInstanceOf[Distributable] - } - // group blocks to nodes, tasks - val startTime = System.currentTimeMillis - val activeNodes = DistributionUtil - .ensureExecutorsAndGetNodeList(blockList, sqlContext.sparkContext) - val nodeBlockMapping = - CarbonLoaderUtil - .nodeBlockMapping(blockList.toSeq.asJava, -1, activeNodes.toList.asJava).asScala - .toSeq - val timeElapsed: Long = System.currentTimeMillis - startTime - LOGGER.info("Total Time taken in block allocation: " + timeElapsed) - LOGGER.info(s"Total no of blocks: ${ blockList.length }, " + - s"No.of Nodes: ${nodeBlockMapping.size}") - var str = "" - nodeBlockMapping.foreach(entry => { - val tableBlock = entry._2 - str = str + "#Node: " + entry._1 + " no.of.blocks: " + tableBlock.size() - tableBlock.asScala.foreach(tableBlockInfo => - if (!tableBlockInfo.getLocations.exists(hostentry => - hostentry.equalsIgnoreCase(entry._1) - )) { - str = str + " , mismatch locations: " + tableBlockInfo.getLocations - .foldLeft("")((a, b) => a + "," + b) - } - ) - str = str + "\n" - } - ) - LOGGER.info(str) - blocksGroupBy = nodeBlockMapping.map(entry => { - val blockDetailsList = - entry._2.asScala.map(distributable => { - val tableBlock = distributable.asInstanceOf[TableBlockInfo] - new BlockDetails(new Path(tableBlock.getFilePath), - tableBlock.getBlockOffset, tableBlock.getBlockLength, tableBlock.getLocations - ) - }).toArray - (entry._1, blockDetailsList) - } - ).toArray - } - - status = new NewCarbonDataLoadRDD(sqlContext.sparkContext, - new DataLoadResultImpl(), - carbonLoadModel, - blocksGroupBy, - isTableSplitPartition).collect() - } - - def loadDataFrame(): Unit = { - try { - val rdd = dataFrame.get.rdd - val nodeNumOfData = rdd.partitions.flatMap[String, Array[String]] { p => - DataLoadPartitionCoalescer.getPreferredLocs(rdd, p).map(_.host) - }.distinct.size - val nodes = DistributionUtil.ensureExecutorsByNumberAndGetNodeList(nodeNumOfData, - sqlContext.sparkContext) - val newRdd = new DataLoadCoalescedRDD[Row](rdd, nodes.toArray.distinct) - var numPartitions = DistributionUtil.getNodeList(sqlContext.sparkContext).length - numPartitions = Math.max(1, Math.min(numPartitions, rdd.partitions.length)) - val coalesceRdd = rdd.coalesce(numPartitions, shuffle = false) - - status = new NewDataFrameLoaderRDD(sqlContext.sparkContext, - new DataLoadResultImpl(), - carbonLoadModel, - newRdd).collect() - } catch { - case ex: Exception => - LOGGER.error(ex, "load data frame failed") - throw ex - } - } - - def loadDataFrameForUpdate(): Unit = { - val segmentUpdateParallelism = CarbonProperties.getInstance().getParallelismForSegmentUpdate - - def triggerDataLoadForSegment(key: String, taskNo: Int, - 
iter: Iterator[Row]): Iterator[(String, (LoadMetadataDetails, ExecutionErrors))] = { - val rddResult = new updateResultImpl() - val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) - val resultIter = new Iterator[(String, (LoadMetadataDetails, ExecutionErrors))] { - var partitionID = "0" - val loadMetadataDetails = new LoadMetadataDetails - val executionErrors = new ExecutionErrors(FailureCauses.NONE, "") - var uniqueLoadStatusId = "" - try { - val segId = key - val index = taskNo - uniqueLoadStatusId = carbonLoadModel.getTableName + - CarbonCommonConstants.UNDERSCORE + - (index + "_0") - - // convert timestamp - val timeStampInLong = updateModel.get.updatedTimeStamp + "" - loadMetadataDetails.setPartitionCount(partitionID) - loadMetadataDetails.setLoadName(segId) - loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) - carbonLoadModel.setPartitionId(partitionID) - carbonLoadModel.setSegmentId(segId) - carbonLoadModel.setTaskNo(String.valueOf(index)) - carbonLoadModel.setFactTimeStamp(updateModel.get.updatedTimeStamp) - - // During Block Spill case Increment of File Count and proper adjustment of Block - // naming is only done when AbstractFactDataWriter.java : initializeWriter get - // CarbondataFileName as null. For handling Block Spill not setting the - // CarbondataFileName in case of Update. - // carbonLoadModel.setCarbondataFileName(newBlockName) - - // storeLocation = CarbonDataLoadRDD.initialize(carbonLoadModel, index) - loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS) - UpdateDataLoad.DataLoadForUpdate(segId, - index, - iter, - carbonLoadModel, - loadMetadataDetails) - } catch { - case e: NoRetryException => - loadMetadataDetails - .setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS) - executionErrors.failureCauses = FailureCauses.BAD_RECORDS - executionErrors.errorMsg = e.getMessage - LOGGER.info("Bad Record Found") - case e: Exception => - LOGGER.info("DataLoad failure") - LOGGER.error(e) - throw e - } - - var finished = false - - override def hasNext: Boolean = !finished - - override def next(): (String, (LoadMetadataDetails, ExecutionErrors)) = { - finished = true - rddResult - .getKey(uniqueLoadStatusId, - (loadMetadataDetails, executionErrors)) - } - } - resultIter - } - - val updateRdd = dataFrame.get.rdd - - // return directly if no rows to update - val noRowsToUpdate = updateRdd.isEmpty() - if (noRowsToUpdate) { - res = Array[List[(String, (LoadMetadataDetails, ExecutionErrors))]]() - return - } - - // splitting as (key, value) i.e., (segment, updatedRows) - val keyRDD = updateRdd.map(row => - (row.get(row.size - 1).toString, Row(row.toSeq.slice(0, row.size - 1): _*))) - - val loadMetadataDetails = SegmentStatusManager.readLoadMetadata( - carbonTable.getMetaDataFilepath) - val segmentIds = loadMetadataDetails.map(_.getLoadName) - val segmentIdIndex = segmentIds.zipWithIndex.toMap - val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonLoadModel.getStorePath, - carbonTable.getCarbonTableIdentifier) - val segmentId2maxTaskNo = segmentIds.map { segId => - (segId, CarbonUpdateUtil.getLatestTaskIdForSegment(segId, carbonTablePath)) - }.toMap - - class SegmentPartitioner(segIdIndex: Map[String, Int], parallelism: Int) - extends org.apache.spark.Partitioner { - override def numPartitions: Int = segmentIdIndex.size * parallelism - - override def getPartition(key: Any): Int = { - val segId = key.asInstanceOf[String] - // partitionId - segmentIdIndex(segId) * parallelism + 
Random.nextInt(parallelism) - } - } - - val partitionByRdd = keyRDD.partitionBy(new SegmentPartitioner(segmentIdIndex, - segmentUpdateParallelism)) - - // because partitionId=segmentIdIndex*parallelism+RandomPart and RandomPart<parallelism, - // so segmentIdIndex=partitionId/parallelism, this has been verified. - res = partitionByRdd.map(_._2).mapPartitions { partition => - val partitionId = TaskContext.getPartitionId() - val segIdIndex = partitionId / segmentUpdateParallelism - val randomPart = partitionId - segIdIndex * segmentUpdateParallelism - val segId = segmentIds(segIdIndex) - val newTaskNo = segmentId2maxTaskNo(segId) + randomPart + 1 - - List(triggerDataLoadForSegment(segId, newTaskNo, partition).toList).toIterator - }.collect() - } - - def loadDataForPartitionTable(): Unit = { - try { - val rdd = repartitionInputData(sqlContext, dataFrame, carbonLoadModel) - status = new PartitionTableDataLoaderRDD(sqlContext.sparkContext, - new DataLoadResultImpl(), - carbonLoadModel, - rdd).collect() - } catch { - case ex: Exception => - LOGGER.error(ex, "load data failed for partition table") - throw ex - } - } - - // create new segment folder in carbon store - if (!updateModel.isDefined) { - CarbonLoaderUtil.checkAndCreateCarbonDataLocation(storePath, - carbonLoadModel.getSegmentId, carbonTable) - } - var loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS - var errorMessage: String = "DataLoad failure" - var executorMessage: String = "" - val configuration = DataLoadProcessBuilder.createConfiguration(carbonLoadModel) - val sortScope = CarbonDataProcessorUtil.getSortScope(configuration) - try { - if (updateModel.isDefined) { - loadDataFrameForUpdate() - } else if (carbonTable.getPartitionInfo(carbonTable.getFactTableName) != null) { - loadDataForPartitionTable() - } else if (configuration.isSortTable && - sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT)) { - LOGGER.audit("Using global sort for loading.") - status = DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(sqlContext.sparkContext, - dataFrame, carbonLoadModel) - } else if (dataFrame.isDefined) { - loadDataFrame() - } else { - loadDataFile() - } - if (updateModel.isDefined) { - - res.foreach(resultOfSeg => resultOfSeg.foreach( - resultOfBlock => { - if (resultOfBlock._2._1.getLoadStatus - .equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)) { - loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE - if (resultOfBlock._2._2.failureCauses == FailureCauses.NONE) { - updateModel.get.executorErrors.failureCauses = FailureCauses.EXECUTOR_FAILURE - updateModel.get.executorErrors.errorMsg = "Failure in the Executor." 
- } - else { - updateModel.get.executorErrors = resultOfBlock._2._2 - } - } else if (resultOfBlock._2._1.getLoadStatus - .equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)) { - loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS - updateModel.get.executorErrors.failureCauses = resultOfBlock._2._2.failureCauses - updateModel.get.executorErrors.errorMsg = resultOfBlock._2._2.errorMsg - } - } - )) - - } - else { - val newStatusMap = scala.collection.mutable.Map.empty[String, String] - if (status.nonEmpty) { - status.foreach { eachLoadStatus => - val state = newStatusMap.get(eachLoadStatus._1) - state match { - case Some(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) => - newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2._1.getLoadStatus) - case Some(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS) - if eachLoadStatus._2._1.getLoadStatus == - CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS => - newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2._1.getLoadStatus) - case _ => - newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2._1.getLoadStatus) - } - } - - newStatusMap.foreach { - case (key, value) => - if (value == CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) { - loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE - } else if (value == CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS && - !loadStatus.equals(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)) { - loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS - } - } - } else { - loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE - } - - if (loadStatus != CarbonCommonConstants.STORE_LOADSTATUS_FAILURE && - partitionStatus == CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS) { - loadStatus = partitionStatus - } - } - } catch { - case ex: Throwable => - loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE - ex match { - case sparkException: SparkException => - if (sparkException.getCause.isInstanceOf[DataLoadingException] || - sparkException.getCause.isInstanceOf[CarbonDataLoadingException]) { - executorMessage = sparkException.getCause.getMessage - errorMessage = errorMessage + ": " + executorMessage - } - case _ => - executorMessage = ex.getCause.getMessage - errorMessage = errorMessage + ": " + executorMessage - } - LOGGER.info(errorMessage) - LOGGER.error(ex) - } - // handle the status file updation for the update cmd. - if (updateModel.isDefined) { - - if (loadStatus == CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) { - // updateModel.get.executorErrors.errorMsg = errorMessage - if (updateModel.get.executorErrors.failureCauses == FailureCauses.NONE) { - updateModel.get.executorErrors.failureCauses = FailureCauses.EXECUTOR_FAILURE - if (null != executorMessage && !executorMessage.isEmpty) { - updateModel.get.executorErrors.errorMsg = executorMessage - } else { - updateModel.get.executorErrors.errorMsg = "Update failed as the data load has failed." - } - } - return - } else if (loadStatus == CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS && - updateModel.get.executorErrors.failureCauses == FailureCauses.BAD_RECORDS && - carbonLoadModel.getBadRecordsAction.split(",")(1) == LoggerAction.FAIL.name) { - return - } else { - // in success case handle updation of the table status file. - // success case. 
- val segmentDetails = new util.HashSet[String]() - - var resultSize = 0 - - res.foreach(resultOfSeg => { - resultSize = resultSize + resultOfSeg.size - resultOfSeg.foreach( - resultOfBlock => { - segmentDetails.add(resultOfBlock._2._1.getLoadName) - } - )} - ) - - // this means that the update doesnt have any records to update so no need to do table - // status file updation. - if (resultSize == 0) { - LOGGER.audit("Data update is successful with 0 rows updation for " + - s"${carbonLoadModel.getDatabaseName}.${carbonLoadModel.getTableName}") - return - } - - if ( - CarbonUpdateUtil - .updateTableMetadataStatus(segmentDetails, - carbonTable, - updateModel.get.updatedTimeStamp + "", - true, - new util.ArrayList[String](0))) { - LOGGER.audit("Data update is successful for " + - s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") - } - else { - val errorMessage = "Data update failed due to failure in table status updation." - LOGGER.audit("Data update is failed for " + - s"${carbonLoadModel.getDatabaseName}.${carbonLoadModel.getTableName}") - LOGGER.error("Data update failed due to failure in table status updation.") - updateModel.get.executorErrors.errorMsg = errorMessage - updateModel.get.executorErrors.failureCauses = FailureCauses - .STATUS_FILE_UPDATION_FAILURE - return - } - - } - - return - } - if (loadStatus == CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) { - LOGGER.info("********starting clean up**********") - CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt) - LOGGER.info("********clean up done**********") - LOGGER.audit(s"Data load is failed for " + - s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") - LOGGER.warn("Cannot write load metadata file as data load failed") - updateStatus(loadStatus, status) - throw new Exception(errorMessage) - } else { - // check if data load fails due to bad record and throw data load failure due to - // bad record exception - if (loadStatus == CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS && - status(0)._2._2.failureCauses == FailureCauses.BAD_RECORDS && - carbonLoadModel.getBadRecordsAction.split(",")(1) == LoggerAction.FAIL.name) { - LOGGER.info("********starting clean up**********") - CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt) - LOGGER.info("********clean up done**********") - LOGGER.audit(s"Data load is failed for " + - s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") - updateStatus(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE, status) - throw new Exception(status(0)._2._2.errorMsg) - } - if (!isAgg) { - writeDictionary(carbonLoadModel, result) - updateStatus(loadStatus, status) - } else if (!carbonLoadModel.isRetentionRequest) { - // TODO : Handle it - LOGGER.info("********Database updated**********") - } - - if (CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS.equals(loadStatus)) { - LOGGER.audit("Data load is partially successful for " + - s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") - } else { - LOGGER.audit("Data load is successful for " + - s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") - } - try { - // compaction handling - handleSegmentMerging() - } catch { - case e: Exception => - throw new Exception( - "Dataload is success. Auto-Compaction has failed. Please check logs.") - } - } - } - - } - - /** - * repartition the input data for partiton table. 
- * @param sqlContext - * @param dataFrame - * @param carbonLoadModel - * @return - */ - private def repartitionInputData(sqlContext: SQLContext, - dataFrame: Option[DataFrame], - carbonLoadModel: CarbonLoadModel): RDD[Row] = { - val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable - val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName) - val partitionColumn = partitionInfo.getColumnSchemaList.get(0).getColumnName - val partitionColumnDataType = partitionInfo.getColumnSchemaList.get(0).getDataType - val columns = carbonLoadModel.getCsvHeaderColumns - var partitionColumnIndex = -1 - breakable { - for (i <- 0 until columns.length) { - if (partitionColumn.equalsIgnoreCase(columns(i))) { - partitionColumnIndex = i - break - } - } - } - if (partitionColumnIndex == -1) { - throw new DataLoadingException("Partition column not found.") - } - - val dateFormatMap = CarbonDataProcessorUtil.getDateFormatMap(carbonLoadModel.getDateFormat()) - val specificFormat = Option(dateFormatMap.get(partitionColumn.toLowerCase)) - val timeStampFormat = if (specificFormat.isDefined) { - new SimpleDateFormat(specificFormat.get) - } else { - val timestampFormatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants - .CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT) - new SimpleDateFormat(timestampFormatString) - } - - val dateFormat = if (specificFormat.isDefined) { - new SimpleDateFormat(specificFormat.get) - } else { - val dateFormatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants - .CARBON_DATE_FORMAT, CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT) - new SimpleDateFormat(dateFormatString) - } - - // generate RDD[(K, V)] to use the partitionBy method of PairRDDFunctions - val inputRDD: RDD[(String, Row)] = if (dataFrame.isDefined) { - // input data from DataFrame - val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1 - val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2 - val serializationNullFormat = - carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1) - dataFrame.get.rdd.map { row => - if (null != row && row.length > partitionColumnIndex && - null != row.get(partitionColumnIndex)) { - (CarbonScalaUtil.getString(row.get(partitionColumnIndex), serializationNullFormat, - delimiterLevel1, delimiterLevel2, timeStampFormat, dateFormat), row) - } else { - (null, row) - } - } - } else { - // input data from csv files - val hadoopConfiguration = new Configuration() - CommonUtil.configureCSVInputFormat(hadoopConfiguration, carbonLoadModel) - hadoopConfiguration.set(FileInputFormat.INPUT_DIR, carbonLoadModel.getFactFilePath) - val columnCount = columns.length - new NewHadoopRDD[NullWritable, StringArrayWritable]( - sqlContext.sparkContext, - classOf[CSVInputFormat], - classOf[NullWritable], - classOf[StringArrayWritable], - hadoopConfiguration - ).map { currentRow => - if (null == currentRow || null == currentRow._2) { - val row = new StringArrayRow(new Array[String](columnCount)) - (null, row) - } else { - val row = new StringArrayRow(new Array[String](columnCount)) - val values = currentRow._2.get() - if (values != null && values.length > partitionColumnIndex) { - (currentRow._2.get()(partitionColumnIndex), row.setValues(currentRow._2.get())) - } else { - (null, row.setValues(currentRow._2.get())) - } - } - } - } - - val partitioner = PartitionFactory.getPartitioner(partitionInfo) - if (partitionColumnDataType == DataTypes.STRING) { - if 
(partitionInfo.getPartitionType == PartitionType.RANGE) { - inputRDD.map { row => (ByteUtil.toBytes(row._1), row._2) } - .partitionBy(partitioner) - .map(_._2) - } else { - inputRDD.partitionBy(partitioner) - .map(_._2) - } - } else { - inputRDD.map { row => - (PartitionUtil.getDataBasedOnDataType(row._1, partitionColumnDataType, timeStampFormat, - dateFormat), row._2) - } - .partitionBy(partitioner) - .map(_._2) - } - } - - private def writeDictionary(carbonLoadModel: CarbonLoadModel, - result: Option[DictionaryServer]) = { - // write dictionary file and shutdown dictionary server - val uniqueTableName: String = s"${ carbonLoadModel.getDatabaseName }_${ - carbonLoadModel.getTableName }" - result match { - case Some(server) => - try { - server.writeTableDictionary(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable - .getCarbonTableIdentifier.getTableId) - } catch { - case ex: Exception => - LOGGER.error(s"Error while writing dictionary file for $uniqueTableName") - throw new Exception("Dataload failed due to error while writing dictionary file!") - } - case _ => - } - } - -}
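The update path of the file deleted above leans on an arithmetic invariant its comment spells out: partitionId = segmentIdIndex * parallelism + randomPart with randomPart < parallelism, so segmentIdIndex = partitionId / parallelism. A self-contained sketch of that encoding and decoding (all names and values here are illustrative, not from the deleted file):

object SegmentPartitionMath {
  import scala.util.Random

  // Mirrors the deleted SegmentPartitioner: each segment owns a contiguous
  // band of `parallelism` partition ids.
  val parallelism = 4
  val segmentIds = Array("0", "1", "2") // illustrative segment names

  def encode(segmentIdIndex: Int): Int =
    segmentIdIndex * parallelism + Random.nextInt(parallelism)

  // Integer division recovers the segment index because randomPart is
  // strictly less than parallelism and can never carry into the next band.
  def decode(partitionId: Int): (String, Int) = {
    val segIdIndex = partitionId / parallelism
    val randomPart = partitionId - segIdIndex * parallelism
    (segmentIds(segIdIndex), randomPart)
  }

  def main(args: Array[String]): Unit = {
    val pid = encode(2) // lands in 8..11, the band owned by segment index 2
    assert(decode(pid)._1 == segmentIds(2))
  }
}

Because the bands never overlap, each mapPartitions task can derive both its target segment and a collision-free task number without any extra bookkeeping.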
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
deleted file mode 100644
index f8275d1..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.thriftserver
-
-import java.io.File
-
-import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.sql.CarbonContext
-import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.util.CarbonProperties
-
-object CarbonThriftServer {
-
-  def main(args: Array[String]): Unit = {
-    val conf = new SparkConf()
-      .setAppName("Carbon Thrift Server")
-    if (!conf.contains("carbon.properties.filepath")) {
-      val sparkHome = System.getenv.get("SPARK_HOME")
-      if (sparkHome != null) {
-        val file = new File(sparkHome + '/' + "conf" + '/' + "carbon.properties")
-        if (file.exists()) {
-          conf.set("carbon.properties.filepath", file.getCanonicalPath)
-          System.setProperty("carbon.properties.filepath", file.getCanonicalPath)
-        }
-      }
-    } else {
-      System.setProperty("carbon.properties.filepath", conf.get("carbon.properties.filepath"))
-    }
-    if (org.apache.spark.SPARK_VERSION.startsWith("1.6")) {
-      conf.set("spark.sql.hive.thriftServer.singleSession", "true")
-    }
-    val sc = new SparkContext(conf)
-    val warmUpTime = CarbonProperties.getInstance().getProperty("carbon.spark.warmUpTime", "5000")
-    try {
-      Thread.sleep(Integer.parseInt(warmUpTime))
-    } catch {
-      case e: Exception =>
-        val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-        LOG.error(s"Wrong value for carbon.spark.warmUpTime $warmUpTime " +
-          "Using default Value and proceeding")
-        Thread.sleep(30000)
-    }
-    val storePath = if (args.length > 0) args.head else null
-    val cc = new CarbonContext(sc, storePath)
-
-    HiveThriftServer2.startWithContext(cc)
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
deleted file mode 100644
index 118249a..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.util
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.sql.hive.{CarbonMetaData, DictionaryMap}
-
-import org.apache.carbondata.core.metadata.encoder.Encoding
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.CarbonUtil
-
-case class TransformHolder(rdd: Any, mataData: CarbonMetaData)
-
-object CarbonSparkUtil {
-
-  def createSparkMeta(carbonTable: CarbonTable): CarbonMetaData = {
-    val dimensionsAttr = carbonTable.getDimensionByTableName(carbonTable.getFactTableName)
-      .asScala.map(x => x.getColName) // wf : may be problem
-    val measureAttr = carbonTable.getMeasureByTableName(carbonTable.getFactTableName)
-      .asScala.map(x => x.getColName)
-    val dictionary =
-      carbonTable.getDimensionByTableName(carbonTable.getFactTableName).asScala.map { f =>
-        (f.getColName.toLowerCase,
-          f.hasEncoding(Encoding.DICTIONARY) && !f.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
-            !f.getDataType.isComplexType)
-      }
-    CarbonMetaData(dimensionsAttr, measureAttr, carbonTable, DictionaryMap(dictionary.toMap))
-  }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
deleted file mode 100644
index 4950227..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.util
-
-import scala.reflect.ClassTag
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapreduce.Job
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
-
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
-
-
-/**
- * All the utility functions for carbon plan creation
- */
-object QueryPlanUtil {
-
-  /**
-   * createCarbonInputFormat from query model
-   */
-  def createCarbonInputFormat(absoluteTableIdentifier: AbsoluteTableIdentifier) :
-      (CarbonTableInputFormat[Array[Object]], Job) = {
-    val carbonInputFormat = new CarbonTableInputFormat[Array[Object]]()
-    val jobConf: JobConf = new JobConf(new Configuration)
-    val job: Job = new Job(jobConf)
-    FileInputFormat.addInputPath(job, new Path(absoluteTableIdentifier.getTablePath))
-    (carbonInputFormat, job)
-  }
-
-  def createCarbonInputFormat[V: ClassTag](absoluteTableIdentifier: AbsoluteTableIdentifier,
-      conf: Configuration) : CarbonTableInputFormat[V] = {
-    val carbonInputFormat = new CarbonTableInputFormat[V]()
-    val job: Job = new Job(conf)
-    FileInputFormat.addInputPath(job, new Path(absoluteTableIdentifier.getTablePath))
-    carbonInputFormat
-  }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/CarbonInputMetrics.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/CarbonInputMetrics.scala b/integration/spark/src/main/scala/org/apache/spark/CarbonInputMetrics.scala
deleted file mode 100644
index ea75ccb..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/CarbonInputMetrics.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark
-
-import java.lang.Long
-
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.executor.{DataReadMethod, InputMetrics}
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.hadoop.{CarbonMultiBlockSplit, InputMetricsStats}
-import org.apache.carbondata.spark.InitInputMetrics
-
-/**
- * It gives statistics of number of bytes and record read
- */
-class CarbonInputMetrics extends InitInputMetrics {
-  @transient val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-  var inputMetrics: InputMetrics = _
-  var bytesReadCallback: Option[() => scala.Long] = _
-  var carbonMultiBlockSplit: CarbonMultiBlockSplit = _
-
-  def initBytesReadCallback(context: TaskContext,
-      carbonMultiBlockSplit: CarbonMultiBlockSplit) {
-    inputMetrics = context.taskMetrics().getInputMetricsForReadMethod(DataReadMethod.Hadoop)
-    this.carbonMultiBlockSplit = carbonMultiBlockSplit;
-    bytesReadCallback = carbonMultiBlockSplit match {
-      case _: CarbonMultiBlockSplit =>
-        SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
-      case _ => None
-    }
-  }
-
-  def incrementRecordRead(recordRead: Long) {
-    inputMetrics.incRecordsRead(recordRead)
-  }
-
-  def updateAndClose() {
-    if (bytesReadCallback.isDefined) {
-      inputMetrics.updateBytesRead()
-    } else if (carbonMultiBlockSplit.isInstanceOf[CarbonMultiBlockSplit]) {
-      // If we can't get the bytes read from the FS stats, fall back to the split size,
-      // which may be inaccurate.
-      try {
-        inputMetrics.incBytesRead(carbonMultiBlockSplit.getLength)
-      } catch {
-        case e: java.io.IOException =>
-          LOGGER.warn("Unable to get input size to set InputMetrics for task:" + e.getMessage)
-      }
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/CarbonBoundReference.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonBoundReference.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonBoundReference.scala
deleted file mode 100644
index a6c28a9..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonBoundReference.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, ExprId, LeafExpression, NamedExpression}
-import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
-import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.types.DataType
-
-import org.apache.carbondata.core.scan.expression.ColumnExpression
-
-case class CastExpr(expr: Expression) extends Filter
-
-case class CarbonBoundReference(colExp: ColumnExpression, dataType: DataType, nullable: Boolean)
-  extends LeafExpression with NamedExpression with CodegenFallback {
-
-  type EvaluatedType = Any
-
-  override def toString: String = s"input[" + colExp.getColIndex + "]"
-
-  override def eval(input: InternalRow): Any = input.get(colExp.getColIndex, dataType)
-
-  override def name: String = colExp.getColumnName
-
-  override def toAttribute: Attribute = throw new UnsupportedOperationException
-
-  override def exprId: ExprId = throw new UnsupportedOperationException
-
-  override def qualifiers: Seq[String] = throw new UnsupportedOperationException
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
deleted file mode 100644
index 024c54b..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.logical.{ UnaryNode, _ }
-import org.apache.spark.sql.hive.HiveContext
-import org.apache.spark.sql.optimizer.CarbonDecoderRelation
-import org.apache.spark.sql.types._
-
-import org.apache.carbondata.spark.CarbonAliasDecoderRelation
-import org.apache.carbondata.spark.util.CommonUtil
-
-/**
- * Top command
- */
-case class Top(count: Int, topOrBottom: Int, dim: NamedExpression, msr: NamedExpression,
-    child: LogicalPlan) extends UnaryNode {
-  def output: Seq[Attribute] = child.output
-
-  override def references: AttributeSet = {
-    val list = List(dim, msr)
-    AttributeSet(list.flatMap(_.references))
-  }
-}
-
-object getDB {
-
-  def getDatabaseName(dbName: Option[String], sqlContext: SQLContext): String = {
-    dbName.getOrElse(sqlContext.asInstanceOf[HiveContext].catalog.client.currentDatabase)
-  }
-
-}
-
-/**
- * Shows Loads in a table
- */
-case class ShowLoadsCommand(databaseNameOp: Option[String], table: String, limit: Option[String])
-  extends LogicalPlan with Command {
-
-  override def children: Seq[LogicalPlan] = Seq.empty
-
-  override def output: Seq[Attribute] = {
-    Seq(AttributeReference("SegmentSequenceId", StringType, nullable = false)(),
-      AttributeReference("Status", StringType, nullable = false)(),
-      AttributeReference("Load Start Time", TimestampType, nullable = false)(),
-      AttributeReference("Load End Time", TimestampType, nullable = false)())
-  }
-}
-
-/**
- * Describe formatted for hive table
- */
-case class DescribeFormattedCommand(sql: String, tblIdentifier: TableIdentifier)
-  extends LogicalPlan with Command {
-  override def children: Seq[LogicalPlan] = Seq.empty
-
-  override def output: Seq[AttributeReference] =
-    Seq(AttributeReference("result", StringType, nullable = false)())
-}
-
-case class CarbonDictionaryCatalystDecoder(
-    relations: Seq[CarbonDecoderRelation],
-    profile: CarbonProfile,
-    aliasMap: CarbonAliasDecoderRelation,
-    isOuter: Boolean,
-    child: LogicalPlan) extends UnaryNode {
-  override def output: Seq[Attribute] = child.output
-}
-
-abstract class CarbonProfile(attributes: Seq[Attribute]) extends Serializable {
-  def isEmpty: Boolean = attributes.isEmpty
-}
-
-case class IncludeProfile(attributes: Seq[Attribute]) extends CarbonProfile(attributes)
-
-case class ExcludeProfile(attributes: Seq[Attribute]) extends CarbonProfile(attributes)
-
-case class CreateDatabase(dbName: String, sql: String) extends LogicalPlan with Command {
-  override def children: Seq[LogicalPlan] = Seq.empty
-  override def output: Seq[AttributeReference] = {
-    Seq()
-  }
-}
-
-case class DropDatabase(dbName: String, isCascade: Boolean, sql: String)
-  extends LogicalPlan with Command {
-  override def children: Seq[LogicalPlan] = Seq.empty
-  override def output: Seq[AttributeReference] = {
-    Seq()
-  }
-}
-
-case class UseDatabase(sql: String) extends LogicalPlan with Command {
-  override def children: Seq[LogicalPlan] = Seq.empty
-  override def output: Seq[AttributeReference] = {
-    Seq()
-  }
-}
-
-case class ProjectForUpdate(
-    table: UnresolvedRelation,
-    columns: List[String],
-    children: Seq[LogicalPlan] ) extends LogicalPlan with Command {
-  override def output: Seq[AttributeReference] = Seq.empty
-}
-
-case class UpdateTable(
-    table: UnresolvedRelation,
-    columns: List[String],
-    selectStmt: String,
-    filer: String) extends
LogicalPlan { - override def children: Seq[LogicalPlan] = Seq.empty - override def output: Seq[AttributeReference] = Seq.empty -} - -case class DeleteRecords( - statement: String, - table: UnresolvedRelation) extends LogicalPlan { - override def children: Seq[LogicalPlan] = Seq.empty - override def output: Seq[AttributeReference] = Seq.empty -} - -case class ShowPartitions( - table: TableIdentifier) extends LogicalPlan { - override def children: Seq[LogicalPlan] = Seq.empty - override def output: Seq[Attribute] = CommonUtil.partitionInfoOutput -} - -/** - * A logical plan representing insertion into a Hive table. - * This plan ignores the nullability of ArrayType, MapType and StructType, unlike InsertIntoTable, - * because Hive tables have no notion of nullability for ARRAY, MAP and STRUCT types. - */ -case class InsertIntoCarbonTable( - table: CarbonDatasourceRelation, - partition: Map[String, Option[String]], - child: LogicalPlan, - overwrite: Boolean, - ifNotExists: Boolean) - extends LogicalPlan with Command { - - override def children: Seq[LogicalPlan] = child :: Nil - override def output: Seq[Attribute] = Seq.empty - - // This is the expected schema of the table prepared to be inserted into, - // including dynamic partition columns. - val tableOutput = table.carbonRelation.output -}
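For reference, these command nodes carry their own result schema rather than being resolved against a catalog. A small illustration using ShowLoadsCommand as defined above (not part of this patch; the database and table names are made up, and the SQL mapping is described from memory of Carbon's 1.x parser):

// a statement such as SHOW SEGMENTS FOR TABLE default.sales LIMIT 10
// was parsed into roughly this node:
val showLoads = ShowLoadsCommand(Some("default"), "sales", limit = Some("10"))
// the command itself declares the columns of its result
showLoads.output.foreach(attr => println(s"${attr.name}: ${attr.dataType}"))
// SegmentSequenceId: StringType, Status: StringType,
// Load Start Time: TimestampType, Load End Time: TimestampType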
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala deleted file mode 100644 index da4b210..0000000 --- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala +++ /dev/null @@ -1,197 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql - -import java.io.File - -import scala.language.implicitConversions - -import org.apache.spark.SparkContext -import org.apache.spark.sql.catalyst.ParserDialect -import org.apache.spark.sql.catalyst.analysis.{Analyzer, OverrideCatalog} -import org.apache.spark.sql.catalyst.optimizer.Optimizer -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.execution.ExtractPythonUDFs -import org.apache.spark.sql.execution.datasources.{PreInsertCastAndRename, PreWriteCheck} -import org.apache.spark.sql.hive._ -import org.apache.spark.sql.optimizer.CarbonOptimizer - -import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.stats.{QueryStatistic, QueryStatisticsConstants} -import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory} - -class CarbonContext( - val sc: SparkContext, - val storePath: String, - metaStorePath: String) extends HiveContext(sc) { - self => - - def this(sc: SparkContext) = { - this(sc, - null, - new File(CarbonCommonConstants.METASTORE_LOCATION_DEFAULT_VAL).getCanonicalPath) - } - - def this(sc: SparkContext, storePath: String) = { - this(sc, - storePath, - new File(CarbonCommonConstants.METASTORE_LOCATION_DEFAULT_VAL).getCanonicalPath) - } - - CarbonContext.addInstance(sc, this) - CodeGenerateFactory.init(sc.version) - udf.register("getTupleId", () => "") - CarbonEnv.init(this) - - var lastSchemaUpdatedTime = System.currentTimeMillis() - val hiveClientInterface = metadataHive - - protected[sql] override lazy val conf: SQLConf = new CarbonSQLConf - - @transient - override lazy val catalog = { - val carbonProperties = CarbonProperties.getInstance() - if (storePath != null) { - carbonProperties.addProperty(CarbonCommonConstants.STORE_LOCATION, storePath) - // it may also be set in carbon.properties, for backward compatibility - } else if (carbonProperties.getProperty(CarbonCommonConstants.STORE_LOCATION) == null) { - carbonProperties.addProperty(CarbonCommonConstants.STORE_LOCATION, - conf.getConfString("spark.sql.warehouse.dir")) - } - new CarbonMetastore(this, storePath, metadataHive, queryId) with OverrideCatalog - } - - @transient - override protected[sql] lazy val analyzer = - new Analyzer(catalog, functionRegistry, conf) { - - override val extendedResolutionRules = - catalog.ParquetConversions :: - catalog.CreateTables :: - CarbonIUDAnalysisRule :: - CarbonPreInsertionCasts :: - ExtractPythonUDFs :: - ResolveHiveWindowFunction :: - PreInsertCastAndRename :: - Nil - - override val extendedCheckRules = Seq( - PreWriteCheck(catalog) - ) - } - - @transient - override protected[sql] lazy val optimizer: Optimizer = - CarbonOptimizer.optimizer( - CodeGenerateFactory.createDefaultOptimizer(conf, sc), - conf.asInstanceOf[CarbonSQLConf], - sc.version) - - protected[sql] override def getSQLDialect(): ParserDialect = new CarbonSQLDialect(this) - - experimental.extraStrategies = { - val carbonStrategy = new CarbonStrategies(self) - Seq(carbonStrategy.CarbonTableScan, carbonStrategy.DDLStrategies) - } - - override protected def configure(): Map[String, String] = { - sc.hadoopConfiguration.addResource("hive-site.xml") - if (sc.hadoopConfiguration.get(CarbonCommonConstants.HIVE_CONNECTION_URL) == null) { - val metaStorePathAbsolute = new File(metaStorePath).getCanonicalPath - val hiveMetaStoreDB = metaStorePathAbsolute + "/metastore_db" - logDebug(s"metastore db is going to be created in location: $hiveMetaStoreDB")
location: $hiveMetaStoreDB") - super.configure() ++ Map[String, String]((CarbonCommonConstants.HIVE_CONNECTION_URL, - s"jdbc:derby:;databaseName=$hiveMetaStoreDB;create=true"), - ("hive.metastore.warehouse.dir", metaStorePathAbsolute + "/hivemetadata")) - } else { - super.configure() - } - } - - @transient - private val LOGGER = LogServiceFactory.getLogService(CarbonContext.getClass.getName) - - var queryId: String = "" - - override def sql(sql: String): DataFrame = { - // queryId will be unique for each query, creting query detail holder - queryId = System.nanoTime() + "" - this.setConf("queryId", queryId) - - CarbonContext.updateCarbonPorpertiesPath(this) - val sqlString = sql.toUpperCase - LOGGER.info(s"Query [$sqlString]") - val recorder = CarbonTimeStatisticsFactory.createDriverRecorder() - val statistic = new QueryStatistic() - val logicPlan: LogicalPlan = parseSql(sql) - statistic.addStatistics(QueryStatisticsConstants.SQL_PARSE, System.currentTimeMillis()) - recorder.recordStatisticsForDriver(statistic, queryId) - val result = new DataFrame(this, logicPlan) - - // We force query optimization to happen right away instead of letting it happen lazily like - // when using the query DSL. This is so DDL commands behave as expected. This is only - // generates the RDD lineage for DML queries, but do not perform any execution. - result - } - -} - -object CarbonContext { - - val datasourceName: String = "org.apache.carbondata.format" - - val datasourceShortName: String = "carbondata" - - @transient - private val LOGGER = LogServiceFactory.getLogService(CarbonContext.getClass.getName) - - final def updateCarbonPorpertiesPath(hiveContext: HiveContext) { - val carbonPropertiesFilePath = hiveContext.getConf("carbon.properties.filepath", null) - val systemcarbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null) - if (null != carbonPropertiesFilePath && null == systemcarbonPropertiesFilePath) { - System.setProperty("carbon.properties.filepath", - carbonPropertiesFilePath + "/" + "carbon.properties") - } - // configuring the zookeeper URl . 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala deleted file mode 100644 index 2c2e954..0000000 --- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala +++ /dev/null @@ -1,189 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql - -import java.text.SimpleDateFormat -import java.util.Date - -import scala.reflect.ClassTag - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileStatus, Path} -import org.apache.hadoop.mapred.JobConf -import org.apache.hadoop.mapreduce.{Job, JobID} -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat -import org.apache.spark._ -import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.sources.{Filter, HadoopFsRelation, OutputWriterFactory} -import org.apache.spark.sql.types.StructType -import org.apache.spark.util.SerializableConfiguration - -import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier -import org.apache.carbondata.core.scan.expression.logical.AndExpression -import org.apache.carbondata.core.util.DataTypeUtil -import org.apache.carbondata.core.util.path.CarbonTablePath -import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonProjection} -import org.apache.carbondata.hadoop.api.CarbonTableInputFormat -import org.apache.carbondata.hadoop.util.{CarbonInputFormatUtil, SchemaReader} -import org.apache.carbondata.processing.merger.TableMeta -import org.apache.carbondata.spark.{CarbonFilters, CarbonOption} -import org.apache.carbondata.spark.rdd.CarbonRDD -import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl -import org.apache.carbondata.spark.util.SparkDataTypeConverterImpl - -private[sql] case class CarbonDatasourceHadoopRelation( - sqlContext: SQLContext, - paths: Array[String], - parameters: Map[String, String], - tableSchema: Option[StructType]) - extends HadoopFsRelation { - - lazy val schemaPath = new Path(CarbonTablePath.getSchemaFilePath(paths.head)) - if (!schemaPath.getFileSystem(new Configuration).exists(schemaPath)) { - throw new IllegalArgumentException("invalid CarbonData file path: " + paths.head) - } - - lazy val job = new Job(new JobConf()) - lazy val options = new CarbonOption(parameters) - lazy val absIdentifier = AbsoluteTableIdentifier.fromTablePath(paths.head) - lazy val relationRaw: CarbonRelation = { - val carbonTable = SchemaReader.readCarbonTableFromStore(absIdentifier) - if (carbonTable == null) { - sys.error(s"CarbonData file path ${paths.head} is not valid") - } - CarbonRelation( - carbonTable.getDatabaseName, - carbonTable.getFactTableName, - CarbonSparkUtil.createSparkMeta(carbonTable), - new TableMeta(carbonTable.getCarbonTableIdentifier, - paths.head, absIdentifier.getTablePath, carbonTable), - None - )(sqlContext) - } - - override def dataSchema: StructType = tableSchema.getOrElse(relationRaw.schema) - - override def prepareJobForWrite(job: Job): OutputWriterFactory = { - // TODO - throw new UnsupportedOperationException - } - - override def buildScan( - requiredColumns: Array[String], - filters: Array[Filter], - inputFiles: Array[FileStatus]): RDD[Row] = { - val conf = new Configuration(job.getConfiguration) - filters.flatMap { filter => - CarbonFilters.createCarbonFilter(dataSchema, filter) - }.reduceOption(new AndExpression(_, _)) - .foreach(CarbonTableInputFormat.setFilterPredicates(conf, _)) - - val projection = new CarbonProjection - requiredColumns.foreach(projection.addColumn) - CarbonTableInputFormat.setColumnProjection(conf, projection) - CarbonTableInputFormat.setCarbonReadSupport(conf, classOf[SparkRowReadSupportImpl]) - - new CarbonHadoopFSRDD[Row](sqlContext.sparkContext, - new 
SerializableConfiguration(conf), - absIdentifier, - classOf[CarbonTableInputFormat[Row]], - classOf[Row] - ) - } - -} - -class CarbonHadoopFSPartition(rddId: Int, val idx: Int, - val carbonSplit: SerializableWritable[CarbonInputSplit]) - extends Partition { - - override val index: Int = idx - - override def hashCode(): Int = 41 * (41 + rddId) + idx -} - -class CarbonHadoopFSRDD[V: ClassTag]( - @transient sc: SparkContext, - conf: SerializableConfiguration, - identifier: AbsoluteTableIdentifier, - inputFormatClass: Class[_ <: CarbonTableInputFormat[V]], - valueClass: Class[V]) - extends CarbonRDD[V](sc, Nil) with SparkHadoopMapReduceUtil { - - private val jobTrackerId: String = { - val formatter = new SimpleDateFormat("yyyyMMddHHmm") - formatter.format(new Date()) - } - @transient protected val jobId = new JobID(jobTrackerId, id) - - override def internalCompute(split: Partition, - context: TaskContext): Iterator[V] = { - val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0) - val hadoopAttemptContext = newTaskAttemptContext(conf.value, attemptId) - val job: Job = new Job(hadoopAttemptContext.getConfiguration) - val format = CarbonInputFormatUtil.createCarbonInputFormat(identifier, job) - CarbonInputFormat.setDataTypeConverter(hadoopAttemptContext.getConfiguration, - new SparkDataTypeConverterImpl) - hadoopAttemptContext.getConfiguration.set(FileInputFormat.INPUT_DIR, identifier.getTablePath) - val reader = - format.createRecordReader(split.asInstanceOf[CarbonHadoopFSPartition].carbonSplit.value, - hadoopAttemptContext - ) - reader.initialize(split.asInstanceOf[CarbonHadoopFSPartition].carbonSplit.value, - hadoopAttemptContext - ) - new Iterator[V] { - private[this] var havePair = false - private[this] var finished = false - - override def hasNext: Boolean = { - if (context.isInterrupted) { - throw new TaskKilledException - } - if (!finished && !havePair) { - finished = !reader.nextKeyValue - if (finished) { - reader.close() - } - havePair = !finished - } - !finished - } - - override def next(): V = { - if (!hasNext) { - throw new java.util.NoSuchElementException("End of stream") - } - havePair = false - reader.getCurrentValue - } - } - } - - override protected def getPartitions: Array[Partition] = { - val jobContext = newJobContext(conf.value, jobId) - val format = CarbonInputFormatUtil.createCarbonInputFormat(identifier, new Job(conf.value)) - jobContext.getConfiguration.set(FileInputFormat.INPUT_DIR, identifier.getTablePath) - val splits = format.getSplits(jobContext).toArray - val carbonInputSplits = splits - .map(f => new SerializableWritable(f.asInstanceOf[CarbonInputSplit])) - carbonInputSplits.zipWithIndex.map(f => new CarbonHadoopFSPartition(id, f._2, f._1)) - } -}
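For reference, this relation backed the DataSource API read path: buildScan above turns the pushed-down filters into a Carbon expression tree and the required columns into a CarbonProjection before handing both to the input format. Reading a table through it looked roughly like the sketch below (not part of this patch; the table path is illustrative, and sqlContext is any SQLContext with the CarbonData jars on the classpath):

import org.apache.spark.sql.SQLContext

def readCarbon(sqlContext: SQLContext): Unit = {
  val df = sqlContext.read
    .format("org.apache.carbondata.format") // CarbonContext.datasourceName
    .load("hdfs://namenode:9000/carbon/store/default/t1")
  // the filter and projection below are pushed down through buildScan
  df.filter(df("id") > 100).select("name").show()
}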