Hi Gaurav,

Seems like you could use a broadcast variable for this if I understand your
use case.  Create it in the driver based on the CommandLineArguments and
then use it in the workers.


So something like:

Broadcast<Integer> cmdLineArg = sc.broadcast(Integer.parseInt(args[12]));

Then just reference the broadcast variable in your workers, reading it with
cmdLineArg.value() inside the function.  It will get shipped once to all nodes
in the cluster and can be referenced by them.
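
In case it helps to see why the static field reads 0 on the executors, here
is a minimal plain-Java sketch with no Spark involved.  It only models the
idea that a value stored in the serialized function object travels with it,
while a static field does not.  The names StaticVsShippedDemo, ClickFilter and
shipToFreshJvm are made up for illustration, and resetting the static to 0 is
my stand-in for an executor JVM in which main() never ran.  A real Broadcast
does more than this (it is distributed once per node and read via value()).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class StaticVsShippedDemo {

    // Stand-in for CommandLineArguments.CLICK_THRESHOLD. Static state belongs
    // to one JVM and is never written out by Java serialization.
    static int CLICK_THRESHOLD = 0;

    // Hypothetical filter that keeps its threshold in an instance field,
    // so the value is part of the serialized object.
    static class ClickFilter implements Serializable {
        private static final long serialVersionUID = 1L;
        private final int shippedThreshold;

        ClickFilter(int shippedThreshold) {
            this.shippedThreshold = shippedThreshold;
        }

        boolean passesUsingStatic(int clicks) {
            return clicks > CLICK_THRESHOLD;
        }

        boolean passesUsingShipped(int clicks) {
            return clicks > shippedThreshold;
        }
    }

    static byte[] serialize(Serializable obj) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    static ClickFilter deserialize(byte[] wire) {
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(wire))) {
            return (ClickFilter) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Simulates driver -> executor: returns the filter as the "executor" sees it.
    static ClickFilter shipToFreshJvm(int threshold) {
        CLICK_THRESHOLD = threshold;                 // "driver": main() sets the static
        byte[] wire = serialize(new ClickFilter(threshold));
        CLICK_THRESHOLD = 0;                         // "executor": static keeps its default
        return deserialize(wire);
    }

    public static void main(String[] args) {
        ClickFilter onExecutor = shipToFreshJvm(50);
        System.out.println(onExecutor.passesUsingStatic(10));   // true:  10 > 0
        System.out.println(onExecutor.passesUsingShipped(10));  // false: 10 > 50
    }
}
```

The second call is the behaviour you want: the threshold arrives with the
function instead of being looked up in a static that only the driver set.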



On Thu, Jun 11, 2015 at 8:23 AM, gaurav sharma <sharmagaura...@gmail.com> wrote:

> Hi,
> I am using Kafka Spark cluster for real time aggregation analytics use
> case in production.
> Cluster details:
> 6 nodes, each running Spark and Kafka processes.
> Node 1          -> 1 Master, 1 Worker, 1 Driver, 1 Kafka process
> Nodes 2,3,4,5,6 -> 1 Worker process each, 1 Kafka process each
> Spark version 1.3.0
> Kafka version 0.8.1
> I am using Kafka Directstream for Kafka Spark Integration.
> Analytics code is written using the Spark Java API.
> Problem statement:
>       I want to accept a parameter as a command line argument and pass it on
> to the executors.
>       (I want to use the parameter in the rdd.foreach method, which is
> executed on the executors.)
>       I understand that when the driver is started, only the jar is
> transported to all the Workers.
>       But I need to use the dynamically passed command line argument in
> the reduce operation executed on the executors.
> Code snippets to illustrate my problem:
> public class KafkaReconcilationJob {
>     private static Logger logger = Logger.getLogger(KafkaReconcilationJob.class);
>
>     public static void main(String[] args) throws Exception {
>         // I want to use this command line argument
>         CommandLineArguments.CLICK_THRESHOLD = Integer.parseInt(args[12]);
>     }
> }
> JavaRDD<AggregatedAdeStats> adeAggregatedFilteredData =
>     adeAudGeoAggDataRdd.filter(new Function<AggregatedAdeStats, Boolean>() {
>         @Override
>         public Boolean call(AggregatedAdeStats adeAggregatedObj) throws Exception {
>             return adeAggregatedObj.getImpr() > CommandLineArguments.IMPR_THRESHOLD
>                 || adeAggregatedObj.getClick() > CommandLineArguments.CLICK_THRESHOLD;
>         }
>     });
> The filter operation above is executed on the executors, where the static
> field CommandLineArguments.CLICK_THRESHOLD has the value 0.
> Regards,
> Gaurav

