No. We specify it as a configuration option to spark-submit. Does that make a difference?

Regards,
Raghava.
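[For reference, a minimal sketch of where the master URL can come from in this setup; the host, port, class name, and jar below are placeholders, not values from the thread:

    // Master passed as a spark-submit option (the setup described above):
    //   bin/spark-submit --master spark://x.x.x.x:7077 --class Main app.jar
    // The program then builds its SparkConf without calling setMaster:
    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("hash-partitioner-test")  // placeholder app name
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)

Note that properties set directly on the SparkConf take precedence over flags passed to spark-submit, so a stray setMaster() in the program would override the command-line option; this is one way a shell session and a submitted program can end up configured differently.]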
On Mon, Apr 18, 2016 at 9:56 AM, Sonal Goyal <sonalgoy...@gmail.com> wrote:

Are you specifying your spark master in the scala program?

Best Regards,
Sonal
Founder, Nube Technologies <http://www.nubetech.co>
Reifier at Strata Hadoop World <https://www.youtube.com/watch?v=eD3LkpPQIgM>
Reifier at Spark Summit 2015 <https://spark-summit.org/2015/events/real-time-fuzzy-matching-with-spark-and-elastic-search/>
<http://in.linkedin.com/in/sonalgoyal>

On Mon, Apr 18, 2016 at 6:26 PM, Raghava Mutharaju <m.vijayaragh...@gmail.com> wrote:

Mike,

We tried that. This map task is actually part of a larger set of operations. I pointed out this map task because it involves partitionBy(), and we always use partitionBy() whenever partition-unaware shuffle operations (such as distinct) are performed. In fact, we do not notice a change in the distribution even after several unrelated stages have executed and a significant amount of time has passed (nearly 10-15 minutes).

I agree. We are not looking for partitions to go to specific nodes, nor do we expect a uniform distribution of keys across the cluster; there will be some skew. But it cannot be that all of the data is on one node and nothing is on the other, and no, the keys are not all the same: they vary from 1 to around 55000 (integers). What makes this strange is that it seems to work fine in the spark shell (REPL).

Regards,
Raghava.

On Mon, Apr 18, 2016 at 1:14 AM, Mike Hynes <91m...@gmail.com> wrote:

A HashPartitioner will indeed partition based on the key, but you cannot know on *which* node that key will appear. Again, the RDD partitions will not necessarily be distributed evenly across your nodes, because of the greedy scheduling of the first wave of tasks, particularly if those tasks have durations shorter than the initial executor delay. I recommend you look at your logs to verify whether this is happening to you.

Mike

On 4/18/16, Anuj Kumar <anujs...@gmail.com> wrote:

Good point Mike +1

On Mon, Apr 18, 2016 at 9:47 AM, Mike Hynes <91m...@gmail.com> wrote:

When submitting a job with spark-submit, I've observed delays (up to 1-2 seconds) before the executors respond to the driver in order to receive tasks in the first stage. The delay does not persist once the executors have been synchronized.

When the tasks are very short, as may be the case for you (relatively small data and a simple map task like the one you have described), the 8 tasks in your stage may be allocated to only one executor, in two waves of four, since the second executor will not have responded to the master before the first four tasks on the first executor have completed.

To see whether this is the cause in your particular case, you could try the following (a sketch of step 2 appears just after this message):
1. Examine the starting times of the tasks alongside their executors.
2. Make a "dummy" stage execute before your real stages to synchronize the executors, by creating and materializing any random RDD.
3. Make the tasks longer, i.e. give them some silly computational work.

Mike
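[A minimal sketch of the "dummy" warm-up stage from step 2 above, assuming a SparkContext named sc; the element and partition counts are arbitrary:

    // Throwaway job: materializing any RDD forces every executor to
    // register with the driver and run tasks before the real stages start.
    val warmup = sc.parallelize(1 to 1000000, 8)
    warmup.map(_ * 2).count()

Once count() returns, both executors should have completed tasks, so the first real stage no longer races against the initial executor delay.]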
On 4/17/16, Raghava Mutharaju <m.vijayaragh...@gmail.com> wrote:

Yes, it is the same data.

1) The number of partitions is the same (8, which is the argument to the HashPartitioner). In the first case, these partitions are spread across both worker nodes; in the second case, all of the partitions are on the same node.
2) What resources would be of interest here? The scala shell takes the default parameters, since we use "bin/spark-shell --master <master-URL>" to run it. For the scala program, we do set some configuration options, such as driver memory (12GB); parallelism is set to 8, and we use the Kryo serializer.

We are running this on Azure D3-v2 machines, which have 4 cores and 14GB of RAM. One executor runs on each worker node. The following configuration options are set for the scala program (perhaps we should move them to the spark config file):

- Driver memory and executor memory are set to 12GB
- Parallelism is set to 8
- The Kryo serializer is used
- The numbers of retainedJobs and retainedStages have been increased, to check them in the UI

What information regarding the Spark Context would be of interest here?

Regards,
Raghava.

On Sun, Apr 17, 2016 at 10:54 PM, Anuj Kumar <anujs...@gmail.com> wrote:

If the data file is the same, then it should have a similar distribution of keys. A few queries:

1. Did you compare the number of partitions in both cases?
2. Did you compare the resource allocation for the spark shell vs. the scala program being submitted?

Also, can you please share the details of the Spark Context, Environment, and Executors when you run via the scala program?

On Mon, Apr 18, 2016 at 4:41 AM, Raghava Mutharaju <m.vijayaragh...@gmail.com> wrote:

Hello All,

We are using HashPartitioner in the following way on a 3-node cluster (1 master and 2 worker nodes):

    val u = sc.textFile("hdfs://x.x.x.x:8020/user/azureuser/s.txt")
      .map[(Int, Int)](line => line.split("\\|") match {
        case Array(x, y) => (y.toInt, x.toInt)
      })
      .partitionBy(new HashPartitioner(8))
      .setName("u")
      .persist()

    u.count()

If we run this from the spark shell, the data (52 MB) is split across the two worker nodes. But if we put this in a scala program and run it, then all of the data goes to only one node. We have run it multiple times, but this behavior does not change. This seems strange.

Is there some problem with the way we use HashPartitioner?

Thanks in advance.

Regards,
Raghava.
http://raghavam.github.io
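[Not part of the thread, but a small check that would make the reported skew directly visible, assuming the persisted RDD u from the original post:

    // Count the records that landed in each of the 8 hash partitions.
    // The executor/host that ran each partition's task is visible in the
    // web UI's task table for this stage.
    val sizes = u.mapPartitionsWithIndex((idx, it) => Iterator((idx, it.size))).collect()
    sizes.sorted.foreach { case (idx, n) => println(s"partition $idx: $n records") }

If the per-partition counts are roughly even but all of the tasks still run on one host, that points to the scheduling explanation above (initial executor delay) rather than a key-skew problem.]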