Mich, thanks for your time,

I am launching spark-submit as follows:

bin/spark-submit --class com.example.SparkStreamingImpl \
  --master spark://dev1.dev:7077 --verbose \
  --driver-memory 1g --executor-memory 1g \
  --conf "...-Dcom.sun.management.jmxremote.ssl=false" \
  --conf "...-Dcom.sun.management.jmxremote.ssl=false" \
  --conf "spark.scheduler.mode=FAIR" \
  --conf ... \
  /home/Processing.jar

When I use --executor-cores=12, I get "Initial job has not accepted any
resources; check your cluster UI to ensure that workers are registered and have
sufficient resources".

This happens because my nodes are single-core, but I want to use more than one
thread per core. Is this possible?

root@dev1:/home/spark-1.6.1-bin-hadoop2.6# lscpu
Architecture:          x86_64
CPU op-mode(s):        32-bit, 64-bit
Byte Order:            Little Endian
CPU(s):                1
On-line CPU(s) list:   0
Thread(s) per core:    1
Core(s) per socket:    1
Socket(s):             1
NUMA node(s):          1
Vendor ID:             GenuineIntel
CPU family:            6
Model:                 58
Model name:            Intel(R) Xeon(R) CPU E5-2690 v2 @ 3.00GHz
Stepping:              0
CPU MHz:               2999.999
BogoMIPS:              5999.99
Hypervisor vendor:     VMware
Virtualization type:   full
L1d cache:             32K
L1i cache:             32K
L2 cache:              256K
L3 cache:              25600K
NUMA node0 CPU(s):     0
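For context on the error above: as far as I understand the standalone master, it only launches an executor on a worker that has at least `--executor-cores` free cores, and each worker advertises all of the machine's cores (here 1, per lscpu) unless `SPARK_WORKER_CORES` is set in `conf/spark-env.sh`. The sketch below models only that placement rule under those assumptions; the class and method names are hypothetical and this is not Spark's scheduler code.

```java
import java.util.List;

/** Hypothetical model of standalone executor placement (not Spark code). */
public class ExecutorPlacement {

    /** Cores a worker advertises: SPARK_WORKER_CORES if set, else the machine's core count. */
    public static int advertisedCores(String sparkWorkerCoresEnv, int machineCores) {
        if (sparkWorkerCoresEnv == null || sparkWorkerCoresEnv.trim().isEmpty()) {
            return machineCores;
        }
        return Integer.parseInt(sparkWorkerCoresEnv.trim());
    }

    /** How many workers have enough free cores to host one executor of the requested size. */
    public static long workersThatFit(List<Integer> freeCoresPerWorker, int executorCores) {
        return freeCoresPerWorker.stream().filter(free -> free >= executorCores).count();
    }

    public static void main(String[] args) {
        int machineCores = 1; // lscpu: CPU(s): 1 on each VM

        // Default: every worker advertises 1 core, so a 12-core executor fits nowhere
        int byDefault = advertisedCores(null, machineCores);
        List<Integer> workers = List.of(byDefault, byDefault, byDefault);
        System.out.println("workers that fit 12 cores: " + workersThatFit(workers, 12));

        // Assumed override: SPARK_WORKER_CORES=12 on each worker (oversubscribes the CPU)
        int overridden = advertisedCores("12", machineCores);
        List<Integer> oversubscribed = List.of(overridden, overridden, overridden);
        System.out.println("with SPARK_WORKER_CORES=12: " + workersThatFit(oversubscribed, 12));
    }
}
```

If that assumption holds, advertising more cores than the VM has only oversubscribes the single CPU; it would help only if tasks spend most of their time waiting (network, database and so on) rather than computing.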


I am working with Spark 1.6.1, using the Kafka direct approach for streaming data.

I am using the Spark scheduler with 3 slaves.

The Kafka topic has 10 partitions.

The problem I have is that only one thread per executor runs my function (the
logic implementation).

Can anybody tell me how I can increase the number of threads per executor to
make better use of the CPUs?


Here is the code I have implemented:


// 10-second batch interval
JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(10000));

// prepare streaming from kafka
Set<String> topicsSet = new ...
Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", kafkaBrokers);

JavaPairInputDStream<String, String> inputMessages = ...

// register the per-batch callback
inputMessages.foreachRDD(new ForeachRDDFunction());


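class ForeachFunction implements VoidFunction<Tuple2<String, String>> {

    private static final Counter foreachConcurrent =
            ProcessingMetrics.metrics.counter("foreach-concurrency");

    public ForeachFunction() {
        LOG.info("Creating a new ForeachFunction");
    }

    // Called once per Kafka (key, message) record, one record after another within a task
    public void call(Tuple2<String, String> t) throws Exception {
        LOG.info("processing message [" + t._1() + "]");
        try {
            // ... processing logic ...
        } catch (Exception e) {
            // log the failure instead of silently swallowing it
            LOG.error("failed to process message [" + t._1() + "]", e);
        }
    }
}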

class ForeachRDDFunction implements VoidFunction<JavaPairRDD<String, String>> {

    private static final Counter foreachRDDConcurrent =
            ProcessingMetrics.metrics.counter("foreachRDD-concurrency");

    private ForeachFunction foreachFunction = new ForeachFunction();

    public ForeachRDDFunction() {
        LOG.info("Creating a new ForeachRDDFunction");
    }

    // Runs on the driver once per batch
    public void call(JavaPairRDD<String, String> t) throws Exception {
        LOG.info("call from inputMessages.foreachRDD with [" + t.partitions().size()
                + "] partitions");

        // log the number of Kafka records in each RDD partition
        for (Partition p : t.partitions()) {
            if (p instanceof KafkaRDDPartition) {
                LOG.info("partition [" + p.index() + "] with count ["
                        + ((KafkaRDDPartition) p).count() + "]");
            }
        }
        // ...
    }
}

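Regarding more threads per executor: one possible approach, sketched here with plain `java.util.concurrent` and assuming the per-message logic is thread-safe, is to process a whole partition at once (for example via `JavaPairRDD.foreachPartition`) instead of one record at a time, fanning its records out to a small local thread pool and waiting for all of them before the task returns. `processConcurrently` is a hypothetical helper, not a Spark API.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class PartitionThreadPool {

    /**
     * Hypothetical helper: applies fn to every record of one partition using a
     * fixed pool of `threads` threads and returns only after all records are done.
     */
    public static <T> void processConcurrently(Iterator<T> records, int threads, Consumer<T> fn)
            throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<?>> pending = new ArrayList<>();
            while (records.hasNext()) {
                T record = records.next();
                pending.add(pool.submit(() -> fn.accept(record)));
            }
            // Block until every record is handled; get() rethrows a task failure
            // wrapped in an ExecutionException, so errors are not lost.
            for (Future<?> f : pending) {
                f.get();
            }
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<String> partition = List.of("m1", "m2", "m3", "m4", "m5", "m6");
        AtomicInteger processed = new AtomicInteger();
        processConcurrently(partition.iterator(), 4, msg -> processed.incrementAndGet());
        System.out.println("processed " + processed.get() + " messages");
    }
}
```

Waiting on every `Future` keeps the Spark task alive until its records are finished, so offsets are not considered processed early. On single-core VMs this can only help if each message spends most of its time waiting (I/O, remote calls); CPU-bound logic will not get faster with more threads on one core.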

The driver log, which shows that my RDD is partitioned so it can be processed in parallel:

[Stage 70:>  (3 + 3) / 20][Stage 71:>  (0 + 0) / 20][Stage 72:>  (0 + 0) / 20]
16/06/02 08:32:10 INFO SparkStreamingImpl: call from inputMessages.foreachRDD with [20] partitions
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [0] with count [24]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [1] with count [0]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [2] with count [0]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [3] with count [19]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [4] with count [19]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [5] with count [20]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [6] with count [0]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [7] with count [23]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [8] with count [21]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [9] with count [0]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [10] with count [0]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [11] with count [0]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [12] with count [0]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [13] with count [26]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [14] with count [0]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [15] with count [27]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [16] with count [0]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [17] with count [16]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [18] with count [15]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [19] with count [0]
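Reading the console progress bar as `(completed + running) / total` tasks, Stage 70 has 3 tasks running at once, which matches 3 workers with 1 core each. A small worked calculation under that reading, assuming the default `spark.task.cpus` of 1; the helper names are my own and the counts are copied from the log above:

```java
public class TaskSlots {

    // Per-partition record counts from the driver log (partitions 0..19)
    public static final long[] COUNTS =
            {24, 0, 0, 19, 19, 20, 0, 23, 21, 0, 0, 0, 0, 26, 0, 27, 0, 16, 15, 0};

    /** Tasks that can run at once when each task occupies taskCpus cores. */
    public static int concurrentTasks(int executors, int coresPerExecutor, int taskCpus) {
        return executors * (coresPerExecutor / taskCpus);
    }

    /** Scheduling rounds needed to launch one task per partition. */
    public static int waves(int partitions, int slots) {
        return (partitions + slots - 1) / slots;
    }

    public static long total(long[] counts) {
        long sum = 0;
        for (long c : counts) {
            sum += c;
        }
        return sum;
    }

    public static int nonEmpty(long[] counts) {
        int n = 0;
        for (long c : counts) {
            if (c > 0) {
                n++;
            }
        }
        return n;
    }

    public static void main(String[] args) {
        // 3 workers with 1 core each, one executor per worker, spark.task.cpus = 1
        int slots = concurrentTasks(3, 1, 1);
        System.out.println("slots=" + slots
                + " waves=" + waves(COUNTS.length, slots)
                + " messages=" + total(COUNTS)
                + " nonEmptyPartitions=" + nonEmpty(COUNTS));
    }
}
```

This prints `slots=3 waves=7 messages=210 nonEmptyPartitions=10`: the batch holds 210 messages in 10 non-empty partitions, but only 3 of the 20 tasks can run at any moment.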

The log from one of the executors, showing that exactly one message per second
was processed (by a single thread):

16/06/02 08:32:46 INFO SparkStreamingImpl: processing message 
16/06/02 08:32:47 INFO SparkStreamingImpl: processing message 
16/06/02 08:32:48 INFO SparkStreamingImpl: processing message 
16/06/02 08:32:49 INFO SparkStreamingImpl: processing message 
16/06/02 08:32:50 INFO SparkStreamingImpl: processing message 
16/06/02 08:32:51 INFO SparkStreamingImpl: processing message 
16/06/02 08:32:52 INFO SparkStreamingImpl: processing message 
16/06/02 08:32:53 INFO SparkStreamingImpl: processing message 
16/06/02 08:32:54 INFO SparkStreamingImpl: processing message 
16/06/02 08:32:55 INFO SparkStreamingImpl: processing message 
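Putting both logs together with the 10-second batch interval from `new Duration(10000)` gives a rough lower bound on how long one batch takes. This is a back-of-the-envelope sketch, assuming every thread handles about one message per second (as the executor log suggests) and that extra threads actually run in parallel (which on 1-core VMs only holds for I/O-bound work); `lowerBoundSeconds` is a hypothetical helper:

```java
public class BatchBacklog {

    /**
     * Lower bound on batch duration: the work cannot finish faster than an even
     * spread over all threads, nor faster than the largest partition allows
     * when that partition is handled by one task.
     */
    public static double lowerBoundSeconds(long messages, long largestPartition,
                                           int concurrentTasks, int threadsPerTask, double rate) {
        double evenSpread = messages / (concurrentTasks * threadsPerTask * rate);
        double slowestTask = largestPartition / (threadsPerTask * rate);
        return Math.max(evenSpread, slowestTask);
    }

    public static void main(String[] args) {
        long messages = 210;          // sum of the partition counts in the driver log
        long largestPartition = 27;   // partition [15]
        double batchIntervalSec = 10; // new Duration(10000)
        double rate = 1.0;            // ~1 message/s per thread, from the executor log

        // {concurrent tasks, threads per task}
        int[][] scenarios = {{3, 1}, {21, 1}, {3, 4}, {3, 8}};
        for (int[] s : scenarios) {
            double t = lowerBoundSeconds(messages, largestPartition, s[0], s[1], rate);
            System.out.printf("tasks=%d threadsPerTask=%d -> >= %.2f s per batch (%s)%n",
                    s[0], s[1], t, t <= batchIntervalSec ? "can keep up" : "falls behind");
        }
    }
}
```

With 3 single-threaded tasks the bound is 70 s per 10-s batch, which would explain later stages (71, 72) waiting at `(0 + 0)`. Adding task slots alone stops helping at 27 s because of the largest partition; only more threads inside each task lower that bound.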

