Thanks Todd, that solved my problem.

On Jun 11, 2015 6:42 PM, "Todd Nist" <> wrote:

> Hi Gaurav,
> Seems like you could use a broadcast variable for this if I understand
> your use case.  Create it in the driver based on the CommandLineArguments
> and then use it in the workers.
> So something like:
> Broadcast<Integer> cmdLineArg = sc.broadcast(Integer.parseInt(args[12]));
> Then just reference the broadcast variable in your workers (read it with
> cmdLineArg.value()).  It will get shipped once to all nodes in the cluster
> and can be referenced by them.
> HTH.
> -Todd
> On Thu, Jun 11, 2015 at 8:23 AM, gaurav sharma <>
> wrote:
>> Hi,
>> I am using a Kafka and Spark cluster for a real-time aggregation
>> analytics use case in production.
>> Cluster details:
>> 6 nodes, each running one Spark worker and one Kafka process.
>> Node 1    -> 1 Master, 1 Worker, 1 Driver, 1 Kafka process
>> Nodes 2-6 -> 1 Worker process and 1 Kafka process each
>> Spark version 1.3.0
>> Kafka version 0.8.1
>> I am using the Kafka direct stream for Kafka-Spark integration.
>> The analytics code is written using the Spark Java API.
>> Problem statement:
>>       I want to accept a parameter as a command-line argument and pass it
>> on to the executors.
>>       (I want to use the parameter in the rdd.foreach method, which is
>> executed on the executors.)
>>       I understand that when the driver is started, only the jar is
>> shipped to the workers.
>>       But I need to use the dynamically passed command-line argument in
>> the reduce operation executed on the executors.
>> Code snippets to illustrate the problem:
>> public class KafkaReconcilationJob {
>>     private static Logger logger =
>>         Logger.getLogger(KafkaReconcilationJob.class);
>>
>>     public static void main(String[] args) throws Exception {
>>         // I want to use this command line argument
>>         CommandLineArguments.CLICK_THRESHOLD = Integer.parseInt(args[12]);
>>     }
>> }
>> JavaRDD<AggregatedAdeStats> adeAggregatedFilteredData =
>>     adeAudGeoAggDataRdd.filter(new Function<AggregatedAdeStats, Boolean>() {
>>         @Override
>>         public Boolean call(AggregatedAdeStats adeAggregatedObj) throws Exception {
>>             if (adeAggregatedObj.getImpr() > CommandLineArguments.IMPR_THRESHOLD ||
>>                     adeAggregatedObj.getClick() > CommandLineArguments.CLICK_THRESHOLD) {
>>                 return true;
>>             } else {
>>                 return false;
>>             }
>>         }
>>     });
>> The filter operation above is executed on the executors, where the
>> static field CommandLineArguments.CLICK_THRESHOLD has the value 0.
>> Regards,
>> Gaurav
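
For anyone finding this thread later: the symptom Gaurav describes (the static field reading 0 on the executors) can probably be reproduced without Spark at all. The sketch below is only an illustration under stated assumptions, not the code from the job. It uses plain Java serialization to stand in for shipping a function to an executor, and it resets the static field to 0 to stand in for a separate executor JVM in which main() never ran. The names ClosureCaptureDemo, ClickFilter, StaticFieldFilter, CapturedValueFilter and runDemo are hypothetical. The point it tries to show is that static fields are not part of an object's serialized state, while a value stored in the object itself (or, in Spark, wrapped in a broadcast variable as Todd suggests) travels with it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class ClosureCaptureDemo {
    // Stand-in for CommandLineArguments.CLICK_THRESHOLD: static, so never serialized.
    static int CLICK_THRESHOLD = 0;

    // Hypothetical serializable function type, playing the role of Spark's Function.
    interface ClickFilter extends Serializable {
        boolean keep(int clicks);
    }

    // Looks up the static field at call time, in whichever JVM runs the call.
    static class StaticFieldFilter implements ClickFilter {
        private static final long serialVersionUID = 1L;
        public boolean keep(int clicks) { return clicks > CLICK_THRESHOLD; }
    }

    // Stores the threshold in an instance field, which is serialized with the object.
    static class CapturedValueFilter implements ClickFilter {
        private static final long serialVersionUID = 1L;
        private final int threshold;
        CapturedValueFilter(int threshold) { this.threshold = threshold; }
        public boolean keep(int clicks) { return clicks > threshold; }
    }

    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    // Returns { result of the static-field filter, result of the captured-value filter }.
    static boolean[] runDemo(int threshold, int clicks) {
        try {
            CLICK_THRESHOLD = threshold;                       // "driver": set from the command line
            byte[] shippedStatic = serialize(new StaticFieldFilter());
            byte[] shippedCaptured = serialize(new CapturedValueFilter(threshold));

            CLICK_THRESHOLD = 0;                               // simulate an executor JVM: main() never ran
            ClickFilter onExecutorStatic = (ClickFilter) deserialize(shippedStatic);
            ClickFilter onExecutorCaptured = (ClickFilter) deserialize(shippedCaptured);

            return new boolean[] {
                onExecutorStatic.keep(clicks), onExecutorCaptured.keep(clicks)
            };
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] r = runDemo(10, 5);
        System.out.println("static-field filter keeps 5 clicks:   " + r[0]); // true, compares against 0
        System.out.println("captured-value filter keeps 5 clicks: " + r[1]); // false, compares against 10
    }
}
```

With a threshold of 10 and a record with 5 clicks, the static-field version lets the record through because it compares against 0, while the captured-value version filters it out. In the actual job the equivalent change would be to read the threshold from the broadcast variable (or from a final local variable or instance field of the function) inside call(), rather than from the static field.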
