Mike,

It turns out the executor delay you mentioned is the cause. After we introduced a dummy stage, the partitioning worked as expected. Does this delay also occur in later stages? We noticed the same behavior (partitions spread across both workers with spark-shell, but not with spark-submit) at a later stage as well.
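Mike's explanation in the quoted message (one executor running all 8 short tasks in two waves of 4 because the second executor has not yet registered) can be illustrated with a toy model. This is a plain-Scala list scheduler, not Spark's actual scheduler, which works with resource offers and locality waits. The numbers are assumptions: two executors with 4 cores each (as on the D3-v2 workers), 8 tasks of 100 ms, and a 1.5 s registration delay for the second executor, within the 1-2 s Mike reports. `LateExecutorSim` and its methods are hypothetical names.

```scala
object LateExecutorSim {
  /** One core slot: the executor it belongs to and the time (ms) it is next free. */
  final case class Core(executor: Int, freeAt: Long)

  /** Toy list scheduler: each task, in order, goes to the core that becomes free
    * earliest (ties go to the lower executor id). Executor e's cores only become
    * usable at registeredAt(e). Returns the executor chosen for each task. */
  def schedule(numTasks: Int, taskMs: Long, coresPerExecutor: Int,
               registeredAt: Seq[Long]): Seq[Int] = {
    var cores: Vector[Core] = (for {
      (reg, e) <- registeredAt.zipWithIndex
      _ <- 0 until coresPerExecutor
    } yield Core(e, reg)).toVector

    (0 until numTasks).map { _ =>
      // Pick the earliest-available core; the tuple ordering breaks ties by executor id.
      val idx = cores.indices.minBy(i => (cores(i).freeAt, cores(i).executor))
      val core = cores(idx)
      cores = cores.updated(idx, core.copy(freeAt = core.freeAt + taskMs))
      core.executor
    }
  }

  /** Number of tasks per executor, indexed by executor id. */
  def tasksPerExecutor(assignment: Seq[Int], numExecutors: Int): Seq[Int] =
    (0 until numExecutors).map(e => assignment.count(_ == e))

  def main(args: Array[String]): Unit = {
    // 8 tasks of 100 ms on two 4-core executors; the second registers 1.5 s late.
    val late = schedule(8, 100L, 4, Seq(0L, 1500L))
    println(tasksPerExecutor(late, 2).mkString(", ")) // 8, 0 (two waves of 4 on executor 0)

    // Both executors already registered, e.g. after a warm-up ("dummy") stage.
    val warm = schedule(8, 100L, 4, Seq(0L, 0L))
    println(tasksPerExecutor(warm, 2).mkString(", ")) // 4, 4
  }
}
```

In this model, once both executors are available before the first task is scheduled, the tasks split evenly, which matches what the dummy stage achieved in practice.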
Apart from introducing a dummy stage or running the job from spark-shell, is there any other way to fix this?

Regards,
Raghava.

On Mon, Apr 18, 2016 at 12:17 AM, Mike Hynes <91m...@gmail.com> wrote:
> When submitting a job with spark-submit, I've observed delays (up to
> 1-2 seconds) for the executors to respond to the driver in order to
> receive tasks in the first stage. The delay does not persist once the
> executors have been synchronized.
>
> When the tasks are very short, as may be your case (relatively small
> data and a simple map task like you have described), the 8 tasks in
> your stage may be allocated to only 1 executor in 2 waves of 4, since
> the second executor won't have responded to the master before the
> first 4 tasks on the first executor have completed.
>
> To see if this is the cause in your particular case, you could try the
> following to confirm:
> 1. Examine the start times of the tasks alongside their executors.
> 2. Make a "dummy" stage execute before your real stages to
> synchronize the executors, by creating and materializing any random RDD.
> 3. Make the tasks longer, e.g. with some silly computational work.
>
> Mike
>
>
> On 4/17/16, Raghava Mutharaju <m.vijayaragh...@gmail.com> wrote:
> > Yes, it's the same data.
> >
> > 1) The number of partitions is the same (8, which is the argument to the
> > HashPartitioner). In the first case, these partitions are spread across
> > both worker nodes. In the second case, all the partitions are on the
> > same node.
> > 2) What resources would be of interest here? The spark-shell takes the
> > default parameters, since we use "bin/spark-shell --master <master-URL>"
> > to run it. For the Scala program, we do set some configuration options,
> > such as driver memory (12GB), parallelism (8) and the Kryo serializer.
> >
> > We are running this on Azure D3-v2 machines, which have 4 cores and 14GB
> > RAM. One executor runs on each worker node.
> > The following configuration options are set for the Scala program
> > (perhaps we should move them to the Spark config file):
> >
> > Driver memory and executor memory are set to 12GB
> > Parallelism is set to 8
> > The Kryo serializer is used
> > retainedJobs and retainedStages have been increased so we can check them
> > in the UI.
> >
> > What information regarding the SparkContext would be of interest here?
> >
> > Regards,
> > Raghava.
> >
> > On Sun, Apr 17, 2016 at 10:54 PM, Anuj Kumar <anujs...@gmail.com> wrote:
> >
> >> If the data file is the same, then it should have a similar distribution
> >> of keys.
> >> A few queries:
> >>
> >> 1. Did you compare the number of partitions in both cases?
> >> 2. Did you compare the resource allocation for the Spark shell vs. the
> >> Scala program being submitted?
> >>
> >> Also, can you please share the details of the SparkContext, Environment
> >> and Executors when you run via the Scala program?
> >>
> >> On Mon, Apr 18, 2016 at 4:41 AM, Raghava Mutharaju <
> >> m.vijayaragh...@gmail.com> wrote:
> >>
> >>> Hello All,
> >>>
> >>> We are using HashPartitioner in the following way on a 3-node cluster
> >>> (1 master and 2 worker nodes).
> >>>
> >>> import org.apache.spark.HashPartitioner
> >>>
> >>> val u = sc.textFile("hdfs://x.x.x.x:8020/user/azureuser/s.txt")
> >>>   .map[(Int, Int)] { line =>
> >>>     line.split("\\|") match { case Array(x, y) => (y.toInt, x.toInt) }
> >>>   }
> >>>   .partitionBy(new HashPartitioner(8))
> >>>   .setName("u")
> >>>   .persist()
> >>>
> >>> u.count()
> >>>
> >>> If we run this from the Spark shell, the data (52 MB) is split across
> >>> the two worker nodes. But if we put this in a Scala program and run it,
> >>> all the data goes to only one node. We have run it multiple times, but
> >>> this behavior does not change. This seems strange.
> >>>
> >>> Is there some problem with the way we use HashPartitioner?
> >>>
> >>> Thanks in advance.
> >>>
> >>> Regards,
> >>> Raghava.
> >
> > --
> > Regards,
> > Raghava
> > http://raghavam.github.io
>
> --
> Thanks,
> Mike

--
Regards,
Raghava
http://raghavam.github.io
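On the question of alternatives to a dummy stage: Spark has scheduler settings that make SparkContext creation block until enough executors have registered, so the first stage can start with both workers available. Below is a hedged configuration sketch assuming a standalone cluster with two 4-core workers, as described in the thread; `SchedulerWaitConf`, `build` and the app name are hypothetical, while the configuration keys are standard Spark settings. As far as I can tell from the Spark source, standalone mode measures registered resources in cores against `spark.cores.max`, so the ratio has no effect unless that is also set. Treat this as something to try rather than a confirmed fix.

```scala
import org.apache.spark.SparkConf

object SchedulerWaitConf {
  // Hypothetical helper; pass the result to new SparkContext(...).
  def build(): SparkConf = new SparkConf()
    .setAppName("HashPartitionJob") // hypothetical app name
    // Standalone mode compares registered cores against spark.cores.max;
    // 8 = 2 workers x 4 cores (assumed from the thread).
    .set("spark.cores.max", "8")
    // Wait until 100% of the expected cores have registered before scheduling...
    .set("spark.scheduler.minRegisteredResourcesRatio", "1.0")
    // ...but stop waiting after this long and start scheduling anyway.
    .set("spark.scheduler.maxRegisteredResourcesWaitingTime", "30s")
    // Settings mentioned in the thread, carried over unchanged.
    .set("spark.executor.memory", "12g")
    .set("spark.default.parallelism", "8")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    // spark.driver.memory is deliberately not set here: in client mode the driver
    // JVM is already running, so pass --driver-memory 12g to spark-submit instead.
}
```

Regarding later stages: if the first stage has already cached every partition of `u` on one worker, later stages may keep running there because of data-locality preferences (controlled by `spark.locality.wait`), so getting the first stage right likely matters for the later ones too.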
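On the original question of whether HashPartitioner is being misused: the key-to-partition mapping does not depend on how the job is launched. The sketch below reimplements in plain Scala what I understand `HashPartitioner.getPartition` to do (a non-negative modulo of the key's `hashCode`, with null keys going to partition 0). `HashPartitionSketch` and `partitionFor` are hypothetical names, not Spark API.

```scala
object HashPartitionSketch {
  /** Mirrors (as far as I know) HashPartitioner.getPartition: non-negative
    * modulo of the key's hashCode; null keys go to partition 0. */
  def partitionFor(key: Any, numPartitions: Int): Int = key match {
    case null => 0
    case k =>
      val rawMod = k.hashCode % numPartitions
      rawMod + (if (rawMod < 0) numPartitions else 0)
  }

  def main(args: Array[String]): Unit = {
    // Int keys hash to themselves, so the mapping depends only on key and numPartitions.
    Seq(0, 7, 8, 10, -3).foreach(k => println(s"$k -> partition ${partitionFor(k, 8)}"))
    // Prints: 0 -> 0, 7 -> 7, 8 -> 0, 10 -> 2, -3 -> 5 (each as "k -> partition p")
  }
}
```

Because this mapping is deterministic, spark-shell and spark-submit produce the same 8 partitions. What differed between the two runs was which executor computed and cached each partition, which is the scheduling effect Mike described.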