Hi, I am using a Kafka + Spark cluster in production for a real-time aggregation analytics use case.
Cluster details:

- 6 nodes, each running one Spark process and one Kafka broker
- Node 1: 1 Master, 1 Worker, 1 Driver, 1 Kafka broker
- Nodes 2-6: 1 Worker and 1 Kafka broker each
- Spark version: 1.3.0
- Kafka version: 0.8.1

I am using the Kafka direct stream for the Kafka-Spark integration, and the analytics code is written with the Spark Java API.

Problem statement: I want to accept a parameter as a command-line argument and make it available on the executors (I want to use the parameter inside an rdd.filter/rdd.foreach that is executed on the executors). I understand that when the driver is started, only the application jar is shipped to the workers, but I still need to use the dynamically passed command-line argument in operations executed on the executors.

Code snippets for better understanding of my problem:

```java
public class KafkaReconcilationJob {

    private static Logger logger = Logger.getLogger(KafkaReconcilationJob.class);

    public static void main(String[] args) throws Exception {
        // I want to use this command-line argument on the executors
        CommandLineArguments.CLICK_THRESHOLD = Integer.parseInt(args[12]);
        ...
    }
}
```

```java
JavaRDD<AggregatedAdeStats> adeAggregatedFilteredData =
        adeAudGeoAggDataRdd.filter(new Function<AggregatedAdeStats, Boolean>() {
    @Override
    public Boolean call(AggregatedAdeStats adeAggregatedObj) throws Exception {
        return adeAggregatedObj.getImpr() > CommandLineArguments.IMPR_THRESHOLD
                || adeAggregatedObj.getClick() > CommandLineArguments.CLICK_THRESHOLD;
    }
});
```

The filter operation above gets executed on an executor, where the static field CommandLineArguments.CLICK_THRESHOLD still has 0 as its value.

Regards,
Gaurav
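To make the underlying behavior concrete, here is a minimal standalone sketch (plain Java, no Spark; the class and field names are hypothetical) of why this happens: static fields belong to the class in each JVM and are never part of a serialized object, so when a closure is serialized on the driver and deserialized in an executor JVM, the executor sees its own class-level default (0), not the value the driver assigned.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Standalone sketch (not Spark code): shows that static fields are NOT
// included in Java serialization, unlike instance fields.
public class StaticNotSerialized {

    static class Thresholds implements Serializable {
        // static field: stored per-JVM in the class, never in serialized bytes
        static int CLICK_THRESHOLD = 0;
        // instance field: IS written into the serialized bytes
        int instanceField = 42;
    }

    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
        }
        return bos.toByteArray();
    }

    static Object deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois =
                new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Thresholds.CLICK_THRESHOLD = 100;       // "driver" sets the static from args
        byte[] shipped = serialize(new Thresholds());

        Thresholds.CLICK_THRESHOLD = 0;         // simulate a fresh "executor" JVM default
        Thresholds t = (Thresholds) deserialize(shipped);

        // The static value was not shipped; the instance field was.
        System.out.println(Thresholds.CLICK_THRESHOLD); // 0
        System.out.println(t.instanceField);            // 42
    }
}
```

This is exactly the situation in the filter above: assigning the static on the driver only changes the driver JVM's copy of the class, so each executor JVM still reads its own default.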