ok, some more information (and presumably a workaround).

When I initially read in my file, I use the following code:

JavaRDD<String> keyFileRDD = sc.textFile(keyFile);

Looking at the UI, this file has 2 partitions (both on the same executor).

I then repartition this RDD (to 16):

JavaRDD<String> partKeyFileRDD = keyFileRDD.repartition(16);

Looking again at the UI, the RDD now has 16 partitions (all on the same 
executor). When the foreachPartition runs, it uses these 16 partitions 
(all on the same executor). I think this is really the problem: I'm not sure 
why the repartition didn't spread the partitions across both executors.
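
As a sanity check on what the UI shows, something like this rough sketch 
(untested; the WhereAmI class name is just for illustration) should log which 
host each partition's task actually runs on, directly from the executor side:

import java.net.InetAddress;
import java.util.Iterator;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.function.VoidFunction;

// Illustrative helper: print the partition id and the host it runs on,
// so the executor stderr can be compared against what the UI reports.
public class WhereAmI implements VoidFunction<Iterator<String>> {
    @Override
    public void call(Iterator<String> partition) throws Exception {
        System.err.println("partition " + TaskContext.get().partitionId()
            + " running on " + InetAddress.getLocalHost().getHostName());
    }
}

// driver side
partKeyFileRDD.foreachPartition(new WhereAmI());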

When the mapToPair subsequently runs (see below), both executors are used and 
things start falling apart, because none of the initialization logic was 
performed on the second executor.

However, if I modify the code above to

JavaRDD<String> keyFileRDD = sc.textFile(keyFile, 16);

then the initial keyFileRDD will be in 16 partitions spread across both 
executors. When I execute my foreachPartition directly on keyFileRDD (since 
there is no need to repartition), both executors will now be used (and 
initialized).
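
So the full workaround looks roughly like this (assuming 16 is still the 
partition count I want for 2 executors x 8 cores):

// Ask textFile for 16 partitions up front instead of repartitioning afterwards
JavaRDD<String> keyFileRDD = sc.textFile(keyFile, 16);

// Init S3 (workers) so we can read the assets -- now hits both executors
keyFileRDD.foreachPartition(new SimpleStorageServiceInit(arg1, arg2, arg3));

// Get the assets, keyed by asset id
JavaPairRDD<String,String> seqFileRDD =
    keyFileRDD.mapToPair(new SimpleStorageServiceAsset());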

Anyway, I don't know if this is my lack of understanding of how repartition 
should work or if this is a bug. Thanks, Jacek, for starting to dig into this.

Darin.



----- Original Message -----
From: Darin McBeath <ddmcbe...@yahoo.com.INVALID>
To: Jacek Laskowski <ja...@japila.pl>
Cc: user <user@spark.apache.org>
Sent: Friday, March 11, 2016 1:57 PM
Subject: Re: spark 1.6 foreachPartition only appears to be running on one 
executor

My driver code has the following:

// Init S3 (workers) so we can read the assets
partKeyFileRDD.foreachPartition(new SimpleStorageServiceInit(arg1, arg2, arg3));
// Get the assets.  Create a key pair where the key is asset id and the value
// is the rec.
JavaPairRDD<String,String> seqFileRDD =
    partKeyFileRDD.mapToPair(new SimpleStorageServiceAsset());

The worker then has the following. The issue, I believe, is that the log.info 
statements below only appear in the log file for one of my executors (and not 
both). In other words, when executing the foreachPartition above, Spark 
appears to think all of the partitions are on one executor (at least that is 
the impression I'm left with). But when I get to the mapToPair, Spark suddenly 
begins to use both executors. I have verified that there are 16 partitions for 
partKeyFileRDD.
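
For what it's worth, one simple way to confirm the partition count from the 
driver is something like the following (I believe Spark 1.6 also has a 
getNumPartitions() method):

// Quick check of how many partitions the RDD actually has
System.out.println("partKeyFileRDD partitions: " + partKeyFileRDD.partitions().size());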



import java.util.Iterator;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.function.VoidFunction;

public class SimpleStorageServiceInit implements VoidFunction<Iterator<String>> {

    // Assumed here; the original snippet does not show how "log" is defined
    private static final Logger log = Logger.getLogger(SimpleStorageServiceInit.class);

    private String arg1;
    private String arg2;
    private String arg3;

    public SimpleStorageServiceInit(String arg1, String arg2, String arg3) {
        this.arg1 = arg1;
        this.arg2 = arg2;
        this.arg3 = arg3;
        log.info("SimpleStorageServiceInit constructor");
        log.info("SimpleStorageServiceInit constructor arg1: " + arg1);
        log.info("SimpleStorageServiceInit constructor arg2: " + arg2);
        log.info("SimpleStorageServiceInit constructor arg3: " + arg3);
    }

    @Override
    public void call(Iterator<String> arg) throws Exception {
        log.info("SimpleStorageServiceInit call");
        log.info("SimpleStorageServiceInit call arg1: " + arg1);
        log.info("SimpleStorageServiceInit call arg2: " + arg2);
        log.info("SimpleStorageServiceInit call arg3: " + arg3);
        SimpleStorageService.init(this.arg1, this.arg2, this.arg3);
    }
}
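
As an aside, a rough, untested sketch of an alternative I might try would be 
to make the init lazy on each executor, so correctness no longer depends on 
foreachPartition touching every executor. The class name, constructor 
arguments and per-line parsing below are placeholders rather than my real 
code, and this assumes SimpleStorageService.init is safe to guard this way:

import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

public class LazyInitAsset implements PairFunction<String, String, String> {

    private final String arg1;
    private final String arg2;
    private final String arg3;

    // One flag per executor JVM
    private static volatile boolean initialized = false;

    public LazyInitAsset(String arg1, String arg2, String arg3) {
        this.arg1 = arg1;
        this.arg2 = arg2;
        this.arg3 = arg3;
    }

    // Runs SimpleStorageService.init at most once per executor JVM,
    // the first time any task on that executor calls it.
    private static synchronized void ensureInit(String arg1, String arg2, String arg3) {
        if (!initialized) {
            SimpleStorageService.init(arg1, arg2, arg3);
            initialized = true;
        }
    }

    @Override
    public Tuple2<String, String> call(String rec) throws Exception {
        ensureInit(arg1, arg2, arg3);
        String assetId = rec.trim();   // placeholder for the real asset-id parsing
        return new Tuple2<String, String>(assetId, rec);
    }
}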

________________________________
From: Jacek Laskowski <ja...@japila.pl>
To: Darin McBeath <ddmcbe...@yahoo.com> 
Cc: user <user@spark.apache.org>
Sent: Friday, March 11, 2016 1:40 PM
Subject: Re: spark 1.6 foreachPartition only appears to be running on one 
executor



Hi, 
Could you share the code with foreachPartition? 
Jacek 
11.03.2016 7:33 PM "Darin McBeath" <ddmcbe...@yahoo.com> wrote:


>
>I can verify this by looking at the log file for the workers.
>
>Since I output logging statements in the object called by the 
>foreachPartition, I can see the statements being logged. Oddly, these output 
>statements only occur in one executor (and not the other). They occur 16 times 
>in that executor since there are 16 partitions. This seems odd, as there are 
>only 8 cores on the executor and the other executor doesn't appear to be 
>called at all in the foreachPartition call. But when I go to do a map 
>function on this same RDD, things start blowing up on the other executor 
>as it starts doing work for some partitions (although it would appear that 
>all partitions were only initialized on the first executor). The executor that 
>was used in the foreachPartition call works fine and doesn't experience any 
>issues. But, because the other executor is failing on every request, the job dies.
>
>Darin.
>
>
>________________________________
>From: Jacek Laskowski <ja...@japila.pl>
>To: Darin McBeath <ddmcbe...@yahoo.com>
>Cc: user <user@spark.apache.org>
>Sent: Friday, March 11, 2016 1:24 PM
>Subject: Re: spark 1.6 foreachPartition only appears to be running on one 
>executor
>
>
>
>Hi,
>How do you check which executor is used? Can you include a screenshot of the 
>master's webUI with workers?
>Jacek
>11.03.2016 6:57 PM "Darin McBeath" <ddmcbe...@yahoo.com.invalid> wrote:
>
>I've run into a situation where it would appear that foreachPartition is only 
>running on one of my executors.
>>
>>I have a small cluster (2 executors with 8 cores each).
>>
>>When I run a job with a small file (with 16 partitions) I can see that the 16 
>>partitions are initialized, but they all appear to be initialized on only one 
>>executor. All of the work then runs on this one executor (even though the 
>>number of partitions is 16). This seems odd, but at least it works. I'm not 
>>sure why the other executor was not used.
>>
>>However, when I run a larger file (once again with 16 partitions) I can see 
>>that the 16 partitions are initialized once again (but all on the same 
>>executor). But this time the subsequent work is spread across the 2 
>>executors. This of course results in problems, because the other executor was 
>>never initialized: all of the partitions were only initialized on the first 
>>executor.
>>
>>Does anyone have any suggestions for where I might want to investigate? Has 
>>anyone else seen something like this before? Any thoughts/insights would be 
>>appreciated. I'm using the standalone cluster manager, the cluster was 
>>started with the spark-ec2 scripts, and I'm submitting my job using spark-submit.
>>
>>Thanks.
>>
>>Darin.
>>
>>---------------------------------------------------------------------
>>To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>For additional commands, e-mail: user-h...@spark.apache.org

>>
>>
>


---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
