[ https://issues.apache.org/jira/browse/SPARK-3100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ravindra Pesala updated SPARK-3100:
-----------------------------------
    Description: 
I created a simple custom RDD (SampleRDD.scala) and created 4 splits for 4 workers. When I run this RDD in a Spark standalone cluster with 4 workers (the master machine also runs one worker), it runs all partitions on one node only, even though I have given locality preferences in my SampleRDD program.

*Sample Code*
{code}
import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

class SamplePartition(rddId: Int, val idx: Int, val tableSplit: Seq[String])
  extends Partition {
  override def hashCode(): Int = 41 * (41 + rddId) + idx
  override val index: Int = idx
}

class SampleRDD[K, V](sc: SparkContext, keyClass: KeyVal[K, V])
  extends RDD[(K, V)](sc, Nil) with Logging {

  override def getPartitions: Array[Partition] = {
    // One partition per worker; the host name is the locality preference.
    val hosts = Array("master", "slave1", "slave2", "slave3")
    val result = new Array[Partition](4)
    for (i <- 0 until result.length) {
      result(i) = new SamplePartition(id, i, Array(hosts(i)))
    }
    result
  }

  override def compute(theSplit: Partition, context: TaskContext) = {
    val iter = new Iterator[(K, V)] {
      val split = theSplit.asInstanceOf[SamplePartition]
      logInfo("Executed task for the split " + split.tableSplit)

      // Register an on-task-completion callback to close the input stream.
      context.addOnCompleteCallback(() => close())

      var havePair = false
      var finished = false

      override def hasNext: Boolean = {
        if (!finished && !havePair) {
          // There is no real input source, so the iterator finishes at once.
          finished = true
          havePair = !finished
        }
        !finished
      }

      override def next(): (K, V) = {
        if (!hasNext) {
          throw new java.util.NoSuchElementException("End of stream")
        }
        havePair = false
        val key = new Key()
        val value = new Value()
        keyClass.getKey(key, value)
      }

      private def close() {
        try {
          // reader.close()
        } catch {
          case e: Exception => logWarning("Exception in RecordReader.close()", e)
        }
      }
    }
    iter
  }

  override def getPreferredLocations(split: Partition): Seq[String] = {
    val theSplit = split.asInstanceOf[SamplePartition]
    val s = theSplit.tableSplit.filter(_ != "localhost")
    logInfo("Host Name : " + s(0))
    s
  }
}

trait KeyVal[K, V] extends Serializable {
  def getKey(key: Key, value: Value): (K, V)
}

class KeyValImpl extends KeyVal[Key, Value] {
  override def getKey(key: Key, value: Value) = (key, value)
}

case class Key()
case class Value()

object SampleRDD {
  def main(args: Array[String]): Unit = {
    val jars = SparkContext.jarOfClass(this.getClass).toArray
    val sc = new SparkContext("spark://master:7077", "SampleSpark",
      "/opt/spark-1.0.0-rc3/", jars)
    val rdd = new SampleRDD(sc, new KeyValImpl())
    rdd.collect()
  }
}
{code}
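A detail worth double-checking when reproducing this (an observation added here, not part of the original report): the scheduler matches the strings returned by getPreferredLocations against the host names that executors register with, so locality is only honoured when the two agree exactly (short name, FQDN, and IP address all count as different hosts). A minimal, hypothetical sketch of normalising the preferred hosts through DNS, assuming the cluster nodes resolve each other by these names:
{code}
import java.net.InetAddress

// Hypothetical helper, not in the original report: resolve each preferred
// host through DNS so it matches the name the executor registers with.
def normalizeHosts(hosts: Seq[String]): Seq[String] =
  hosts.filter(_ != "localhost")
       .map(h => InetAddress.getByName(h).getCanonicalHostName)
{code}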
Following is the log it shows:

INFO 18-08 16:38:33,382 - Executor updated: app-20140818163833-0005/0 is now RUNNING
INFO 18-08 16:38:33,382 - Executor updated: app-20140818163833-0005/2 is now RUNNING
INFO 18-08 16:38:33,383 - Executor updated: app-20140818163833-0005/1 is now RUNNING
INFO 18-08 16:38:33,385 - Executor updated: app-20140818163833-0005/3 is now RUNNING
INFO 18-08 16:38:34,976 - Registered executor: Actor akka.tcp://sparkExecutor@master:47563/user/Executor#-398354094 with ID 0
INFO 18-08 16:38:34,984 - Starting task 0.0:0 as TID 0 on executor 0: *master (PROCESS_LOCAL)*
INFO 18-08 16:38:34,989 - Serialized task 0.0:0 as 1261 bytes in 3 ms
INFO 18-08 16:38:34,992 - Starting task 0.0:1 as TID 1 on executor 0: *master (PROCESS_LOCAL)*
INFO 18-08 16:38:34,993 - Serialized task 0.0:1 as 1261 bytes in 0 ms
INFO 18-08 16:38:34,993 - Starting task 0.0:2 as TID 2 on executor 0: *master (PROCESS_LOCAL)*
INFO 18-08 16:38:34,993 - Serialized task 0.0:2 as 1261 bytes in 0 ms
INFO 18-08 16:38:34,994 - Starting task 0.0:3 as TID 3 on executor 0: *master (PROCESS_LOCAL)*
INFO 18-08 16:38:34,994 - Serialized task 0.0:3 as 1261 bytes in 0 ms
INFO 18-08 16:38:35,174 - Registering block manager master:42125 with 294.4 MB RAM
INFO 18-08 16:38:35,296 - Registered executor: Actor akka.tcp://sparkExecutor@slave1:31726/user/Executor#492173410 with ID 2
INFO 18-08 16:38:35,302 - Registered executor: Actor akka.tcp://sparkExecutor@slave2:25769/user/Executor#1762839887 with ID 1
INFO 18-08 16:38:35,317 - Registered executor: Actor akka.tcp://sparkExecutor@slave3:51032/user/Executor#981476000 with ID 3
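Note the timestamps (again an observation on the log above, not part of the original report): all four tasks are assigned to executor 0 on master at 16:38:34,98x, while executors 1-3 on the slaves only finish registering from 16:38:35,296 onward, so at scheduling time the driver can only see one executor. A hedged sketch of one possible mitigation, assuming delay scheduling is the issue here and not a verified fix for this ticket: raising spark.locality.wait so the scheduler waits longer for a preferred host before falling back to any executor:
{code}
import org.apache.spark.{SparkConf, SparkContext}

// Hedged sketch, not a verified fix for this ticket: give preferred hosts
// more time before the scheduler falls back to non-local executors.
// In Spark 1.0 this value is in milliseconds (default 3000).
val conf = new SparkConf()
  .setMaster("spark://master:7077")
  .setAppName("SampleSpark")
  .set("spark.locality.wait", "30000")
val sc = new SparkContext(conf)
{code}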
> Spark RDD partitions are not running in the workers as per locality
> information given by each partition.
> --------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-3100
>                 URL: https://issues.apache.org/jira/browse/SPARK-3100
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.0.0
>        Environment: Running in Spark Standalone Cluster
>            Reporter: Ravindra Pesala
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org