[CARBONDATA-1796] While submitting a new job, pass credentials in the jobConf object
This closes #1855

Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/b421c246
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/b421c246
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/b421c246

Branch: refs/heads/fgdatamap
Commit: b421c24689b18429f13fd150ab4dd422c61ca622
Parents: 2081fba
Author: akashrn5 <akashnilu...@gmail.com>
Authored: Wed Jan 24 14:26:28 2018 +0530
Committer: QiangCai <qiang...@qq.com>
Committed: Tue Jan 30 18:52:25 2018 +0800

----------------------------------------------------------------------
 .../spark/rdd/CarbonIUDMergerRDD.scala          |  3 +-
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  |  2 ++
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |  7 +++-
 .../scala/org/apache/spark/util/SparkUtil.scala | 35 ++------------------
 .../spark/rdd/CarbonDataRDDFactory.scala        |  5 +--
 5 files changed, 15 insertions(+), 37 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b421c246/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
index e8180cd..4378c15 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
@@ -25,9 +25,9 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.Job
 import org.apache.spark.{Partition, SparkContext}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql.execution.command.CarbonMergerMapping

-import org.apache.carbondata.core.datastore.block.{Distributable, TableBlockInfo}
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
 import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit}
 import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
@@ -57,6 +57,7 @@ class CarbonIUDMergerRDD[K, V](
       tablePath, new CarbonTableIdentifier(databaseName, factTableName, tableId)
     )
     val jobConf: JobConf = new JobConf(new Configuration)
+    SparkHadoopUtil.get.addCredentials(jobConf)
     val job: Job = new Job(jobConf)
     val format = CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job)
     val defaultParallelism = sparkContext.defaultParallelism
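Note on the intent of the change: a Hadoop Job built from a bare Configuration carries no delegation tokens, so on a Kerberos-secured cluster the subsequent split computation can fail to authenticate against the NameNode. Calling SparkHadoopUtil.get.addCredentials(jobConf) before constructing the Job lets the current user's tokens travel with the submitted job. The snippet below is only an illustration of what adding credentials amounts to, not the Spark source itself; in the Spark 2.x line the effective (YARN-side) behaviour is, to the best of my understanding, a merge of the current user's credentials into the JobConf:

    import org.apache.hadoop.mapred.JobConf
    import org.apache.hadoop.security.UserGroupInformation

    object CredentialsIllustration {
      // Illustrative only: merge the driver's existing Hadoop tokens/secrets
      // into the job's credential store, so that InputFormat.getSplits() can
      // authenticate against a secured HDFS without a fresh Kerberos login.
      def mergeCurrentUserCredentials(jobConf: JobConf): Unit = {
        jobConf.getCredentials.mergeAll(UserGroupInformation.getCurrentUser.getCredentials)
      }
    }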
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b421c246/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 8d7b044..c482a92 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -30,6 +30,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.Job
 import org.apache.spark._
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql.execution.command.{CarbonMergerMapping, NodeInfo}
 import org.apache.spark.sql.hive.DistributionUtil
 import org.apache.spark.sql.util.CarbonException
@@ -276,6 +277,7 @@ class CarbonMergerRDD[K, V](
     val updateStatusManager: SegmentUpdateStatusManager = new SegmentUpdateStatusManager(
       absoluteTableIdentifier)
     val jobConf: JobConf = new JobConf(new Configuration)
+    SparkHadoopUtil.get.addCredentials(jobConf)
     val job: Job = new Job(jobConf)
     val format = CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job)
     CarbonTableInputFormat.setPartitionsToPrune(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b421c246/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index f2c3060..49c0225 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -26,9 +26,11 @@ import scala.collection.mutable.ArrayBuffer
 import scala.util.Random

 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 import org.apache.spark._
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.hive.DistributionUtil
 import org.apache.spark.sql.SparkSession
@@ -81,7 +83,10 @@ class CarbonScanRDD(
   @transient val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)

   override def getPartitions: Array[Partition] = {
-    val job = Job.getInstance(new Configuration())
+    val conf = new Configuration()
+    val jobConf = new JobConf(conf)
+    SparkHadoopUtil.get.addCredentials(jobConf)
+    val job = Job.getInstance(jobConf)
     val format = prepareInputFormatForDriver(job.getConfiguration)

     // initialise query_id for job
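The same three-line sequence (wrap the Configuration in a JobConf, add credentials, build the Job from it) now appears in CarbonIUDMergerRDD, CarbonMergerRDD and CarbonScanRDD. A minimal stand-alone sketch of that shared pattern is shown below; the object and method names are hypothetical, not part of this patch:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.mapred.JobConf
    import org.apache.hadoop.mapreduce.Job
    import org.apache.spark.deploy.SparkHadoopUtil

    object CredentialAwareJob {
      // Hypothetical helper mirroring the pattern applied in the hunks above:
      // build the Hadoop Job from a JobConf that already carries the current
      // user's credentials instead of from a bare Configuration.
      def newJob(conf: Configuration = new Configuration()): Job = {
        val jobConf = new JobConf(conf)
        SparkHadoopUtil.get.addCredentials(jobConf)
        Job.getInstance(jobConf)
      }
    }

    // Example call site, roughly mirroring CarbonScanRDD.getPartitions:
    // val job = CredentialAwareJob.newJob()
    // val format = prepareInputFormatForDriver(job.getConfiguration)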
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b421c246/integration/spark-common/src/main/scala/org/apache/spark/util/SparkUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/SparkUtil.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/SparkUtil.scala
index 9c37640..4635fc7 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/util/SparkUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/SparkUtil.scala
@@ -19,8 +19,10 @@ package org.apache.spark.util
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.{LongWritable, Text}
+import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
 import org.apache.spark.{SparkContext, TaskContext}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.{NewHadoopPartition, NewHadoopRDD}

 import org.apache.carbondata.processing.loading.csvinput.BlockDetails
@@ -37,37 +39,4 @@ object SparkUtil {
     }
   }

-  /**
-   * get file splits,return Array[BlockDetails], if file path is empty,then return empty Array
-   *
-   */
-  def getSplits(path: String, sc: SparkContext): Array[BlockDetails] = {
-    val filePath = FileUtils.getPaths(path)
-    if (filePath == null || filePath.isEmpty) {
-      // return a empty block details
-      Array[BlockDetails]()
-    } else {
-      // clone the hadoop configuration
-      val hadoopConfiguration = new Configuration(sc.hadoopConfiguration)
-      // set folder or file
-      hadoopConfiguration.set(FileInputFormat.INPUT_DIR, filePath)
-      hadoopConfiguration.set(FileInputFormat.INPUT_DIR_RECURSIVE, "true")
-      val newHadoopRDD = new NewHadoopRDD[LongWritable, Text](
-        sc,
-        classOf[org.apache.hadoop.mapreduce.lib.input.TextInputFormat],
-        classOf[LongWritable],
-        classOf[Text],
-        hadoopConfiguration)
-      val splits: Array[FileSplit] = newHadoopRDD.getPartitions.map { part =>
-        part.asInstanceOf[NewHadoopPartition].serializableHadoopSplit.value.asInstanceOf[FileSplit]
-      }
-      splits.map { block =>
-        new BlockDetails(block.getPath,
-          block.getStart,
-          block.getLength,
-          block.getLocations
-        )
-      }
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b421c246/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 809c8ff..8212e85 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -1030,9 +1030,10 @@ object CarbonDataRDDFactory {
              org.apache.hadoop.io.compress.BZip2Codec""".stripMargin)

     CommonUtil.configSplitMaxSize(sqlContext.sparkContext, filePaths, hadoopConf)
-
+    val jobConf = new JobConf(hadoopConf)
+    SparkHadoopUtil.get.addCredentials(jobConf)
     val inputFormat = new org.apache.hadoop.mapreduce.lib.input.TextInputFormat
-    val jobContext = new Job(hadoopConf)
+    val jobContext = new Job(jobConf)
     val rawSplits = inputFormat.getSplits(jobContext).toArray
     val blockList = rawSplits.map { inputSplit =>
       val fileSplit = inputSplit.asInstanceOf[FileSplit]
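In the CarbonDataRDDFactory hunk the credentials matter at the point where TextInputFormat.getSplits() lists and stats the input files during data load. The sketch below is an illustrative, stand-alone version of that flow, not code from this patch; the object name, method name and inputDir parameter are assumptions for the example:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.mapred.JobConf
    import org.apache.hadoop.mapreduce.Job
    import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, TextInputFormat}
    import org.apache.spark.deploy.SparkHadoopUtil

    object SplitComputationSketch {
      // Illustrative only: compute text input splits with a credential-carrying
      // JobConf, so that getSplits() can talk to a Kerberos-secured HDFS using
      // the tokens the driver already holds.
      def computeSplits(inputDir: String, hadoopConf: Configuration) = {
        val jobConf = new JobConf(hadoopConf)
        SparkHadoopUtil.get.addCredentials(jobConf)
        val jobContext = Job.getInstance(jobConf)
        FileInputFormat.addInputPath(jobContext, new Path(inputDir))
        new TextInputFormat().getSplits(jobContext)
      }
    }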