Hi,

I am using a Kafka + Spark cluster for a real-time aggregation analytics use case
in production.

Cluster details:
6 nodes, each running Spark and Kafka processes.
Node 1            -> 1 Master, 1 Worker, 1 Driver, 1 Kafka process
Nodes 2,3,4,5,6   -> 1 Worker process each, 1 Kafka process each

Spark version 1.3.0
Kafka version 0.8.1

I am using the Kafka direct stream (KafkaUtils.createDirectStream) for the
Kafka-Spark integration. The analytics code is written using the Spark Java API.
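For context, the stream is created roughly as below (the broker list, topic name,
and batch interval are placeholders, not the production values):

import java.util.*;
import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka.KafkaUtils;

SparkConf conf = new SparkConf().setAppName("KafkaReconcilationJob");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));

Map<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", "node1:9092,node2:9092");
Set<String> topics = Collections.singleton("ade-events");

// Direct stream: no receivers, one RDD partition per Kafka partition
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
        jssc,
        String.class, String.class,
        StringDecoder.class, StringDecoder.class,
        kafkaParams,
        topics);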

Problem statement:

      I want to accept a parameter as a command line argument and pass it on to the executors.
      (I want to use the parameter in the rdd.foreach method, which is executed on the executors.)

      I understand that when the driver is started, only the jar is transported to all the Workers.
      But I need to use the dynamically passed command line argument in the filter and reduce operations executed on the executors.


Code snippets for a better understanding of my problem:

public class KafkaReconcilationJob {

    private static Logger logger = Logger.getLogger(KafkaReconcilationJob.class);

    public static void main(String[] args) throws Exception {
        // ------> I want to use this command line argument on the executors
        CommandLineArguments.CLICK_THRESHOLD = Integer.parseInt(args[12]);
    }
}



JavaRDD<AggregatedAdeStats> adeAggregatedFilteredData =
    adeAudGeoAggDataRdd.filter(new Function<AggregatedAdeStats, Boolean>() {
        @Override
        public Boolean call(AggregatedAdeStats adeAggregatedObj) throws Exception {
            return adeAggregatedObj.getImpr() > CommandLineArguments.IMPR_THRESHOLD
                    || adeAggregatedObj.getClick() > CommandLineArguments.CLICK_THRESHOLD;
        }
    });
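One approach I am evaluating, as a minimal sketch (the args index for the
impression threshold is assumed for illustration): copy the parsed arguments
into final local variables on the driver, so they are serialized as part of the
anonymous Function and shipped to the executors along with it:

// Parsed on the driver; final locals are captured and serialized with the closure
final int imprThreshold = Integer.parseInt(args[11]);   // index assumed for illustration
final int clickThreshold = Integer.parseInt(args[12]);

JavaRDD<AggregatedAdeStats> adeAggregatedFilteredData =
    adeAudGeoAggDataRdd.filter(new Function<AggregatedAdeStats, Boolean>() {
        @Override
        public Boolean call(AggregatedAdeStats adeAggregatedObj) throws Exception {
            // The captured finals travel inside the serialized closure,
            // so every executor sees the real thresholds instead of 0
            return adeAggregatedObj.getImpr() > imprThreshold
                    || adeAggregatedObj.getClick() > clickThreshold;
        }
    });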



The above filter operation gets executed on the executors, where the static
field CommandLineArguments.CLICK_THRESHOLD still has its default value of 0,
since the assignment in main() only ran in the driver JVM.
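Another option I am evaluating is a broadcast variable, which ships a read-only
value from the driver to the executors; a minimal sketch, assuming a
JavaSparkContext named jsc is available in scope:

import org.apache.spark.broadcast.Broadcast;

// Broadcast the threshold once from the driver; executors read their local copy
final Broadcast<Integer> clickThreshold = jsc.broadcast(Integer.parseInt(args[12]));

JavaRDD<AggregatedAdeStats> filtered =
    adeAudGeoAggDataRdd.filter(new Function<AggregatedAdeStats, Boolean>() {
        @Override
        public Boolean call(AggregatedAdeStats adeAggregatedObj) throws Exception {
            // value() is resolved on the executor from the broadcast copy
            return adeAggregatedObj.getClick() > clickThreshold.value();
        }
    });

For a single int the plain closure capture above would be enough; a broadcast
mainly pays off for larger lookup data reused across many tasks.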


Regards,
Gaurav
