Are you specifying your spark master in the scala program?

Best Regards,
Sonal
Founder, Nube Technologies <http://www.nubetech.co>
Reifier at Strata Hadoop World <https://www.youtube.com/watch?v=eD3LkpPQIgM>
Reifier at Spark Summit 2015 <https://spark-summit.org/2015/events/real-time-fuzzy-matching-with-spark-and-elastic-search/>
<http://in.linkedin.com/in/sonalgoyal>
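A minimal sketch of the check being suggested here, assuming a standalone cluster (the app name and master URL are placeholders): a master set programmatically on the SparkConf takes precedence over the --master flag given to spark-submit, so, for example, a leftover setMaster("local[*]") in the program would keep all partitions on a single machine even though the shell was started against the cluster.

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("hash-partitioner-test")      // placeholder name
      .setMaster("spark://master-host:7077")    // placeholder URL; if set here, it
                                                // overrides --master from spark-submit
    val sc = new SparkContext(conf)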
On Mon, Apr 18, 2016 at 6:26 PM, Raghava Mutharaju <m.vijayaragh...@gmail.com> wrote:

> Mike,
>
> We tried that. This map task is actually part of a larger set of
> operations. I pointed out this map task since it involves partitionBy(), and
> we always use partitionBy() whenever partition-unaware shuffle operations
> are performed (such as distinct). We in fact do not notice a change in the
> distribution after several unrelated stages have executed and a significant
> amount of time has passed (nearly 10-15 minutes).
>
> I agree. We are not looking for partitions to go to specific nodes, nor
> do we expect a uniform distribution of keys across the cluster. There will
> be a skew. But it cannot be that all the data is on one node and nothing on
> the other, and no, the keys are not the same. They vary from 1 to around
> 55000 (integers). What makes this strange is that it seems to work fine in
> the spark shell (REPL).
>
> Regards,
> Raghava.
>
>
> On Mon, Apr 18, 2016 at 1:14 AM, Mike Hynes <91m...@gmail.com> wrote:
>
>> A HashPartitioner will indeed partition based on the key, but you
>> cannot know on *which* node that key will appear. Again, the RDD
>> partitions will not necessarily be distributed evenly across your
>> nodes because of the greedy scheduling of the first wave of tasks,
>> particularly if those tasks have durations less than the initial
>> executor delay. I recommend you look at your logs to verify whether this
>> is happening to you.
>>
>> Mike
>>
>> On 4/18/16, Anuj Kumar <anujs...@gmail.com> wrote:
>> > Good point Mike +1
>> >
>> > On Mon, Apr 18, 2016 at 9:47 AM, Mike Hynes <91m...@gmail.com> wrote:
>> >
>> >> When submitting a job with spark-submit, I've observed delays (up to
>> >> 1-2 seconds) for the executors to respond to the driver in order to
>> >> receive tasks in the first stage. The delay does not persist once the
>> >> executors have been synchronized.
>> >>
>> >> When the tasks are very short, as may be your case (relatively small
>> >> data and a simple map task like you have described), the 8 tasks in
>> >> your stage may be allocated to only 1 executor in 2 waves of 4, since
>> >> the second executor won't have responded to the master before the
>> >> first 4 tasks on the first executor have completed.
>> >>
>> >> To see if this is the cause in your particular case, you could try the
>> >> following to confirm:
>> >> 1. Examine the starting times of the tasks alongside their executor.
>> >> 2. Make a "dummy" stage execute before your real stages to
>> >> synchronize the executors by creating and materializing any random RDD.
>> >> 3. Make the tasks longer, i.e. with some silly computational work.
>> >>
>> >> Mike
>> >>
>> >>
>> >> On 4/17/16, Raghava Mutharaju <m.vijayaragh...@gmail.com> wrote:
>> >> > Yes, it's the same data.
>> >> >
>> >> > 1) The number of partitions is the same (8, which is an argument to
>> >> > the HashPartitioner). In the first case, these partitions are spread
>> >> > across both the worker nodes. In the second case, all the partitions
>> >> > are on the same node.
>> >> > 2) What resources would be of interest here? The Scala shell takes
>> >> > the default parameters, since we use "bin/spark-shell --master
>> >> > <master-URL>" to run the scala-shell. For the scala program, we do
>> >> > set some configuration options, such as driver memory (12GB);
>> >> > parallelism is set to 8 and we use the Kryo serializer.
>> >> >
>> >> > We are running this on Azure D3-v2 machines, which have 4 cores and
>> >> > 14GB RAM. 1 executor runs on each worker node. The following
>> >> > configuration options are set for the scala program -- perhaps we
>> >> > should move them to the spark config file.
>> >> >
>> >> > Driver memory and executor memory are set to 12GB
>> >> > parallelism is set to 8
>> >> > Kryo serializer is used
>> >> > Number of retainedJobs and retainedStages has been increased to check
>> >> > them in the UI.
>> >> >
>> >> > What information regarding the Spark Context would be of interest here?
>> >> >
>> >> > Regards,
>> >> > Raghava.
>> >> >
>> >> > On Sun, Apr 17, 2016 at 10:54 PM, Anuj Kumar <anujs...@gmail.com>
>> >> > wrote:
>> >> >
>> >> >> If the data file is the same, then it should have a similar
>> >> >> distribution of keys. Few queries-
>> >> >>
>> >> >> 1. Did you compare the number of partitions in both the cases?
>> >> >> 2. Did you compare the resource allocation for Spark Shell vs Scala
>> >> >> Program being submitted?
>> >> >>
>> >> >> Also, can you please share the details of Spark Context, Environment
>> >> >> and Executors when you run via the Scala program?
>> >> >>
>> >> >> On Mon, Apr 18, 2016 at 4:41 AM, Raghava Mutharaju <
>> >> >> m.vijayaragh...@gmail.com> wrote:
>> >> >>
>> >> >>> Hello All,
>> >> >>>
>> >> >>> We are using HashPartitioner in the following way on a 3 node
>> >> >>> cluster (1 master and 2 worker nodes).
>> >> >>>
>> >> >>> val u = sc.textFile("hdfs://x.x.x.x:8020/user/azureuser/s.txt")
>> >> >>>   .map[(Int, Int)](line => { line.split("\\|") match { case Array(x, y) => (y.toInt, x.toInt) } })
>> >> >>>   .partitionBy(new HashPartitioner(8)).setName("u").persist()
>> >> >>>
>> >> >>> u.count()
>> >> >>>
>> >> >>> If we run this from the spark shell, the data (52 MB) is split
>> >> >>> across the two worker nodes. But if we put this in a scala program
>> >> >>> and run it, then all the data goes to only one node. We have run it
>> >> >>> multiple times, but this behavior does not change. This seems strange.
>> >> >>>
>> >> >>> Is there some problem with the way we use HashPartitioner?
>> >> >>>
>> >> >>> Thanks in advance.
>> >> >>>
>> >> >>> Regards,
>> >> >>> Raghava.
>> >> >>>
>> >> >>
>> >> >
>> >> > --
>> >> > Regards,
>> >> > Raghava
>> >> > http://raghavam.github.io
>> >>
>> >> --
>> >> Thanks,
>> >> Mike
>> >
>>
>> --
>> Thanks,
>> Mike
>
>
> --
> Regards,
> Raghava
> http://raghavam.github.io
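For reference, a minimal sketch combining the configuration Raghava describes with Mike's suggestion 2 (a throwaway "dummy" stage to synchronize the executors before the real partitionBy stage runs). The HDFS path and HashPartitioner(8) come from the thread; the app name, dummy RDD size, and the exact way the options are wired are illustrative assumptions, not the actual program:

    import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("partition-distribution-test")   // placeholder name
      // Note: driver memory normally has to be passed to spark-submit
      // (--driver-memory 12g); by the time this conf is read, the driver
      // JVM has usually already started.
      .set("spark.executor.memory", "12g")
      .set("spark.default.parallelism", "8")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    val sc = new SparkContext(conf)

    // "Dummy" stage: build and materialize a throwaway RDD so that every
    // executor has registered with the driver before the real work starts.
    // Its only purpose is to absorb the initial scheduling delay.
    sc.parallelize(1 to 10000, 8).map(_ * 2).count()

    // Real work: same shape as the job discussed in the thread.
    val u = sc.textFile("hdfs://x.x.x.x:8020/user/azureuser/s.txt")
      .map[(Int, Int)](line => { line.split("\\|") match { case Array(x, y) => (y.toInt, x.toInt) } })
      .partitionBy(new HashPartitioner(8))
      .setName("u")
      .persist()

    u.count()

If the skew disappears once the dummy stage is in place, that points to the initial executor registration delay Mike describes rather than to anything about HashPartitioner itself.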