Thanks Todd, that solved my problem.

Regards,
Gaurav
On Jun 11, 2015 6:42 PM, "Todd Nist" <tsind...@gmail.com> wrote:

> Hi Gaurav,
>
> Seems like you could use a broadcast variable for this if I understand
> your use case.  Create it in the driver based on the CommandLineArguments
> and then use it in the workers.
>
>
> https://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables
>
> So something like:
>
> Broadcast<Integer> cmdLineArg = sc.broadcast(Integer.parseInt(args[12]));
>
> Then just reference the broadcast variable in your workers through
> cmdLineArg.value().  It will be shipped once to each node in the cluster
> and can be read there.
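A minimal sketch of the broadcast approach with the Spark Java API. It assumes the threshold arrives as the first program argument; the class name BroadcastThresholdSketch, the method filterAboveThreshold and the sample click counts are hypothetical (the original job reads args[12] and filters AggregatedAdeStats objects). The broadcast is created on the driver, and the filter reads it on the executors through value(). The anonymous Function is declared in a static method, so it does not capture an enclosing instance that Spark would also have to serialize.

```java
import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.broadcast.Broadcast;

public class BroadcastThresholdSketch {

    // Hypothetical helper: keeps only click counts above the broadcast threshold.
    static List<Integer> filterAboveThreshold(JavaSparkContext sc, int threshold, List<Integer> clicks) {
        // Created once on the driver; Spark ships the value to the executors.
        final Broadcast<Integer> clickThreshold = sc.broadcast(threshold);

        JavaRDD<Integer> rdd = sc.parallelize(clicks);
        JavaRDD<Integer> above = rdd.filter(new Function<Integer, Boolean>() {
            @Override
            public Boolean call(Integer clickCount) {
                // Runs on the executors; value() reads the broadcast copy there.
                return clickCount > clickThreshold.value();
            }
        });
        return above.collect();
    }

    public static void main(String[] args) {
        // Assumption: threshold is args[0] here (the original job uses args[12]).
        int threshold = Integer.parseInt(args[0]);
        // The master URL is expected to be supplied by spark-submit.
        JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("BroadcastThresholdSketch"));
        try {
            System.out.println(filterAboveThreshold(sc, threshold, Arrays.asList(1, 5, 10, 20)));
        } finally {
            sc.stop();
        }
    }
}
```

Submitted with spark-submit and the argument 5, this should print [10, 20], since only those counts are strictly greater than the threshold.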
>
> HTH.
>
> -Todd
>
> On Thu, Jun 11, 2015 at 8:23 AM, gaurav sharma <sharmagaura...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I am using a Kafka + Spark cluster in production for a real-time
>> aggregation analytics use case.
>>
>> Cluster details:
>> 6 nodes, each running Spark and Kafka processes.
>> Node 1      -> 1 Master, 1 Worker, 1 Driver, 1 Kafka process
>> Nodes 2-6   -> 1 Worker process and 1 Kafka process each
>>
>> Spark version 1.3.0
>> Kafka version 0.8.1
>>
>> I am using the Kafka direct stream for Kafka-Spark integration.
>> The analytics code is written using the Spark Java API.
>>
>> Problem statement:
>>
>> I want to accept a parameter as a command-line argument and pass it on
>> to the executors (I want to use the parameter in an rdd.foreach method,
>> which is executed on the executors).
>>
>> I understand that when the driver is started, only the jar is shipped
>> to the workers. But I need to use the dynamically passed command-line
>> argument in the reduce operation executed on the executors.
>>
>>
>> Code snippets to illustrate the problem:
>>
>> public class KafkaReconcilationJob {
>>
>>     private static Logger logger =
>>             Logger.getLogger(KafkaReconcilationJob.class);
>>
>>     public static void main(String[] args) throws Exception {
>>         // I want to use this command-line argument on the executors
>>         CommandLineArguments.CLICK_THRESHOLD = Integer.parseInt(args[12]);
>>     }
>>
>> }
>>
>>
>>
>> JavaRDD<AggregatedAdeStats> adeAggregatedFilteredData =
>>         adeAudGeoAggDataRdd.filter(new Function<AggregatedAdeStats, Boolean>() {
>>             @Override
>>             public Boolean call(AggregatedAdeStats adeAggregatedObj) throws Exception {
>>                 // Runs on the executors
>>                 return adeAggregatedObj.getImpr() > CommandLineArguments.IMPR_THRESHOLD
>>                         || adeAggregatedObj.getClick() > CommandLineArguments.CLICK_THRESHOLD;
>>             }
>>         });
>>
>>
>>
>> The filter operation above runs on an executor, where the static field
>> CommandLineArguments.CLICK_THRESHOLD still has the value 0.
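Why the value is 0: a static field belongs to the class as loaded in one JVM, and each executor runs in its own JVM. The assignment in main() only happens in the driver, and Java serialization, which Spark uses by default to ship functions, does not write static fields. The stdlib-only sketch below illustrates this without Spark; the names ClosureShippingDemo and ClickFilter are hypothetical, and resetting the static to 0 merely simulates a fresh executor JVM. A value copied into an instance field of a serializable function object survives the round trip, while the static does not.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ClosureShippingDemo {

    // Hypothetical stand-in for the job's CommandLineArguments holder.
    static class CommandLineArguments {
        static int CLICK_THRESHOLD = 0;
    }

    // Hypothetical filter that copies the threshold into an instance field,
    // so the value is serialized together with the object.
    static class ClickFilter implements Serializable {
        private static final long serialVersionUID = 1L;
        private final int threshold;

        ClickFilter(int threshold) {
            this.threshold = threshold;
        }

        boolean accept(int clicks) {
            return clicks > threshold;
        }
    }

    // Serializes and deserializes an object, roughly what happens when a
    // function travels from the driver to an executor.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        // "Driver": set the static and copy its current value into the filter.
        CommandLineArguments.CLICK_THRESHOLD = 5;
        ClickFilter shipped = roundTrip(new ClickFilter(CommandLineArguments.CLICK_THRESHOLD));

        // "Executor": a fresh JVM never ran main(), so the static is still 0.
        CommandLineArguments.CLICK_THRESHOLD = 0;

        System.out.println(shipped.accept(3));                        // false: 3 > 5 fails
        System.out.println(3 > CommandLineArguments.CLICK_THRESHOLD); // true: 3 > 0
    }
}
```

In Spark terms this points to two options: parse the argument into a final local variable that the anonymous Function reads, so it is serialized along with the function, or use a broadcast variable as Todd suggests.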
>>
>>
>> Regards,
>> Gaurav
>>
>
>
