Hi Gaurav,

Seems like you could use a broadcast variable for this, if I understand your use case. Create it in the driver from the parsed command-line argument (instead of storing it in the static CommandLineArguments field) and then use it in the workers.
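Some background on why CLICK_THRESHOLD reads 0 on the executors: Spark ships the function object you pass to filter() to the executors by serializing it, and static fields are not part of an object's serialized state. The executor JVMs never run your main(), so CommandLineArguments.CLICK_THRESHOLD keeps its default value of 0 there, while values held in the function object's own fields travel with it. The following is a plain-Java simulation, not Spark code. It assumes java.util.function.Predicate as a stand-in for Spark's Function, a Java serialization round trip as a stand-in for task shipping, and a manual reset of the static field to mimic a fresh executor JVM. The names ClosureCaptureDemo, StaticThresholdFilter and CapturedThresholdFilter are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.function.Predicate;

public class ClosureCaptureDemo {

    // Hypothetical stand-in for the CommandLineArguments holder in the question.
    static class CommandLineArguments {
        static int CLICK_THRESHOLD; // 0 in any JVM where main() never assigned it
    }

    // Reads the threshold from the static field at call time (the pattern in the question).
    static class StaticThresholdFilter implements Predicate<Integer>, Serializable {
        private static final long serialVersionUID = 1L;

        @Override
        public boolean test(Integer clicks) {
            return clicks > CommandLineArguments.CLICK_THRESHOLD;
        }
    }

    // Carries the threshold in an instance field, so it is serialized with the object.
    static class CapturedThresholdFilter implements Predicate<Integer>, Serializable {
        private static final long serialVersionUID = 1L;
        private final int threshold;

        CapturedThresholdFilter(int threshold) {
            this.threshold = threshold;
        }

        @Override
        public boolean test(Integer clicks) {
            return clicks > threshold;
        }
    }

    // "Driver side": turn a function object into bytes, as when a task is shipped.
    static byte[] serialize(Object obj) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // "Executor side": rebuild the function object from the bytes.
    @SuppressWarnings("unchecked")
    static <T> T deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (T) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Driver: the threshold would come from args[12]; hard-coded to 100 here.
        CommandLineArguments.CLICK_THRESHOLD = 100;
        byte[] staticTask = serialize(new StaticThresholdFilter());
        byte[] capturedTask = serialize(new CapturedThresholdFilter(CommandLineArguments.CLICK_THRESHOLD));

        // Simulate a fresh executor JVM, where the static field was never set.
        CommandLineArguments.CLICK_THRESHOLD = 0;
        Predicate<Integer> staticFilter = deserialize(staticTask);
        Predicate<Integer> capturedFilter = deserialize(capturedTask);

        System.out.println("static-field filter keeps 50 clicks? " + staticFilter.test(50));
        System.out.println("captured-field filter keeps 50 clicks? " + capturedFilter.test(50));
    }
}
```

Running it prints "static-field filter keeps 50 clicks? true" (50 > 0, so the filter lets everything through) and "captured-field filter keeps 50 clicks? false" (50 > 100). A broadcast variable gets the value to the executors in the same spirit, but Spark sends it once per executor rather than with every task.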
https://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables

So something like:

final Broadcast<Integer> cmdLineArg = sc.broadcast(Integer.parseInt(args[12]));

Then just reference the broadcast variable in your workers, reading it with cmdLineArg.value() (e.g. adeAggregatedObj.getClick() > cmdLineArg.value() inside your filter). It will be shipped to each executor once, rather than with every task, and can be read there by your functions.

HTH.

-Todd

On Thu, Jun 11, 2015 at 8:23 AM, gaurav sharma <sharmagaura...@gmail.com> wrote:

> Hi,
>
> I am using a Kafka + Spark cluster for a real-time aggregation analytics use
> case in production.
>
> Cluster details
> 6 nodes, each running Spark and Kafka processes.
> Node 1 -> 1 Master, 1 Worker, 1 Driver, 1 Kafka process
> Node 2,3,4,5,6 -> 1 Worker process and 1 Kafka process each
>
> Spark version 1.3.0
> Kafka version 0.8.1
>
> I am using the Kafka direct stream for the Kafka-Spark integration.
> The analytics code is written using the Spark Java API.
>
> Problem Statement:
>
> I want to accept a parameter as a command line argument and pass it on to
> the executors.
> (I want to use the parameter in the rdd.foreach method, which is executed on
> the executor.)
>
> I understand that when the driver is started, only the jar is
> transported to all the Workers.
> But I need to use the dynamically passed command line argument in
> the reduce operation executed on the executors.
>
> Code snippets for better understanding my problem:
>
> public class KafkaReconcilationJob {
>
>     private static Logger logger =
>             Logger.getLogger(KafkaReconcilationJob.class);
>
>     public static void main(String[] args) throws Exception {
>         CommandLineArguments.CLICK_THRESHOLD =
>                 Integer.parseInt(args[12]); // <--- I want to use this command line argument
>     }
>
> }
>
> JavaRDD<AggregatedAdeStats> adeAggregatedFilteredData =
>         adeAudGeoAggDataRdd.filter(new Function<AggregatedAdeStats, Boolean>() {
>     @Override
>     public Boolean call(AggregatedAdeStats adeAggregatedObj) throws Exception {
>         if (adeAggregatedObj.getImpr() > CommandLineArguments.IMPR_THRESHOLD ||
>                 adeAggregatedObj.getClick() > CommandLineArguments.CLICK_THRESHOLD) {
>             return true;
>         } else {
>             return false;
>         }
>     }
> });
>
> The above-mentioned filter operation gets executed on an executor, which has 0
> as the value of the static field CommandLineArguments.CLICK_THRESHOLD.
>
> Regards,
> Gaurav