Mich, thanks for your time,
I am launching spark-submit as follows:

bin/spark-submit --class com.example.SparkStreamingImpl \
  --master spark://dev1.dev:7077 \
  --verbose \
  --driver-memory 1g \
  --executor-memory 1g \
  --conf "spark.driver.extraJavaOptions=-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=8090 -Dcom.sun.management.jmxremote.rmi.port=8091 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false" \
  --conf "spark.executor.extraJavaOptions=-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=8092 -Dcom.sun.management.jmxremote.rmi.port=8093 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false" \
  --conf "spark.scheduler.mode=FAIR" \
  /home/Processing.jar

When I use --executor-cores=12 I get "Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources". This is because my nodes are single core, but I want to use more than one thread per core. Is this possible?

root@dev1:/home/spark-1.6.1-bin-hadoop2.6# lscpu
Architecture:          x86_64
CPU op-mode(s):        32-bit, 64-bit
Byte Order:            Little Endian
CPU(s):                1
On-line CPU(s) list:   0
Thread(s) per core:    1
Core(s) per socket:    1
Socket(s):             1
NUMA node(s):          1
Vendor ID:             GenuineIntel
CPU family:            6
Model:                 58
Model name:            Intel(R) Xeon(R) CPU E5-2690 v2 @ 3.00GHz
Stepping:              0
CPU MHz:               2999.999
BogoMIPS:              5999.99
Hypervisor vendor:     VMware
Virtualization type:   full
L1d cache:             32K
L1i cache:             32K
L2 cache:              256K
L3 cache:              25600K
NUMA node0 CPU(s):     0

Thanks

________________________________
From: Mich Talebzadeh <mich.talebza...@gmail.com>
Sent: Thursday, June 2, 2016 5:00 PM
To: Andres M Jimenez T
Cc: user@spark.apache.org
Subject: Re: how to increase threads per executor

What are you passing as parameters to spark-submit?
${SPARK_HOME}/bin/spark-submit \
  --executor-cores=12 \
  ...

Also check the spark.executor.cores entry under Execution Behavior in
http://spark.apache.org/docs/latest/configuration.html (Configuration - Spark 1.6.1 Documentation).

HTH

Dr Mich Talebzadeh

LinkedIn https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
http://talebzadehmich.wordpress.com

On 2 June 2016 at 17:29, Andres M Jimenez T <ad...@hotmail.com> wrote:

Hi,

I am working with Spark 1.6.1, using Kafka direct connect for streaming data, the Spark scheduler, and 3 slaves. The Kafka topic is partitioned with a value of 10.

The problem I have is that only one thread per executor is running my function (logic implementation). Can anybody tell me how I can increase the number of threads per executor to get better use of the CPUs?
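One common pattern for this, independent of how many cores Spark schedules per executor, is to run the per-record work on a local thread pool inside each task (e.g. inside a foreachPartition body). Below is a minimal, Spark-free sketch of that pattern; the class name, the pool size of 4, and handleRecord are illustrative assumptions, not code from this thread:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PartitionThreadPool {

    // Counts records handled across all worker threads.
    static final AtomicInteger processed = new AtomicInteger();

    // Stand-in for the per-record work done in ForeachFunction.call().
    static void handleRecord(String msg) {
        processed.incrementAndGet();
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-in for the records one partition iterator hands to a task.
        List<String> partition = Arrays.asList("m1", "m2", "m3", "m4", "m5");

        // Pool size is an assumption; tune it to your workload, since
        // sleep/IO-bound work tolerates far more threads than CPU-bound work.
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (String msg : partition) {
            pool.submit(() -> handleRecord(msg));
        }

        // Drain the pool before the task returns, so no records are dropped.
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);

        System.out.println("processed=" + processed.get());
    }
}
```

Inside a real task the loop would consume the partition's Iterator instead of a fixed list, and the shutdown/awaitTermination step is essential: if the task returns while records are still queued, they are silently lost.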
Thanks. Here is the code I have implemented:

Driver:

JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(10000));

// Prepare streaming from Kafka.
Set<String> topicsSet = new HashSet<>(Arrays.asList("stage1-in,stage1-retry".split(",")));
Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", kafkaBrokers);
kafkaParams.put("group.id", SparkStreamingImpl.class.getName());
JavaPairInputDStream<String, String> inputMessages = KafkaUtils.createDirectStream(
    ssc,
    String.class, String.class,
    StringDecoder.class, StringDecoder.class,
    kafkaParams, topicsSet);
inputMessages.foreachRDD(new ForeachRDDFunction());

ForeachFunction:

class ForeachFunction implements VoidFunction<Tuple2<String, String>> {

    private static final Counter foreachConcurrent =
        ProcessingMetrics.metrics.counter("foreach-concurrency");

    public ForeachFunction() {
        LOG.info("Creating a new ForeachFunction");
    }

    public void call(Tuple2<String, String> t) throws Exception {
        foreachConcurrent.inc();
        LOG.info("processing message [" + t._1() + "]");
        try {
            Thread.sleep(1000);
        } catch (Exception e) {
        }
        foreachConcurrent.dec();
    }
}

ForeachRDDFunction:

class ForeachRDDFunction implements VoidFunction<JavaPairRDD<String, String>> {

    private static final Counter foreachRDDConcurrent =
        ProcessingMetrics.metrics.counter("foreachRDD-concurrency");

    private ForeachFunction foreachFunction = new ForeachFunction();

    public ForeachRDDFunction() {
        LOG.info("Creating a new ForeachRDDFunction");
    }

    public void call(JavaPairRDD<String, String> t) throws Exception {
        foreachRDDConcurrent.inc();
        LOG.info("call from inputMessages.foreachRDD with [" + t.partitions().size() + "] partitions");
        for (Partition p : t.partitions()) {
            if (p instanceof KafkaRDDPartition) {
                LOG.info("partition [" + p.index() + "] with count [" + ((KafkaRDDPartition) p).count() + "]");
            }
        }
        t.foreachAsync(foreachFunction);
        foreachRDDConcurrent.dec();
    }
}

The log from the driver, which tells me my RDD is
partitioned to process in parallel:

[Stage 70:> (3 + 3) / 20][Stage 71:> (0 + 0) / 20][Stage 72:> (0 + 0) / 20]
16/06/02 08:32:10 INFO SparkStreamingImpl: call from inputMessages.foreachRDD with [20] partitions
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [0] with count [24]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [1] with count [0]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [2] with count [0]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [3] with count [19]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [4] with count [19]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [5] with count [20]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [6] with count [0]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [7] with count [23]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [8] with count [21]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [9] with count [0]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [10] with count [0]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [11] with count [0]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [12] with count [0]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [13] with count [26]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [14] with count [0]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [15] with count [27]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [16] with count [0]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [17] with count [16]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [18] with count [15]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [19] with count [0]

The log from one of the executors, showing that exactly one message per second was processed (i.e. by only one thread):

16/06/02 08:32:46 INFO SparkStreamingImpl: processing message [f2b22bb9-3bd8-4e5b-b9fb-afa7e8c4deb8]
16/06/02 08:32:47 INFO SparkStreamingImpl: processing message [e267cde2-ffea-4f7a-9934-f32a3b7218cc]
16/06/02 08:32:48 INFO SparkStreamingImpl: processing message [f055fe3c-0f72-4f41-9a31-df544f1e1cd3]
16/06/02 08:32:49 INFO SparkStreamingImpl: processing message [854faaa5-0abe-49a2-b13a-c290a3720b0e]
16/06/02 08:32:50 INFO SparkStreamingImpl: processing message [1bc0a141-b910-45fe-9881-e2066928fbc6]
16/06/02 08:32:51 INFO SparkStreamingImpl: processing message [67fb99c6-1ca1-4dfb-bffe-43b927fdec07]
16/06/02 08:32:52 INFO SparkStreamingImpl: processing message [de7d5934-bab2-4019-917e-c339d864ba18]
16/06/02 08:32:53 INFO SparkStreamingImpl: processing message [e63d7a7e-de32-4527-b8f1-641cfcc8869c]
16/06/02 08:32:54 INFO SparkStreamingImpl: processing message [1ce931ee-b8b1-4645-8a51-2c697bf1513b]
16/06/02 08:32:55 INFO SparkStreamingImpl: processing message [5367f3c1-d66c-4647-bb44-f5eab719031d]
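On the single-core question earlier in the thread: a Spark standalone worker advertises a fixed number of cores (by default the machine's detected core count), and each advertised core is a task slot, so --executor-cores=12 can never be satisfied on 1-CPU nodes. The advertised count can be raised above the physical count (CPU oversubscription) via SPARK_WORKER_CORES in conf/spark-env.sh on each worker. A sketch of that configuration; the value 4 is an illustrative assumption, not a recommendation:

```shell
# conf/spark-env.sh on each worker node.
# Advertise more task slots than physical CPUs (oversubscription);
# each "core" here is one concurrent task thread, not a hardware core.
export SPARK_WORKER_CORES=4

# Restart the worker so the new value takes effect, e.g.:
#   sbin/stop-slave.sh
#   sbin/start-slave.sh spark://dev1.dev:7077
```

For sleep- or IO-bound work like the Thread.sleep(1000) in the code above, this lets a single-CPU executor run several tasks concurrently; for CPU-bound work it only adds context-switching overhead. Note also that a direct Kafka stream's parallelism is capped by the topic's partition count (10 here) unless the RDD is repartitioned.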