Hi All,

When I submit a Spark job on YARN with a custom Partitioner, it is not picked up by the executors; they still use the default HashPartitioner. I added log lines to both HashPartitioner (org/apache/spark/Partitioner.scala) and the custom partitioner, and the completed executor logs show only HashPartitioner.
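(One detail that often matters here: Spark decides whether an RDD is already partitioned the way you asked by comparing Partitioner instances with equals(). A custom Partitioner that does not override equals and hashCode is never equal to a second instance, so for example partitionBy followed by groupByKey with another "new" instance of the same class still triggers a second shuffle. A sketch of the equality override, with the illustrative class name LevelPartitioner:)

```scala
import org.apache.spark.Partitioner

// Hypothetical variant of a level-based partitioner that overrides
// equals/hashCode, so Spark can recognise two instances as the same
// partitioner and skip a redundant shuffle.
class LevelPartitioner(partitions: Int) extends Partitioner {

  def numPartitions: Int = partitions

  def getPartition(key: Any): Int = key match {
    case "INFO"  => 0
    case "DEBUG" => 1
    case "ERROR" => 2
    case "WARN"  => 3
    case "FATAL" => 4
    case _       => 0  // fallback so an unexpected key does not throw a MatchError
  }

  // Spark compares partitioners with equals(); without this override,
  // new LevelPartitioner(5) != new LevelPartitioner(5), and operations
  // such as groupByKey will shuffle again with their own partitioner.
  override def equals(other: Any): Boolean = other match {
    case p: LevelPartitioner => p.numPartitions == numPartitions
    case _                   => false
  }

  override def hashCode: Int = numPartitions
}
```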
Below is the Spark application code with the custom partitioner, and the log line added to the HashPartitioner class in Partitioner.scala:

  log.info("HashPartitioner=" + key + "---" + numPartitions + "----" + Utils.nonNegativeMod(key.hashCode, numPartitions))

The executor logs show:

  16/03/06 15:20:27 INFO spark.HashPartitioner: HashPartitioner=INFO---4----2
  16/03/06 15:20:27 INFO spark.HashPartitioner: HashPartitioner=INFO---4----2
  ........

How can I make sure the executors pick the right partitioner?

Code:

  package org.apache.spark

  class ExactPartitioner(partitions: Int) extends Partitioner with Logging {

    def numPartitions: Int = partitions

    def getPartition(key: Any): Int = {
      log.info("ExactPartitioner=" + key)
      key match {
        case "INFO"  => 0
        case "DEBUG" => 1
        case "ERROR" => 2
        case "WARN"  => 3
        case "FATAL" => 4
        case _       => 0  // without a default branch, an unexpected key throws a MatchError
      }
    }
  }

  object GroupByCLDB {

    def main(args: Array[String]) {

      val logFile = "/DATA"
      val sparkConf = new SparkConf().setAppName("GroupBy")
      sparkConf.set("spark.executor.memory", "4g")
      sparkConf.set("spark.executor.cores", "2")
      sparkConf.set("spark.executor.instances", "2")
      val sc = new SparkContext(sparkConf)
      val logData = sc.textFile(logFile)

      case class LogClass(one: String, two: String)

      // Parse a line into (log level, rest of the record); the level is the key.
      def parse(line: String) = {
        val pieces = line.split(' ')
        val level = pieces(2)
        val one = pieces(0)
        val two = pieces(1)
        (level, LogClass(one, two))
      }

      val output = logData.map(x => parse(x))
      val partitioned = output.partitionBy(new ExactPartitioner(5)).persist()
      val groups = partitioned.groupByKey(new ExactPartitioner(5))
      groups.count()
      output.partitions.size
      partitioned.partitions.size
    }
  }

Thanks,
Prabhu Joseph
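A driver-side way to confirm which partitioner an RDD actually ends up with, before digging through executor logs: RDD.partitioner reports the partitioner (if any) an RDD carries, and toDebugString shows where shuffles occur in the lineage. A sketch reusing the ExactPartitioner from the code above; the path and parsing are placeholders matching that code:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Sanity check run on the driver: print the partitioner and the lineage.
object CheckPartitioner {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("CheckPartitioner"))
    // Key each line by its log level, as in parse() above.
    val pairs = sc.textFile("/DATA").map(line => (line.split(' ')(2), line))

    val partitioned = pairs.partitionBy(new ExactPartitioner(5))
    // Should print Some(<ExactPartitioner instance>) if the partitioner stuck.
    println(partitioned.partitioner)

    val grouped = partitioned.groupByKey(new ExactPartitioner(5))
    // An extra ShuffledRDD stage in the lineage here would mean the two
    // ExactPartitioner instances were not considered equal.
    println(grouped.toDebugString)
  }
}
```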