Hi,
I am working with Spark 1.6.1, using the Kafka direct stream approach for
streaming data.
I am using the Spark scheduler with 3 slaves.
Each Kafka topic has 10 partitions.
The problem I have is that only one thread per executor is running my
function (the logic implementation).
Can anybody tell me how I can increase the number of threads per executor to
make better use of the CPUs?
Thanks
Here is the code I have implemented:
Driver:
// 10-second batch interval
JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(10000));

// prepare streaming from kafka
Set<String> topicsSet = new HashSet<>(Arrays.asList("stage1-in", "stage1-retry"));
Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", kafkaBrokers);
kafkaParams.put("group.id", SparkStreamingImpl.class.getName());

JavaPairInputDStream<String, String> inputMessages =
    KafkaUtils.createDirectStream(
        ssc,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        kafkaParams,
        topicsSet
    );

inputMessages.foreachRDD(new ForeachRDDFunction());
ForeachFunction:
class ForeachFunction implements VoidFunction<Tuple2<String, String>> {
    private static final Counter foreachConcurrent =
        ProcessingMetrics.metrics.counter("foreach-concurrency");

    public ForeachFunction() {
        LOG.info("Creating a new ForeachFunction");
    }

    @Override
    public void call(Tuple2<String, String> t) throws Exception {
        foreachConcurrent.inc();
        try {
            LOG.info("processing message [" + t._1() + "]");
            // Simulate one second of work per message.
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of silently swallowing it.
            Thread.currentThread().interrupt();
        } finally {
            foreachConcurrent.dec();
        }
    }
}
ForeachRDDFunction:
class ForeachRDDFunction implements VoidFunction<JavaPairRDD<String, String>> {
    private static final Counter foreachRDDConcurrent =
        ProcessingMetrics.metrics.counter("foreachRDD-concurrency");
    private final ForeachFunction foreachFunction = new ForeachFunction();

    public ForeachRDDFunction() {
        LOG.info("Creating a new ForeachRDDFunction");
    }

    @Override
    public void call(JavaPairRDD<String, String> t) throws Exception {
        foreachRDDConcurrent.inc();
        LOG.info("call from inputMessages.foreachRDD with [" + t.partitions().size()
            + "] partitions");
        for (Partition p : t.partitions()) {
            if (p instanceof KafkaRDDPartition) {
                LOG.info("partition [" + p.index() + "] with count ["
                    + ((KafkaRDDPartition) p).count() + "]");
            }
        }
        // foreachAsync only submits the job and returns immediately, so the
        // counter below is decremented before the records are processed.
        t.foreachAsync(foreachFunction);
        foreachRDDConcurrent.dec();
    }
}
The log from the driver, which tells me my RDD is partitioned so that it can
be processed in parallel:
[Stage 70:> (3 + 3) / 20][Stage 71:> (0 + 0) / 20][Stage 72:> (0 + 0) / 20]
16/06/02 08:32:10 INFO SparkStreamingImpl: call from inputMessages.foreachRDD with [20] partitions
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [0] with count [24]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [1] with count [0]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [2] with count [0]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [3] with count [19]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [4] with count [19]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [5] with count [20]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [6] with count [0]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [7] with count [23]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [8] with count [21]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [9] with count [0]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [10] with count [0]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [11] with count [0]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [12] with count [0]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [13] with count [26]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [14] with count [0]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [15] with count [27]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [16] with count [0]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [17] with count [16]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [18] with count [15]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [19] with count [0]
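As a rough check on these numbers, the per-partition counts can be combined with the 1 second sleep per message to estimate how long one batch takes for a given number of concurrently running tasks. The sketch below is only an approximation: the class and method names are made up for illustration, the counts are copied from the log above, and it assumes each partition is handled by one task, that tasks are handed to the first free slot in partition order, and that scheduling overhead is zero. Spark's real scheduler is more involved.

```java
import java.util.PriorityQueue;

// Hypothetical helper, not part of Spark: estimates the processing time of one batch.
final class BatchTimeEstimate {
    // Per-partition message counts copied from the driver log above.
    static final int[] COUNTS =
        {24, 0, 0, 19, 19, 20, 0, 23, 21, 0, 0, 0, 0, 26, 0, 27, 0, 16, 15, 0};

    // Assumption: 1 second per message, matching Thread.sleep(1000) in ForeachFunction.
    static final int SECONDS_PER_MESSAGE = 1;

    // Assigns partitions in index order to whichever task slot becomes free first
    // and returns the time at which the last partition finishes.
    static int estimateSeconds(int[] counts, int slots) {
        if (slots < 1) {
            throw new IllegalArgumentException("slots must be at least 1");
        }
        PriorityQueue<Integer> freeAt = new PriorityQueue<>();
        for (int i = 0; i < slots; i++) {
            freeAt.add(0); // every slot is free at time 0
        }
        int finish = 0;
        for (int count : counts) {
            int start = freeAt.poll(); // earliest free slot
            int end = start + count * SECONDS_PER_MESSAGE;
            freeAt.add(end);
            finish = Math.max(finish, end);
        }
        return finish;
    }

    public static void main(String[] args) {
        for (int slots : new int[] {1, 3, 6, 20}) {
            System.out.println(slots + " slot(s): about "
                + estimateSeconds(COUNTS, slots) + " s");
        }
    }
}
```

Under these assumptions, 3 concurrent tasks need roughly 76 seconds for this batch of 210 messages, which is well above the 10 second batch interval, and even one task per partition would still need about 27 seconds because of the largest partition.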
The log from one of the executors, showing that exactly one message per second
was processed (by only one thread):
16/06/02 08:32:46 INFO SparkStreamingImpl: processing message [f2b22bb9-3bd8-4e5b-b9fb-afa7e8c4deb8]
16/06/02 08:32:47 INFO SparkStreamingImpl: processing message [e267cde2-ffea-4f7a-9934-f32a3b7218cc]
16/06/02 08:32:48 INFO SparkStreamingImpl: processing message [f055fe3c-0f72-4f41-9a31-df544f1e1cd3]
16/06/02 08:32:49 INFO SparkStreamingImpl: processing message [854faaa5-0abe-49a2-b13a-c290a3720b0e]
16/06/02 08:32:50 INFO SparkStreamingImpl: processing message [1bc0a141-b910-45fe-9881-e2066928fbc6]
16/06/02 08:32:51 INFO SparkStreamingImpl: processing message [67fb99c6-1ca1-4dfb-bffe-43b927fdec07]
16/06/02 08:32:52 INFO SparkStreamingImpl: processing message [de7d5934-bab2-4019-917e-c339d864ba18]
16/06/02 08:32:53 INFO SparkStreamingImpl: processing message [e63d7a7e-de32-4527-b8f1-641cfcc8869c]
16/06/02 08:32:54 INFO SparkStreamingImpl: processing message [1ce931ee-b8b1-4645-8a51-2c697bf1513b]
16/06/02 08:32:55 INFO SparkStreamingImpl: processing message [5367f3c1-d66c-4647-bb44-f5eab719031d]