The general way of passing parameters to spark-submit is as follows (note that I use a generic shell script to submit jobs). Replace ${JAR_FILE} with an appropriate value. In general you can pass all of these (driver-memory, executor-memory, etc.) to the shell script as variables if you wish, rather than hard-coding them.
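As a minimal sketch of such a wrapper (variable names and the default jar path here are hypothetical, and the script only echoes the command as a dry run rather than executing it):

```shell
# Hypothetical wrapper: tuning knobs arrive as environment variables
# with defaults, so nothing is hard-coded in the script itself.
DRIVER_MEMORY="${DRIVER_MEMORY:-4G}"
EXECUTOR_MEMORY="${EXECUTOR_MEMORY:-4G}"
NUM_EXECUTORS="${NUM_EXECUTORS:-5}"
EXECUTOR_CORES="${EXECUTOR_CORES:-2}"
JAR_FILE="${1:-/home/hduser/jars/app.jar}"   # first argument, or a default
CLASS_NAME="${2:-MyStreamingApp}"            # second argument, or a default

# Build the spark-submit invocation (echoed here instead of run).
CMD="${SPARK_HOME:-/usr/local/spark}/bin/spark-submit \
  --driver-memory ${DRIVER_MEMORY} \
  --num-executors ${NUM_EXECUTORS} \
  --executor-memory ${EXECUTOR_MEMORY} \
  --executor-cores ${EXECUTOR_CORES} \
  --class ${CLASS_NAME} \
  ${JAR_FILE}"
echo "$CMD"
```

Invoked as `./submit.sh /path/to/job.jar MyClass`, or with `DRIVER_MEMORY=8G ./submit.sh …` to override a single knob.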
${SPARK_HOME}/bin/spark-submit \
  --packages com.databricks:spark-csv_2.11:1.3.0 \
  --master spark://50.140.197.217:7077 \
  --driver-memory 4G \
  --num-executors 5 \
  --executor-memory 4G \
  --executor-cores 2 \
  --conf "spark.scheduler.mode=FAIR" \
  --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
  --jars /home/hduser/jars/spark-streaming-kafka-assembly_2.10-1.6.1.jar \
  --class ${FILE_NAME} \
  --conf "spark.ui.port=55556" \
  --conf "spark.driver.port=54631" \
  --conf "spark.fileserver.port=54731" \
  --conf "spark.blockManager.port=54832" \
  --conf "spark.kryoserializer.buffer.max=512" \
  ${JAR_FILE} \
  >> ${LOG_FILE}

For the Twitter jobs, pass the Twitter-specific parameters after ${JAR_FILE}:

${SPARK_HOME}/bin/spark-submit \
  --packages com.databricks:spark-csv_2.11:1.3.0 \
  --driver-memory 2G \
  --num-executors 1 \
  --executor-memory 2G \
  --master local[2] \
  --executor-cores 2 \
  --conf "spark.scheduler.mode=FAIR" \
  --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
  --jars /home/hduser/jars/spark-streaming-kafka-assembly_2.10-1.6.1.jar \
  --class "com.databricks.apps.twitter_classifier.${FILE_NAME}" \
  --conf "spark.ui.port=55555" \
  --conf "spark.driver.port=54631" \
  --conf "spark.fileserver.port=54731" \
  --conf "spark.blockManager.port=54832" \
  --conf "spark.kryoserializer.buffer.max=512" \
  ${JAR_FILE} \
  ${OUTPUT_DIRECTORY:-/tmp/tweets} \
  ${NUM_TWEETS_TO_COLLECT:-10000} \
  ${OUTPUT_FILE_INTERVAL_IN_SECS:-10} \
  ${OUTPUT_FILE_PARTITIONS_EACH_INTERVAL:-1} \
  >> ${LOG_FILE}

Dr Mich Talebzadeh

LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
http://talebzadehmich.wordpress.com

On 3 June 2016 at 09:10, Jacek Laskowski <ja...@japila.pl> wrote:

> --executor-cores 1 to be exact.
>
> Pozdrawiam,
> Jacek Laskowski
> ----
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
> On Fri, Jun 3, 2016 at 12:28 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>
>> Interesting, a VM with one core!
>>
>> One simple test: can you try running with
>>
>> --executor-cores=1
>>
>> and see if it works OK, please.
>>
>> Dr Mich Talebzadeh
>>
>> On 2 June 2016 at 23:15, Andres M Jimenez T <ad...@hotmail.com> wrote:
>>
>>> Mich, thanks for your time.
>>>
>>> I am launching spark-submit as follows:
>>>
>>> bin/spark-submit --class com.example.SparkStreamingImpl \
>>>   --master spark://dev1.dev:7077 --verbose \
>>>   --driver-memory 1g --executor-memory 1g \
>>>   --conf "spark.driver.extraJavaOptions=-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=8090 -Dcom.sun.management.jmxremote.rmi.port=8091 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false" \
>>>   --conf "spark.executor.extraJavaOptions=-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=8092 -Dcom.sun.management.jmxremote.rmi.port=8093 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false" \
>>>   --conf "spark.scheduler.mode=FAIR" \
>>>   /home/Processing.jar
>>>
>>> When I use --executor-cores=12 I get "Initial job has not accepted any
>>> resources; check your cluster UI to ensure that workers are registered
>>> and have sufficient resources".
>>>
>>> This is because my nodes are single-core, but I want to use more than
>>> one thread per core. Is this possible?
>>>
>>> root@dev1:/home/spark-1.6.1-bin-hadoop2.6# lscpu
>>> Architecture:          x86_64
>>> CPU op-mode(s):        32-bit, 64-bit
>>> Byte Order:            Little Endian
>>> CPU(s):                1
>>> On-line CPU(s) list:   0
>>> Thread(s) per core:    1
>>> Core(s) per socket:    1
>>> Socket(s):             1
>>> NUMA node(s):          1
>>> Vendor ID:             GenuineIntel
>>> CPU family:            6
>>> Model:                 58
>>> Model name:            Intel(R) Xeon(R) CPU E5-2690 v2 @ 3.00GHz
>>> Stepping:              0
>>> CPU MHz:               2999.999
>>> BogoMIPS:              5999.99
>>> Hypervisor vendor:     VMware
>>> Virtualization type:   full
>>> L1d cache:             32K
>>> L1i cache:             32K
>>> L2 cache:              256K
>>> L3 cache:              25600K
>>> NUMA node0 CPU(s):     0
>>>
>>> Thanks
>>>
>>> ------------------------------
>>> *From:* Mich Talebzadeh <mich.talebza...@gmail.com>
>>> *Sent:* Thursday, June 2, 2016 5:00 PM
>>> *To:* Andres M Jimenez T
>>> *Cc:* user@spark.apache.org
>>> *Subject:* Re: how to increase threads per executor
>>>
>>> What are you passing as parameters to spark-submit?
>>>
>>> ${SPARK_HOME}/bin/spark-submit \
>>>   --executor-cores=12 \
>>>
>>> Also check the Execution Behavior section (spark.executor.cores) in
>>> http://spark.apache.org/docs/latest/configuration.html
>>>
>>> HTH
>>>
>>> Dr Mich Talebzadeh
>>>
>>> On 2 June 2016 at 17:29, Andres M Jimenez T <ad...@hotmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am working with Spark 1.6.1, using Kafka direct connect for
>>>> streaming data.
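[Editor's note: on a standalone cluster, the worker's advertised core count is a scheduling token rather than a hard CPU binding, since executor tasks are plain JVM threads. One way to let this single-core VM accept more than one task thread is to over-advertise cores in conf/spark-env.sh; the values below are arbitrary examples, not recommendations:]

```shell
# conf/spark-env.sh on each worker node (standalone mode).
# SPARK_WORKER_CORES is a logical count used by the scheduler; it may
# exceed the physical core count, because each "core" simply becomes
# one task thread inside the executor JVM.
export SPARK_WORKER_CORES=4
export SPARK_WORKER_MEMORY=2g   # cap the memory the worker offers
```

After restarting the workers, --executor-cores up to 4 would be accepted, though for CPU-bound work those threads still share one physical core.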
>>>>
>>>> Using the Spark scheduler and 3 slaves.
>>>>
>>>> The Kafka topic is partitioned with a value of 10.
>>>>
>>>> The problem I have is that there is only one thread per executor
>>>> running my function (logic implementation).
>>>>
>>>> Can anybody tell me how I can increase the threads per executor to
>>>> make better use of the CPUs?
>>>>
>>>> Thanks
>>>>
>>>> Here is the code I have implemented:
>>>>
>>>> *Driver*:
>>>>
>>>> JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(10000));
>>>>
>>>> // prepare streaming from Kafka
>>>> Set<String> topicsSet = new HashSet<>(Arrays.asList("stage1-in,stage1-retry".split(",")));
>>>> Map<String, String> kafkaParams = new HashMap<>();
>>>> kafkaParams.put("metadata.broker.list", kafkaBrokers);
>>>> kafkaParams.put("group.id", SparkStreamingImpl.class.getName());
>>>>
>>>> JavaPairInputDStream<String, String> inputMessages = KafkaUtils.createDirectStream(
>>>>         ssc,
>>>>         String.class,
>>>>         String.class,
>>>>         StringDecoder.class,
>>>>         StringDecoder.class,
>>>>         kafkaParams,
>>>>         topicsSet
>>>> );
>>>>
>>>> inputMessages.foreachRDD(new ForeachRDDFunction());
>>>>
>>>> *ForeachFunction*:
>>>>
>>>> class ForeachFunction implements VoidFunction<Tuple2<String, String>> {
>>>>
>>>>     private static final Counter foreachConcurrent =
>>>>             ProcessingMetrics.metrics.counter("foreach-concurrency");
>>>>
>>>>     public ForeachFunction() {
>>>>         LOG.info("Creating a new ForeachFunction");
>>>>     }
>>>>
>>>>     public void call(Tuple2<String, String> t) throws Exception {
>>>>         foreachConcurrent.inc();
>>>>         LOG.info("processing message [" + t._1() + "]");
>>>>         try {
>>>>             Thread.sleep(1000);
>>>>         } catch (Exception e) { }
>>>>         foreachConcurrent.dec();
>>>>     }
>>>> }
>>>>
>>>> *ForeachRDDFunction*:
>>>>
>>>> class ForeachRDDFunction implements VoidFunction<JavaPairRDD<String, String>> {
>>>>
>>>>     private static final Counter foreachRDDConcurrent =
>>>>             ProcessingMetrics.metrics.counter("foreachRDD-concurrency");
>>>>
>>>>     private ForeachFunction foreachFunction = new ForeachFunction();
>>>>
>>>>     public ForeachRDDFunction() {
>>>>         LOG.info("Creating a new ForeachRDDFunction");
>>>>     }
>>>>
>>>>     public void call(JavaPairRDD<String, String> t) throws Exception {
>>>>         foreachRDDConcurrent.inc();
>>>>         LOG.info("call from inputMessages.foreachRDD with [" + t.partitions().size() + "] partitions");
>>>>         for (Partition p : t.partitions()) {
>>>>             if (p instanceof KafkaRDDPartition) {
>>>>                 LOG.info("partition [" + p.index() + "] with count [" + ((KafkaRDDPartition) p).count() + "]");
>>>>             }
>>>>         }
>>>>         t.foreachAsync(foreachFunction);
>>>>         foreachRDDConcurrent.dec();
>>>>     }
>>>> }
>>>>
>>>> *The log from the driver that tells me my RDD is partitioned to
>>>> process in parallel*:
>>>>
>>>> [Stage 70:> (3 + 3) / 20][Stage 71:> (0 + 0) / 20][Stage 72:> (0 + 0) / 20]
>>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: call from inputMessages.foreachRDD with [20] partitions
>>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [0] with count [24]
>>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [1] with count [0]
>>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [2] with count [0]
>>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [3] with count [19]
>>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [4] with count [19]
>>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [5] with count [20]
>>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [6] with count [0]
>>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [7] with count [23]
>>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [8] with count [21]
>>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [9] with count [0]
>>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [10] with count [0]
>>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [11] with count [0]
>>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [12] with count [0]
>>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [13] with count [26]
>>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [14] with count [0]
>>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [15] with count [27]
>>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [16] with count [0]
>>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [17] with count [16]
>>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [18] with count [15]
>>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [19] with count [0]
>>>>
>>>> *The log from one of the executors showing that exactly one message per
>>>> second was processed (by only one thread)*:
>>>>
>>>> 16/06/02 08:32:46 INFO SparkStreamingImpl: processing message [f2b22bb9-3bd8-4e5b-b9fb-afa7e8c4deb8]
>>>> 16/06/02 08:32:47 INFO SparkStreamingImpl: processing message [e267cde2-ffea-4f7a-9934-f32a3b7218cc]
>>>> 16/06/02 08:32:48 INFO SparkStreamingImpl: processing message [f055fe3c-0f72-4f41-9a31-df544f1e1cd3]
>>>> 16/06/02 08:32:49 INFO SparkStreamingImpl: processing message [854faaa5-0abe-49a2-b13a-c290a3720b0e]
>>>> 16/06/02 08:32:50 INFO SparkStreamingImpl: processing message [1bc0a141-b910-45fe-9881-e2066928fbc6]
>>>> 16/06/02 08:32:51 INFO SparkStreamingImpl: processing message [67fb99c6-1ca1-4dfb-bffe-43b927fdec07]
>>>> 16/06/02 08:32:52 INFO SparkStreamingImpl: processing message [de7d5934-bab2-4019-917e-c339d864ba18]
>>>> 16/06/02 08:32:53 INFO SparkStreamingImpl: processing message [e63d7a7e-de32-4527-b8f1-641cfcc8869c]
>>>> 16/06/02 08:32:54 INFO SparkStreamingImpl: processing message [1ce931ee-b8b1-4645-8a51-2c697bf1513b]
>>>> 16/06/02 08:32:55 INFO SparkStreamingImpl: processing message [5367f3c1-d66c-4647-bb44-f5eab719031d]
>>>>
>>>
>>
>
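[Editor's note: the one-message-per-second pattern above is expected, since each Kafka partition becomes one task and each task runs on one executor core. A workaround often discussed for this situation is to fan records out to a local thread pool inside each partition. The sketch below models that idea with plain JDK classes only (the class and method names are hypothetical, and the Spark wiring around foreachPartition is omitted):]

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the per-partition thread-pool workaround: the work that
// would run inside foreachPartition is modeled as a method over a List.
public class PartitionThreadPoolSketch {

    // Process all records with `threads` worker threads; returns the
    // number of records actually processed.
    static int processPartition(List<String> records, int threads)
            throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        AtomicInteger processed = new AtomicInteger();
        for (String record : records) {
            pool.submit(() -> {
                // simulate per-message work (the Thread.sleep in the
                // original ForeachFunction), done from a pool thread
                try { Thread.sleep(100); } catch (InterruptedException e) { }
                processed.incrementAndGet();
            });
        }
        pool.shutdown();                             // stop accepting work
        pool.awaitTermination(1, TimeUnit.MINUTES);  // drain the queue
        return processed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> fakePartition = Arrays.asList("m1", "m2", "m3", "m4");
        System.out.println("processed " + processPartition(fakePartition, 4));
        // prints "processed 4"
    }
}
```

Inside a real foreachPartition the pool must be shut down and drained before the task returns, as above, or Spark may mark the task finished while records are still in flight. The alternative with no custom threading is simply to increase the Kafka topic's partition count, so the direct stream yields more tasks.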