Re: spark streaming 1.3 issues
In spark streaming 1.3 - Say I have 10 executors each with 4 cores so in total 40 tasks in parllel at once. If I repartition kafka directstream to 40 partitions vs say I have in kafka topic 300 partitions - which one will be more efficient , Should I repartition the kafka stream equal to num of cores or keep it same as 300? If I have number of partitions greater than parllel tasks will that not cause overhead of task scheduling ? On Wed, Jul 22, 2015 at 11:37 AM, Tathagata Das t...@databricks.com wrote: For Java, do OffsetRange[] offsetRanges = ((HasOffsetRanges)rdd*.rdd()*). offsetRanges(); If you fix that error, you should be seeing data. You can call arbitrary RDD operations on a DStream, using DStream.transform. Take a look at the docs. For the direct kafka approach you are doing, - tasks do get launched for empty partitions - driver may make multiple calls to Kafka brokers to get all the offset information. But that does not mean you should reduce partitions. the whole point of having large number of partition is the consume the data in parallel. If you reduce the number of partitions, that defeats the purpose of having partitoins at all. And the driver making calls for getting metadata (i.e. offsets) isnt very costly, nor is it a bottleneck usually. Rather receiving and processing the actual data is usually the bottleneck and to increase throughput you should have larger number of partitions. On Tue, Jul 21, 2015 at 1:02 AM, Akhil Das ak...@sigmoidanalytics.com wrote: I'd suggest you upgrading to 1.4 as it has better metrices and UI. Thanks Best Regards On Mon, Jul 20, 2015 at 7:01 PM, Shushant Arora shushantaror...@gmail.com wrote: Is coalesce not applicable to kafkaStream ? How to do coalesce on kafkadirectstream its not there in api ? Shall calling repartition on directstream with number of executors as numpartitions will imrove perfromance ? Does in 1.3 tasks get launched for partitions which are empty? Does driver makes call for getting offsets of each partition separately or in single call it gets all partitions new offsets ? I mean will reducing no of partitions oin kafka help improving the performance? On Mon, Jul 20, 2015 at 4:52 PM, Shushant Arora shushantaror...@gmail.com wrote: Hi 1.I am using spark streaming 1.3 for reading from a kafka queue and pushing events to external source. I passed in my job 20 executors but it is showing only 6 in executor tab ? When I used highlevel streaming 1.2 - its showing 20 executors. My cluster is 10 node yarn cluster with each node has 8 cores. I am calling the script as : spark-submit --class classname --num-executors 10 --executor-cores 2 --master yarn-client jarfile 2. On Streaming UI Started at: Mon Jul 20 11:02:10 GMT+00:00 2015 Time since start: 13 minutes 28 seconds Network receivers: 0 Batch interval: 1 second Processed batches: 807 Waiting batches: 0 Received records: 0 Processed records: 0 Received records and processed records are always 0 . And Speed of processing is slow compare to highlevel api. I am procesing the stream using mapPartition. When I used directKafkaStream.foreachRDD(new FunctionJavaPairRDDbyte[],byte[], Void() { @Override public Void call(JavaPairRDDbyte[], byte[] rdd) throws Exception { // TODO Auto-generated method stub OffsetRange[] offsetRanges = ((HasOffsetRanges)rdd).offsetRanges(); } } It throws an exception java.lang.ClassCastException: org.apache.spark.api.java.JavaPairRDD cannot be cast to org.apache.spark.streaming.kafka.HasOffsetRanges Thanks Shushant
Re: spark streaming 1.3 issues
For Java, do OffsetRange[] offsetRanges = ((HasOffsetRanges)rdd*.rdd()*).offsetRanges(); If you fix that error, you should be seeing data. You can call arbitrary RDD operations on a DStream, using DStream.transform. Take a look at the docs. For the direct kafka approach you are doing, - tasks do get launched for empty partitions - driver may make multiple calls to Kafka brokers to get all the offset information. But that does not mean you should reduce partitions. the whole point of having large number of partition is the consume the data in parallel. If you reduce the number of partitions, that defeats the purpose of having partitoins at all. And the driver making calls for getting metadata (i.e. offsets) isnt very costly, nor is it a bottleneck usually. Rather receiving and processing the actual data is usually the bottleneck and to increase throughput you should have larger number of partitions. On Tue, Jul 21, 2015 at 1:02 AM, Akhil Das ak...@sigmoidanalytics.com wrote: I'd suggest you upgrading to 1.4 as it has better metrices and UI. Thanks Best Regards On Mon, Jul 20, 2015 at 7:01 PM, Shushant Arora shushantaror...@gmail.com wrote: Is coalesce not applicable to kafkaStream ? How to do coalesce on kafkadirectstream its not there in api ? Shall calling repartition on directstream with number of executors as numpartitions will imrove perfromance ? Does in 1.3 tasks get launched for partitions which are empty? Does driver makes call for getting offsets of each partition separately or in single call it gets all partitions new offsets ? I mean will reducing no of partitions oin kafka help improving the performance? On Mon, Jul 20, 2015 at 4:52 PM, Shushant Arora shushantaror...@gmail.com wrote: Hi 1.I am using spark streaming 1.3 for reading from a kafka queue and pushing events to external source. I passed in my job 20 executors but it is showing only 6 in executor tab ? When I used highlevel streaming 1.2 - its showing 20 executors. My cluster is 10 node yarn cluster with each node has 8 cores. I am calling the script as : spark-submit --class classname --num-executors 10 --executor-cores 2 --master yarn-client jarfile 2. On Streaming UI Started at: Mon Jul 20 11:02:10 GMT+00:00 2015 Time since start: 13 minutes 28 seconds Network receivers: 0 Batch interval: 1 second Processed batches: 807 Waiting batches: 0 Received records: 0 Processed records: 0 Received records and processed records are always 0 . And Speed of processing is slow compare to highlevel api. I am procesing the stream using mapPartition. When I used directKafkaStream.foreachRDD(new FunctionJavaPairRDDbyte[],byte[], Void() { @Override public Void call(JavaPairRDDbyte[], byte[] rdd) throws Exception { // TODO Auto-generated method stub OffsetRange[] offsetRanges = ((HasOffsetRanges)rdd).offsetRanges(); } } It throws an exception java.lang.ClassCastException: org.apache.spark.api.java.JavaPairRDD cannot be cast to org.apache.spark.streaming.kafka.HasOffsetRanges Thanks Shushant
Re: spark streaming 1.3 issues
I'd suggest you upgrading to 1.4 as it has better metrices and UI. Thanks Best Regards On Mon, Jul 20, 2015 at 7:01 PM, Shushant Arora shushantaror...@gmail.com wrote: Is coalesce not applicable to kafkaStream ? How to do coalesce on kafkadirectstream its not there in api ? Shall calling repartition on directstream with number of executors as numpartitions will imrove perfromance ? Does in 1.3 tasks get launched for partitions which are empty? Does driver makes call for getting offsets of each partition separately or in single call it gets all partitions new offsets ? I mean will reducing no of partitions oin kafka help improving the performance? On Mon, Jul 20, 2015 at 4:52 PM, Shushant Arora shushantaror...@gmail.com wrote: Hi 1.I am using spark streaming 1.3 for reading from a kafka queue and pushing events to external source. I passed in my job 20 executors but it is showing only 6 in executor tab ? When I used highlevel streaming 1.2 - its showing 20 executors. My cluster is 10 node yarn cluster with each node has 8 cores. I am calling the script as : spark-submit --class classname --num-executors 10 --executor-cores 2 --master yarn-client jarfile 2. On Streaming UI Started at: Mon Jul 20 11:02:10 GMT+00:00 2015 Time since start: 13 minutes 28 seconds Network receivers: 0 Batch interval: 1 second Processed batches: 807 Waiting batches: 0 Received records: 0 Processed records: 0 Received records and processed records are always 0 . And Speed of processing is slow compare to highlevel api. I am procesing the stream using mapPartition. When I used directKafkaStream.foreachRDD(new FunctionJavaPairRDDbyte[],byte[], Void() { @Override public Void call(JavaPairRDDbyte[], byte[] rdd) throws Exception { // TODO Auto-generated method stub OffsetRange[] offsetRanges = ((HasOffsetRanges)rdd).offsetRanges(); } } It throws an exception java.lang.ClassCastException: org.apache.spark.api.java.JavaPairRDD cannot be cast to org.apache.spark.streaming.kafka.HasOffsetRanges Thanks Shushant
Re: spark streaming 1.3 issues
Is coalesce not applicable to kafkaStream ? How to do coalesce on kafkadirectstream its not there in api ? Shall calling repartition on directstream with number of executors as numpartitions will imrove perfromance ? Does in 1.3 tasks get launched for partitions which are empty? Does driver makes call for getting offsets of each partition separately or in single call it gets all partitions new offsets ? I mean will reducing no of partitions oin kafka help improving the performance? On Mon, Jul 20, 2015 at 4:52 PM, Shushant Arora shushantaror...@gmail.com wrote: Hi 1.I am using spark streaming 1.3 for reading from a kafka queue and pushing events to external source. I passed in my job 20 executors but it is showing only 6 in executor tab ? When I used highlevel streaming 1.2 - its showing 20 executors. My cluster is 10 node yarn cluster with each node has 8 cores. I am calling the script as : spark-submit --class classname --num-executors 10 --executor-cores 2 --master yarn-client jarfile 2. On Streaming UI Started at: Mon Jul 20 11:02:10 GMT+00:00 2015 Time since start: 13 minutes 28 seconds Network receivers: 0 Batch interval: 1 second Processed batches: 807 Waiting batches: 0 Received records: 0 Processed records: 0 Received records and processed records are always 0 . And Speed of processing is slow compare to highlevel api. I am procesing the stream using mapPartition. When I used directKafkaStream.foreachRDD(new FunctionJavaPairRDDbyte[],byte[], Void() { @Override public Void call(JavaPairRDDbyte[], byte[] rdd) throws Exception { // TODO Auto-generated method stub OffsetRange[] offsetRanges = ((HasOffsetRanges)rdd).offsetRanges(); } } It throws an exception java.lang.ClassCastException: org.apache.spark.api.java.JavaPairRDD cannot be cast to org.apache.spark.streaming.kafka.HasOffsetRanges Thanks Shushant