The general way of passing parameters to spark-submit is as follows (note
that I use a generic shell script to submit jobs). Replace ${JAR_FILE},
${FILE_NAME} and ${LOG_FILE} with appropriate values. In general you can
pass driver-memory, executor-memory and the like to the shell script as
variables, if you wish, rather than hard-coding them.

${SPARK_HOME}/bin/spark-submit \
                --packages com.databricks:spark-csv_2.11:1.3.0 \
                --master spark:// \
                --driver-memory 4G \
                --num-executors 5 \
                --executor-memory 4G \
                --executor-cores 2 \
                --conf "spark.scheduler.mode=FAIR" \
                --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
                --jars /home/hduser/jars/spark-streaming-kafka-assembly_2.10-1.6.1.jar \
                --class ${FILE_NAME} \
                --conf "spark.ui.port=55556" \
                --conf "spark.driver.port=54631" \
                --conf "spark.fileserver.port=54731" \
                --conf "spark.blockManager.port=54832" \
                --conf "spark.kryoserializer.buffer.max=512" \
                ${JAR_FILE} \
                >> ${LOG_FILE}
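If you would rather drive the submission from Java than from a shell script, the same "no hard-coding" idea can be sketched as below. This is a minimal sketch, not part of Spark: SubmitCommand is a hypothetical class, and DRIVER_MEMORY, EXECUTOR_MEMORY and EXECUTOR_CORES are made-up environment variable names (only SPARK_HOME, FILE_NAME and JAR_FILE come from the script above). The fallback values mirror the ones in the command.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper: builds the spark-submit command from environment variables
// instead of hard-coding values. DRIVER_MEMORY, EXECUTOR_MEMORY and EXECUTOR_CORES
// are illustrative names, not variables that Spark itself reads.
class SubmitCommand {

    // Mirrors the shell's ${NAME:-fallback}: use the variable if set and non-empty.
    static String envOr(Map<String, String> env, String name, String fallback) {
        String value = env.get(name);
        return (value == null || value.isEmpty()) ? fallback : value;
    }

    // Fails fast when a value that has no sensible default is missing.
    static String required(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.isEmpty()) {
            throw new IllegalArgumentException(name + " must be set");
        }
        return value;
    }

    static List<String> build(Map<String, String> env) {
        List<String> cmd = new ArrayList<>();
        cmd.add(required(env, "SPARK_HOME") + "/bin/spark-submit");
        cmd.add("--driver-memory");
        cmd.add(envOr(env, "DRIVER_MEMORY", "4G"));
        cmd.add("--executor-memory");
        cmd.add(envOr(env, "EXECUTOR_MEMORY", "4G"));
        cmd.add("--executor-cores");
        cmd.add(envOr(env, "EXECUTOR_CORES", "2"));
        cmd.add("--conf");
        cmd.add("spark.scheduler.mode=FAIR");
        cmd.add("--class");
        cmd.add(required(env, "FILE_NAME"));
        // spark-submit treats the first non-option argument as the application jar;
        // anything after it is handed to the application's main().
        cmd.add(required(env, "JAR_FILE"));
        return cmd;
    }

    public static void main(String[] args) {
        List<String> cmd = build(System.getenv());
        System.out.println(String.join(" ", cmd));
        // To launch it instead of printing: new ProcessBuilder(cmd).inheritIO().start()
    }
}
```

Export SPARK_HOME, FILE_NAME and JAR_FILE before running it; everything else falls back to the defaults.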

For the Twitter example, pass the Twitter-specific parameters after ${JAR_FILE};
they are handed to the application as its arguments:

${SPARK_HOME}/bin/spark-submit \
                --packages com.databricks:spark-csv_2.11:1.3.0 \
                --driver-memory 2G \
                --num-executors 1 \
                --executor-memory 2G \
                --master local[2] \
                --executor-cores 2 \
                --conf "spark.scheduler.mode=FAIR" \
                --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
                --jars /home/hduser/jars/spark-streaming-kafka-assembly_2.10-1.6.1.jar \
                --class "com.databricks.apps.twitter_classifier.${FILE_NAME}" \
                --conf "spark.ui.port=55555" \
                --conf "spark.driver.port=54631" \
                --conf "spark.fileserver.port=54731" \
                --conf "spark.blockManager.port=54832" \
                --conf "spark.kryoserializer.buffer.max=512" \
                ${JAR_FILE} \
                ${OUTPUT_DIRECTORY:-/tmp/tweets} \
                ${NUM_TWEETS_TO_COLLECT:-10000} \
                ${OUTPUT_FILE_INTERVAL_IN_SECS:-10} \
                >> ${LOG_FILE}
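Whatever follows ${JAR_FILE} is not interpreted by spark-submit; it arrives unchanged in the application's main(String[] args). A minimal sketch of the receiving side, assuming a hypothetical entry point (CollectTweetsArgs is illustrative, not the actual twitter_classifier code) that expects the three values in the order the script passes them. Because the script already supplies defaults with ${VAR:-default}, the Java side only parses and validates.

```java
// Hypothetical receiving side (not the actual twitter_classifier code): the three
// values after ${JAR_FILE} on the spark-submit line arrive here as args[0..2].
class CollectTweetsArgs {
    final String outputDirectory;
    final int numTweetsToCollect;
    final int outputFileIntervalSecs;

    CollectTweetsArgs(String outputDirectory, int numTweetsToCollect, int outputFileIntervalSecs) {
        this.outputDirectory = outputDirectory;
        this.numTweetsToCollect = numTweetsToCollect;
        this.outputFileIntervalSecs = outputFileIntervalSecs;
    }

    // Positional parsing, in the same order the shell script passes the values.
    static CollectTweetsArgs parse(String[] args) {
        if (args.length != 3) {
            throw new IllegalArgumentException(
                "usage: <outputDirectory> <numTweetsToCollect> <outputFileIntervalInSecs>");
        }
        int numTweets = Integer.parseInt(args[1]);   // NumberFormatException on bad input
        int interval = Integer.parseInt(args[2]);
        if (numTweets <= 0 || interval <= 0) {
            throw new IllegalArgumentException("tweet count and interval must be positive");
        }
        return new CollectTweetsArgs(args[0], numTweets, interval);
    }

    public static void main(String[] args) {
        CollectTweetsArgs parsed = parse(args);
        System.out.println("writing to " + parsed.outputDirectory + ", collecting "
            + parsed.numTweetsToCollect + " tweets, a new file every "
            + parsed.outputFileIntervalSecs + " s");
    }
}
```

Keeping the defaults in one place (the script) avoids the two layers silently disagreeing.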

Dr Mich Talebzadeh

LinkedIn *

On 3 June 2016 at 09:10, Jacek Laskowski <> wrote:

> --executor-cores 1 to be exact.
> Regards,
> Jacek Laskowski
> ----
> Mastering Apache Spark
> On Fri, Jun 3, 2016 at 12:28 AM, Mich Talebzadeh <> wrote:
>> Interesting, a VM with one core!
>> One simple test: can you try running with
>> --executor-cores=1
>> and see whether it works OK, please?
>> On 2 June 2016 at 23:15, Andres M Jimenez T <> wrote:
>>> Mich, thanks for your time.
>>> I am launching spark-submit as follows:
>>> bin/spark-submit --class com.example.SparkStreamingImpl --master
>>> spark:// --verbose --driver-memory 1g --executor-memory 1g
>>> --conf "
>>>" --conf
>>> "
>>>" --conf
>>> "spark.scheduler.mode=FAIR" --conf /home/Processing.jar
>>> When I use --executor-cores=12 I get "Initial job has not accepted any
>>> resources; check your cluster UI to ensure that workers are registered and
>>> have sufficient resources".
>>> This is because my nodes are single-core, but I want to use more than one
>>> thread per core. Is this possible?
>>> root@dev1:/home/spark-1.6.1-bin-hadoop2.6# lscpu
>>> Architecture:          x86_64
>>> CPU op-mode(s):        32-bit, 64-bit
>>> Byte Order:            Little Endian
>>> CPU(s):                1
>>> On-line CPU(s) list:   0
>>> Thread(s) per core:    1
>>> Core(s) per socket:    1
>>> Socket(s):             1
>>> NUMA node(s):          1
>>> Vendor ID:             GenuineIntel
>>> CPU family:            6
>>> Model:                 58
>>> Model name:            Intel(R) Xeon(R) CPU E5-2690 v2 @ 3.00GHz
>>> Stepping:              0
>>> CPU MHz:               2999.999
>>> BogoMIPS:              5999.99
>>> Hypervisor vendor:     VMware
>>> Virtualization type:   full
>>> L1d cache:             32K
>>> L1i cache:             32K
>>> L2 cache:              256K
>>> L3 cache:              25600K
>>> NUMA node0 CPU(s):     0
>>> Thanks
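The warning follows from how the standalone master places executors: a worker advertises all of its machine's cores by default (configurable with SPARK_WORKER_CORES in conf/spark-env.sh), an executor is only launched on a worker where its spark.executor.cores fit, and each executor runs at most spark.executor.cores / spark.task.cpus tasks at once. The model below is a simplification for illustration only, assuming three single-core workers as in the lscpu output above and the default spark.task.cpus of 1; CoreSlots is a hypothetical name.

```java
// Simplified model of standalone scheduling (an assumption, not Spark's code):
// an executor can start on a worker only if its requested cores fit, and it
// runs at most executorCores / taskCpus tasks concurrently.
class CoreSlots {

    static boolean fitsOnWorker(int workerCores, int executorCores) {
        return executorCores <= workerCores;
    }

    static int concurrentTasks(int executors, int executorCores, int taskCpus) {
        return executors * (executorCores / taskCpus);   // integer division, as slots are whole
    }

    public static void main(String[] args) {
        int workerCores = 1;   // lscpu reports CPU(s): 1 on each VM
        // --executor-cores=12: no worker can host the executor, so nothing launches
        System.out.println("12 cores fit: " + fitsOnWorker(workerCores, 12));
        // --executor-cores=1: fits, giving one task slot per worker
        System.out.println("1 core fits: " + fitsOnWorker(workerCores, 1));
        System.out.println("concurrent tasks on 3 workers: " + concurrentTasks(3, 1, 1));
    }
}
```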
>>> ------------------------------
>>> *From:* Mich Talebzadeh <>
>>> *Sent:* Thursday, June 2, 2016 5:00 PM
>>> *To:* Andres M Jimenez T
>>> *Cc:*
>>> *Subject:* Re: how to increase threads per executor
>>> What are you passing as parameters to spark-submit?
>>> ${SPARK_HOME}/bin/spark-submit \
>>>                 --executor-cores=12 \
>>> Also check the Execution Behavior section (spark.executor.cores) of
>>> Configuration - Spark 1.6.1 Documentation
>>> <>
>>> HTH
>>> On 2 June 2016 at 17:29, Andres M Jimenez T <> wrote:
>>>> Hi,
>>>> I am working with Spark 1.6.1, using the Kafka direct connector for
>>>> streaming data, with Spark's own (standalone) scheduler and 3 slaves.
>>>> The Kafka topic has 10 partitions.
>>>> The problem I have is that only one thread per executor runs my
>>>> function (the logic implementation).
>>>> Can anybody tell me how I can increase the number of threads per
>>>> executor to make better use of the CPUs?
>>>> Thanks
>>>> Here is the code I have implemented:
>>>> *Driver*:
>>>> JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(10000));
>>>> // prepare streaming from kafka
>>>> Set<String> topicsSet =
>>>>     new HashSet<>(Arrays.asList("stage1-in,stage1-retry".split(",")));
>>>> Map<String, String> kafkaParams = new HashMap<>();
>>>> kafkaParams.put("", kafkaBrokers);
>>>> kafkaParams.put("", SparkStreamingImpl.class.getName());
>>>> JavaPairInputDStream<String, String> inputMessages =
>>>>     KafkaUtils.createDirectStream(
>>>>         ssc,
>>>>         String.class,
>>>>         String.class,
>>>>         StringDecoder.class,
>>>>         StringDecoder.class,
>>>>         kafkaParams,
>>>>         topicsSet
>>>>     );
>>>> inputMessages.foreachRDD(new ForeachRDDFunction());
>>>> *ForeachFunction*:
>>>> class ForeachFunction implements VoidFunction<Tuple2<String, String>> {
>>>>     private static final Counter foreachConcurrent =
>>>>         ProcessingMetrics.metrics.counter("foreach-concurrency");
>>>>     public ForeachFunction() {
>>>>         "Creating a new ForeachFunction");
>>>>     }
>>>>     public void call(Tuple2<String, String> t) throws Exception {
>>>>         "processing message [" + t._1() + "]");
>>>>         try {
>>>>             Thread.sleep(1000);
>>>>         } catch (Exception e) { }
>>>>         foreachConcurrent.dec();
>>>>     }
>>>> }
>>>> *ForeachRDDFunction*:
>>>> class ForeachRDDFunction implements VoidFunction<JavaPairRDD<String, String>> {
>>>>     private static final Counter foreachRDDConcurrent =
>>>>         ProcessingMetrics.metrics.counter("foreachRDD-concurrency");
>>>>     private ForeachFunction foreachFunction = new ForeachFunction();
>>>>     public ForeachRDDFunction() {
>>>>         "Creating a new ForeachRDDFunction");
>>>>     }
>>>>     public void call(JavaPairRDD<String, String> t) throws Exception {
>>>>         "call from inputMessages.foreachRDD with [" +
>>>>             t.partitions().size() + "] partitions");
>>>>         for (Partition p : t.partitions()) {
>>>>             if (p instanceof KafkaRDDPartition) {
>>>>                 "partition [" + p.index() + "] with count [" +
>>>>                     ((KafkaRDDPartition) p).count() + "]");
>>>>             }
>>>>         }
>>>>         t.foreachAsync(foreachFunction);
>>>>         foreachRDDConcurrent.dec();
>>>>     }
>>>> }
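A Spark task applies the foreach function to its partition's records one after another, and each task occupies one core, so with single-core executors the 1 s sleep caps each executor at about one message per second. If the real work is I/O-bound (as the Thread.sleep suggests), one commonly used workaround is to hand a partition's records to a small local thread pool inside the task. A minimal sketch, assuming Java 8+: PartitionPool and processPartition are hypothetical names, and in Spark you might call processPartition from inside JavaPairRDD.foreachPartition (or foreachPartitionAsync) rather than using foreachAsync with a per-record function. It will not speed up CPU-bound work on a single core, and it submits the whole partition at once, which is fine for partitions of a few dozen records like those in the driver log.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

// Hypothetical helper (not a Spark API): runs one partition's records on a small
// local thread pool so a single Spark task can overlap I/O-bound or sleeping work.
class PartitionPool {

    static <T> void processPartition(Iterator<T> records, int threads, Consumer<T> work) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<?>> pending = new ArrayList<>();
            while (records.hasNext()) {
                T record = records.next();
                pending.add(pool.submit(() -> work.accept(record)));
            }
            // Block until every record is done so the Spark task only reports
            // success after all of its records have actually been processed.
            for (Future<?> f : pending) {
                f.get();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while processing partition", e);
        } catch (ExecutionException e) {
            // Rethrow the first failure encountered so the task fails (and can be retried).
            throw new RuntimeException(e.getCause());
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        List<String> partition = Arrays.asList("m1", "m2", "m3", "m4", "m5", "m6", "m7", "m8");
        long start = System.nanoTime();
        processPartition(partition.iterator(), 4, message -> {
            try {
                Thread.sleep(1000);   // stands in for the 1 s of work per message
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            System.out.println("processed " + message + " on " + Thread.currentThread().getName());
        });
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println("8 messages with 4 threads took about " + elapsedMs + " ms");
    }
}
```

The pool is created per partition and shut down before the task returns, so no threads outlive the Spark task.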
>>>> *The driver log, which shows that my RDD is partitioned for parallel
>>>> processing*:
>>>> [Stage 70:>  (3 + 3) / 20][Stage 71:>  (0 + 0) / 20][Stage 72:>  (0 + 0) / 20]
>>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: call from inputMessages.foreachRDD with [20] partitions
>>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [0] with count [24]
>>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [1] with count [0]
>>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [2] with count [0]
>>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [3] with count [19]
>>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [4] with count [19]
>>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [5] with count [20]
>>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [6] with count [0]
>>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [7] with count [23]
>>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [8] with count [21]
>>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [9] with count [0]
>>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [10] with count [0]
>>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [11] with count [0]
>>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [12] with count [0]
>>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [13] with count [26]
>>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [14] with count [0]
>>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [15] with count [27]
>>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [16] with count [0]
>>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [17] with count [16]
>>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [18] with count [15]
>>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [19] with count [0]
>>>> *The log from one of the executors, showing that exactly one message per
>>>> second was processed (by only one thread)*:
>>>> 16/06/02 08:32:46 INFO SparkStreamingImpl: processing message [f2b22bb9-3bd8-4e5b-b9fb-afa7e8c4deb8]
>>>> 16/06/02 08:32:47 INFO SparkStreamingImpl: processing message [e267cde2-ffea-4f7a-9934-f32a3b7218cc]
>>>> 16/06/02 08:32:48 INFO SparkStreamingImpl: processing message [f055fe3c-0f72-4f41-9a31-df544f1e1cd3]
>>>> 16/06/02 08:32:49 INFO SparkStreamingImpl: processing message [854faaa5-0abe-49a2-b13a-c290a3720b0e]
>>>> 16/06/02 08:32:50 INFO SparkStreamingImpl: processing message [1bc0a141-b910-45fe-9881-e2066928fbc6]
>>>> 16/06/02 08:32:51 INFO SparkStreamingImpl: processing message [67fb99c6-1ca1-4dfb-bffe-43b927fdec07]
>>>> 16/06/02 08:32:52 INFO SparkStreamingImpl: processing message [de7d5934-bab2-4019-917e-c339d864ba18]
>>>> 16/06/02 08:32:53 INFO SparkStreamingImpl: processing message [e63d7a7e-de32-4527-b8f1-641cfcc8869c]
>>>> 16/06/02 08:32:54 INFO SparkStreamingImpl: processing message [1ce931ee-b8b1-4645-8a51-2c697bf1513b]
>>>> 16/06/02 08:32:55 INFO SparkStreamingImpl: processing message [5367f3c1-d66c-4647-bb44-f5eab719031d]
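The partition counts in the driver log allow a rough estimate of how long one batch takes. A back-of-the-envelope sketch, assuming three single-core executors (three task slots, which matches the three running tasks in "(3 + 3) / 20", where the progress bar shows completed + running tasks), one task per partition, and about 1 s per record from Thread.sleep(1000): a batch cannot finish faster than total records / slots, nor faster than its largest partition. BatchEstimate is a hypothetical name.

```java
// Back-of-the-envelope estimate (assumed model, not Spark's scheduler): each
// partition is one task, a task handles its records sequentially, and `slots`
// tasks can run at once across the cluster.
class BatchEstimate {

    static long totalRecords(int[] partitionCounts) {
        long total = 0;
        for (int c : partitionCounts) {
            total += c;
        }
        return total;
    }

    // The batch takes at least as long as spreading all records evenly over
    // the slots, and at least as long as the single largest partition.
    static long lowerBoundSeconds(int[] partitionCounts, int slots, double secsPerRecord) {
        int largest = 0;
        for (int c : partitionCounts) {
            largest = Math.max(largest, c);
        }
        double evenSpread = totalRecords(partitionCounts) * secsPerRecord / slots;
        double longestTask = largest * secsPerRecord;
        return (long) Math.ceil(Math.max(evenSpread, longestTask));
    }

    public static void main(String[] args) {
        // Per-partition record counts from the driver log (partitions 0..19).
        int[] counts = {24, 0, 0, 19, 19, 20, 0, 23, 21, 0, 0, 0, 0, 26, 0, 27, 0, 16, 15, 0};
        long seconds = lowerBoundSeconds(counts, 3, 1.0);
        System.out.println(totalRecords(counts) + " records need at least " + seconds
            + " s with 3 slots, against a 10 s batch interval");
    }
}
```

For the logged batch this gives 210 records and at least 70 s of work per 10 s batch interval (Duration(10000)), which would be consistent with the later stages (71, 72) waiting in the progress bar.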
