For Java, do

OffsetRange[] offsetRanges = ((HasOffsetRanges)rdd*.rdd()*).offsetRanges();

If you fix that error, you should be seeing data.

You can call arbitrary RDD operations on a DStream, using
DStream.transform. Take a look at the docs.

For the direct kafka approach you are doing,
- tasks do get launched for empty partitions
- driver may make multiple calls to Kafka brokers to get all the offset
information. But that does not mean you should reduce partitions. the whole
point of having large number of partition is the consume the data in
parallel. If you reduce the number of partitions, that defeats the purpose
of having partitoins at all. And the driver making calls for getting
metadata (i.e. offsets) isnt very costly, nor is it a bottleneck usually.
Rather receiving and processing the actual data is usually the bottleneck
and to increase throughput you should have larger number of partitions.



On Tue, Jul 21, 2015 at 1:02 AM, Akhil Das <ak...@sigmoidanalytics.com>
wrote:

> I'd suggest you upgrading to 1.4 as it has better metrices and UI.
>
> Thanks
> Best Regards
>
> On Mon, Jul 20, 2015 at 7:01 PM, Shushant Arora <shushantaror...@gmail.com
> > wrote:
>
>> Is coalesce not applicable to kafkaStream ? How to do coalesce on
>> kafkadirectstream its not there in api ?
>> Shall calling repartition on directstream with number of executors as
>> numpartitions will imrove perfromance ?
>>
>> Does in 1.3 tasks get launched for partitions which are empty? Does
>> driver makes call for getting offsets of each partition separately or in
>> single call it gets all partitions new offsets ? I mean will reducing no of
>>  partitions oin kafka help improving the performance?
>>
>> On Mon, Jul 20, 2015 at 4:52 PM, Shushant Arora <
>> shushantaror...@gmail.com> wrote:
>>
>>> Hi
>>>
>>> 1.I am using spark streaming 1.3 for reading from a kafka queue and
>>> pushing events to external source.
>>>
>>> I passed in my job 20 executors but it is showing only 6 in executor tab
>>> ?
>>> When I used highlevel streaming 1.2 - its showing 20 executors. My
>>> cluster is 10 node yarn cluster with each node has 8 cores.
>>>
>>> I am calling the script as :
>>>
>>> spark-submit --class classname --num-executors 10 --executor-cores 2
>>> --master yarn-client jarfile
>>>
>>> 2. On Streaming UI
>>>
>>> Started at: Mon Jul 20 11:02:10 GMT+00:00 2015
>>> Time since start: 13 minutes 28 seconds
>>> Network receivers: 0
>>> Batch interval: 1 second
>>> Processed batches: 807
>>> Waiting batches: 0
>>> Received records: 0
>>> Processed records: 0
>>>
>>> Received records and processed records are always 0 . And Speed of
>>> processing is slow compare to highlevel api.
>>>
>>> I am procesing the stream using mapPartition.
>>>
>>> When I used
>>> directKafkaStream.foreachRDD(new Function<JavaPairRDD<byte[],byte[]>,
>>> Void>() {
>>>  @Override
>>> public Void call(JavaPairRDD<byte[], byte[]> rdd) throws Exception {
>>> // TODO Auto-generated method stub
>>> OffsetRange[] offsetRanges = ((HasOffsetRanges)rdd).offsetRanges();
>>> }
>>> }
>>>
>>> It throws an exception
>>> java.lang.ClassCastException: org.apache.spark.api.java.JavaPairRDD
>>> cannot be cast to org.apache.spark.streaming.kafka.HasOffsetRanges
>>>
>>> Thanks
>>> Shushant
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>
>

Reply via email to