Hi Gaurav,

Seems like you could use a broadcast variable for this if I understand your
use case.  Create it in the driver based on the CommandLineArguments and
then use it in the workers.

https://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables

So something like:

Broadcast<Integer> cmdLineArg = sc.broadcast(Integer.parseInt(args[12]));

Then just reference the broadcast variable in your workers.  It will get
shipped once to all nodes in the cluster and can be referenced by them.
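
As for why the static field reads 0 on the executors: as far as I understand
it, only the serialized function object travels to the workers, and static
fields are not part of a serialized object, so each executor JVM sees the
field's default value. The sketch below imitates that with plain Java
serialization rather than Spark itself. The class and method names
(ThresholdShippingDemo, StaticFilter, CapturedFilter) are made up for
illustration, and resetting the static field to 0 is only a stand-in for a
fresh executor JVM where main() never ran.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ThresholdShippingDemo {

    // Hypothetical stand-in for CommandLineArguments.CLICK_THRESHOLD.
    // A static field belongs to the JVM's class, not to any serialized object.
    static int CLICK_THRESHOLD = 0;

    /** Hypothetical filter that reads the static field when it runs. */
    static class StaticFilter implements Serializable {
        private static final long serialVersionUID = 1L;

        boolean call(int clicks) {
            return clicks > CLICK_THRESHOLD;
        }
    }

    /** Hypothetical filter that carries the threshold as an instance field. */
    static class CapturedFilter implements Serializable {
        private static final long serialVersionUID = 1L;
        private final int threshold;

        CapturedFilter(int threshold) {
            this.threshold = threshold;
        }

        boolean call(int clicks) {
            return clicks > threshold;
        }
    }

    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // "Driver" side: the static field is set from the command line.
        CLICK_THRESHOLD = 100;
        byte[] staticBytes = serialize(new StaticFilter());
        byte[] capturedBytes = serialize(new CapturedFilter(CLICK_THRESHOLD));

        // Simulate a fresh "executor" JVM: the static field was never assigned there.
        CLICK_THRESHOLD = 0;

        StaticFilter staticFilter = (StaticFilter) deserialize(staticBytes);
        CapturedFilter capturedFilter = (CapturedFilter) deserialize(capturedBytes);

        System.out.println(staticFilter.call(50));   // true: compared against 0
        System.out.println(capturedFilter.call(50)); // false: compared against 100
    }
}
```

In the actual job the analogous change would be to read cmdLineArg.value()
inside call() instead of the static field, since the broadcast handle is
shipped along with the function.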

HTH.

-Todd

On Thu, Jun 11, 2015 at 8:23 AM, gaurav sharma <sharmagaura...@gmail.com>
wrote:

> Hi,
>
> I am using a Kafka and Spark cluster for a real-time aggregation analytics
> use case in production.
>
> Cluster details
> 6 nodes, each running Spark and Kafka processes.
> Node1              -> 1 Master , 1 Worker, 1 Driver,
>    1 Kafka process
> Node 2,3,4,5,6 -> 1 Worker process each
>  1 Kafka process each
>
> Spark version 1.3.0
> Kafka version 0.8.1
>
> I am using Kafka Directstream for Kafka Spark Integration.
> Analytics code is written using the Spark Java API.
>
> Problem Statement :
>
>       I want to accept a parameter as a command line argument and pass it on
> to the executors.
>       (I want to use the parameter in the rdd.foreach method, which is
> executed on the executors.)
>
>       I understand that when driver is started, only the jar is
> transported to all the Workers.
>       But I need to use the dynamically passed command line argument in
> the reduce operation executed on the executors.
>
>
> Code Snippets for better understanding my problem :
>
> public class KafkaReconcilationJob {
>
> private static Logger logger =
> Logger.getLogger(KafkaReconcilationJob.class);
>  public static void main(String[] args) throws Exception {
>                           CommandLineArguments.CLICK_THRESHOLD =
> Integer.parseInt(args[12]);
> -------------------------------------------------------> I want to use this
> command line argument
>         }
>
> }
>
>
>
> JavaRDD<AggregatedAdeStats> adeAggregatedFilteredData =
> adeAudGeoAggDataRdd.filter(new Function<AggregatedAdeStats, Boolean>() {
> @Override
> public Boolean call(AggregatedAdeStats adeAggregatedObj) throws Exception {
> if(adeAggregatedObj.getImpr() > CommandLineArguments.IMPR_THRESHOLD ||
> adeAggregatedObj.getClick() > CommandLineArguments.CLICK_THRESHOLD){
> return true;
> }else {
> return false;
> }
> }
> });
>
>
>
> The above mentioned Filter operation gets executed on executor which has 0
> as the value of the static field CommandLineArguments.CLICK_THRESHOLD
>
>
> Regards,
> Gaurav
>
