Interesting, a VM with one core! One simple test:

Can you try running with --executor-cores=1 and see whether it works OK, please?

Dr Mich Talebzadeh

LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com

On 2 June 2016 at 23:15, Andres M Jimenez T <ad...@hotmail.com> wrote:

> Mich, thanks for your time,
>
> I am launching spark-submit as follows:
>
> bin/spark-submit --class com.example.SparkStreamingImpl \
>   --master spark://dev1.dev:7077 --verbose \
>   --driver-memory 1g --executor-memory 1g \
>   --conf "spark.driver.extraJavaOptions=-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=8090 -Dcom.sun.management.jmxremote.rmi.port=8091 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false" \
>   --conf "spark.executor.extraJavaOptions=-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=8092 -Dcom.sun.management.jmxremote.rmi.port=8093 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false" \
>   --conf "spark.scheduler.mode=FAIR" \
>   --conf /home/Processing.jar
>
> When I use --executor-cores=12 I get "Initial job has not accepted any
> resources; check your cluster UI to ensure that workers are registered and
> have sufficient resources".
>
> This is because my nodes are single core, but I want to use more than one
> thread per core. Is this possible?
>
> root@dev1:/home/spark-1.6.1-bin-hadoop2.6# lscpu
> Architecture:          x86_64
> CPU op-mode(s):        32-bit, 64-bit
> Byte Order:            Little Endian
> CPU(s):                1
> On-line CPU(s) list:   0
> Thread(s) per core:    1
> Core(s) per socket:    1
> Socket(s):             1
> NUMA node(s):          1
> Vendor ID:             GenuineIntel
> CPU family:            6
> Model:                 58
> Model name:            Intel(R) Xeon(R) CPU E5-2690 v2 @ 3.00GHz
> Stepping:              0
> CPU MHz:               2999.999
> BogoMIPS:              5999.99
> Hypervisor vendor:     VMware
> Virtualization type:   full
> L1d cache:             32K
> L1i cache:             32K
> L2 cache:              256K
> L3 cache:              25600K
> NUMA node0 CPU(s):     0
>
> Thanks
>
> ------------------------------
> *From:* Mich Talebzadeh <mich.talebza...@gmail.com>
> *Sent:* Thursday, June 2, 2016 5:00 PM
> *To:* Andres M Jimenez T
> *Cc:* user@spark.apache.org
> *Subject:* Re: how to increase threads per executor
>
> What are you passing as parameters to spark-submit?
>
> ${SPARK_HOME}/bin/spark-submit \
>   --executor-cores=12 \
>
> Also check
>
> http://spark.apache.org/docs/latest/configuration.html
>
> Execution Behavior/spark.executor.cores
>
> HTH
>
> Dr Mich Talebzadeh
>
> On 2 June 2016 at 17:29, Andres M Jimenez T <ad...@hotmail.com> wrote:
>
>> Hi,
>>
>> I am working with Spark 1.6.1, using Kafka direct connect for streaming
>> data.
>>
>> Using the Spark scheduler and 3 slaves.
>>
>> The Kafka topic is partitioned with a value of 10.
>>
>> The problem I have is that only one thread per executor is running my
>> function (logic implementation).
>>
>> Can anybody tell me how I can increase the threads per executor to make
>> better use of the CPUs?
>>
>> Thanks
>>
>> Here is the code I have implemented:
>>
>> *Driver*:
>>
>> JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(10000));
>>
>> //prepare streaming from kafka
>> Set<String> topicsSet = new HashSet<>(Arrays.asList("stage1-in,stage1-retry".split(",")));
>> Map<String, String> kafkaParams = new HashMap<>();
>> kafkaParams.put("metadata.broker.list", kafkaBrokers);
>> kafkaParams.put("group.id", SparkStreamingImpl.class.getName());
>>
>> JavaPairInputDStream<String, String> inputMessages = KafkaUtils.createDirectStream(
>>     ssc,
>>     String.class,
>>     String.class,
>>     StringDecoder.class,
>>     StringDecoder.class,
>>     kafkaParams,
>>     topicsSet
>> );
>>
>> inputMessages.foreachRDD(new ForeachRDDFunction());
>>
>> *ForeachFunction*:
>>
>> class ForeachFunction implements VoidFunction<Tuple2<String, String>> {
>>     private static final Counter foreachConcurrent =
>>         ProcessingMetrics.metrics.counter( "foreach-concurrency" );
>>
>>     public ForeachFunction() {
>>         LOG.info("Creating a new ForeachFunction");
>>     }
>>
>>     public void call(Tuple2<String, String> t) throws Exception {
>>         foreachConcurrent.inc();
>>         LOG.info("processing message [" + t._1() + "]");
>>         try {
>>             Thread.sleep(1000);
>>         } catch (Exception e) { }
>>         foreachConcurrent.dec();
>>     }
>> }
>>
>> *ForeachRDDFunction*:
>>
>> class ForeachRDDFunction implements VoidFunction<JavaPairRDD<String, String>> {
>>     private static final Counter foreachRDDConcurrent =
>>         ProcessingMetrics.metrics.counter( "foreachRDD-concurrency" );
>>     private ForeachFunction foreachFunction = new ForeachFunction();
>>
>>     public ForeachRDDFunction() {
>>         LOG.info("Creating a new ForeachRDDFunction");
>>     }
>>
>>     public void call(JavaPairRDD<String, String> t) throws Exception {
>>         foreachRDDConcurrent.inc();
>>         LOG.info("call from inputMessages.foreachRDD with [" + t.partitions().size() + "] partitions");
>>         for (Partition p : t.partitions()) {
>>             if (p instanceof KafkaRDDPartition){
>>                 LOG.info("partition [" + p.index() + "] with count [" + ((KafkaRDDPartition) p).count() + "]");
>>             }
>>         }
>>         t.foreachAsync(foreachFunction);
>>         foreachRDDConcurrent.dec();
>>     }
>> }
>>
>> *The log from the driver that tells me my RDD is partitioned to process in
>> parallel*:
>>
>> [Stage 70:> (3 + 3) / 20][Stage 71:> (0 + 0) / 20][Stage 72:> (0 + 0) / 20]16/06/02 08:32:10 INFO SparkStreamingImpl: call from inputMessages.foreachRDD with [20] partitions
>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [0] with count [24]
>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [1] with count [0]
>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [2] with count [0]
>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [3] with count [19]
>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [4] with count [19]
>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [5] with count [20]
>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [6] with count [0]
>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [7] with count [23]
>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [8] with count [21]
>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [9] with count [0]
>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [10] with count [0]
>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [11] with count [0]
>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [12] with count [0]
>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [13] with count [26]
>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [14] with count [0]
>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [15] with count [27]
>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [16] with count [0]
>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [17] with count [16]
>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [18] with count [15]
>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [19] with count [0]
>>
>> *The log from one of the executors, showing that exactly one message per
>> second was processed (by only one thread)*:
>>
>> 16/06/02 08:32:46 INFO SparkStreamingImpl: processing message [f2b22bb9-3bd8-4e5b-b9fb-afa7e8c4deb8]
>> 16/06/02 08:32:47 INFO SparkStreamingImpl: processing message [e267cde2-ffea-4f7a-9934-f32a3b7218cc]
>> 16/06/02 08:32:48 INFO SparkStreamingImpl: processing message [f055fe3c-0f72-4f41-9a31-df544f1e1cd3]
>> 16/06/02 08:32:49 INFO SparkStreamingImpl: processing message [854faaa5-0abe-49a2-b13a-c290a3720b0e]
>> 16/06/02 08:32:50 INFO SparkStreamingImpl: processing message [1bc0a141-b910-45fe-9881-e2066928fbc6]
>> 16/06/02 08:32:51 INFO SparkStreamingImpl: processing message [67fb99c6-1ca1-4dfb-bffe-43b927fdec07]
>> 16/06/02 08:32:52 INFO SparkStreamingImpl: processing message [de7d5934-bab2-4019-917e-c339d864ba18]
>> 16/06/02 08:32:53 INFO SparkStreamingImpl: processing message [e63d7a7e-de32-4527-b8f1-641cfcc8869c]
>> 16/06/02 08:32:54 INFO SparkStreamingImpl: processing message [1ce931ee-b8b1-4645-8a51-2c697bf1513b]
>> 16/06/02 08:32:55 INFO SparkStreamingImpl: processing message [5367f3c1-d66c-4647-bb44-f5eab719031d]
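
A rough way to picture the one-message-per-second executor log above, assuming the usual Spark model in which an executor runs at most one task per allotted core at a time (with the default spark.task.cpus=1) and each task walks its partition sequentially. The sketch below is plain Java, not Spark code: the class name TaskSlotModel and the method peakConcurrency are hypothetical, and a fixed thread pool merely stands in for an executor's task slots.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

/** Toy model (an assumption, not Spark's implementation): a fixed pool of task slots. */
public class TaskSlotModel {

    /**
     * Submits one sleeping task per partition to a pool with `slots` threads
     * and returns the highest number of tasks observed running at once.
     */
    public static int peakConcurrency(int slots, int partitions, long sleepMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(slots);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        List<Future<?>> futures = new ArrayList<>();
        try {
            for (int p = 0; p < partitions; p++) {
                futures.add(pool.submit(() -> {
                    // Record how many tasks are active right now.
                    int now = running.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max);
                    try {
                        Thread.sleep(sleepMillis); // stands in for the per-message work
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        running.decrementAndGet();
                    }
                }));
            }
            for (Future<?> f : futures) {
                f.get(); // wait for every task to finish
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        // 20 partitions, as in the driver log; the sleep is shortened for the demo.
        System.out.println("1 slot:  peak = " + peakConcurrency(1, 20, 20));
        System.out.println("4 slots: peak = " + peakConcurrency(4, 20, 20));
    }
}
```

With a single slot the peak can never exceed 1, which matches a sleeping function that logs one message per second; with more slots the peak is bounded by the slot count. Under this model, the number of concurrently running copies of the function depends on how many cores the executor has been granted, not on how many partitions the RDD has.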