You can get the offset ranges for a given RDD in the stream by casting it
to HasOffsetRanges.  You can then store the offsets wherever you need to.
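
As a rough illustration of the "store the offsets wherever you need to"
part, here is a minimal sketch in plain Java. It deliberately has no Spark
dependency so it can run on its own: `OffsetStore` and its nested `Range`
class are hypothetical names, with `Range` standing in for Spark's
OffsetRange (topic, partition, fromOffset, untilOffset). In a real job the
ranges would come from the cast shown in the comment inside `main`, and the
in-memory map would be replaced by whatever store you expose to your users.

```java
import java.util.Map;
import java.util.TreeMap;

// Sketch only: tracks the latest consumed offset per topic-partition so that
// it can be exposed to users (for example behind a REST endpoint).
final class OffsetStore {

    // Hypothetical stand-in for Spark's OffsetRange, carrying the same four
    // fields, so that this sketch runs without Spark on the classpath.
    static final class Range {
        final String topic;
        final int partition;
        final long fromOffset;   // first offset in the batch (inclusive)
        final long untilOffset;  // end of the batch (exclusive)

        Range(String topic, int partition, long fromOffset, long untilOffset) {
            this.topic = topic;
            this.partition = partition;
            this.fromOffset = fromOffset;
            this.untilOffset = untilOffset;
        }

        // Number of messages covered by this range; 0 for an empty partition.
        long count() {
            return untilOffset - fromOffset;
        }
    }

    // In-memory placeholder for the external store (assumption: swap for
    // ZooKeeper, a database, or similar in a real deployment).
    private final Map<String, Long> latest = new TreeMap<String, Long>();

    // Record the end offset of every range in a batch.
    void record(Range[] ranges) {
        for (Range r : ranges) {
            latest.put(r.topic + "-" + r.partition, r.untilOffset);
        }
    }

    // Latest recorded offset, or null if the partition has not been seen.
    Long latestOffset(String topic, int partition) {
        return latest.get(topic + "-" + partition);
    }

    public static void main(String[] args) {
        // In a real job these would come from, inside foreachRDD:
        //   OffsetRange[] ranges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
        Range[] batch = {
            new Range("testSparkPartitioned", 0, 100L, 150L),
            new Range("testSparkPartitioned", 1, 80L, 80L)  // empty partition
        };
        OffsetStore store = new OffsetStore();
        store.record(batch);
        for (Range r : batch) {
            System.out.println(r.topic + "-" + r.partition + " count=" + r.count()
                + " latest=" + store.latestOffset(r.topic, r.partition));
        }
    }
}
```

Recording untilOffset after each batch's output has been written is one
reasonable choice for monitoring consumption progress; comparing it with the
latest offset on the broker would give the lag per partition.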

On Tue, Jul 14, 2015 at 5:00 PM, Chen Song <chen.song...@gmail.com> wrote:

> A follow up question.
>
> When using the createDirectStream approach, the offsets are checkpointed to
> HDFS in a form that the Spark Streaming job understands. Is there a way to
> expose the offsets via a REST API to end users? Or alternatively, is there
> a way to have the offsets committed to the Kafka offset manager so users can
> query them from a consumer programmatically?
>
> Essentially, all I need to do is monitor the progress of data consumption
> of the Kafka topic.
>
>
> On Tue, Jun 30, 2015 at 9:39 AM, Cody Koeninger <c...@koeninger.org>
> wrote:
>
>> You can't use different versions of spark in your application vs your
>> cluster.
>>
>> For the direct stream, it's not 60 partitions per executor, it's 300
>> partitions, and executors work on them as they are scheduled.  Yes, if you
>> have no messages you will get an empty partition.  It's up to you whether
>> it's worthwhile to call coalesce or not.
>>
>> On Tue, Jun 30, 2015 at 2:45 AM, Shushant Arora <
>> shushantaror...@gmail.com> wrote:
>>
>>> Is this 3 the number of parallel consumer threads per receiver, meaning
>>> that in total we have 2*3=6 consumers in the same consumer group consuming
>>> from all 300 partitions?
>>> Is 3 just the parallelism on a single receiver, and is the recommendation
>>> to use 1 thread per receiver, since consuming from Kafka is not CPU bound
>>> but NIC (network) bound, so increasing the consumer threads on one
>>> receiver won't make it truly parallel?
>>>
>>> In the non-receiver-based consumer in Spark 1.3, if I use 5 executors and
>>> the Kafka topic has 300 partitions, will the KafkaRDD created on 5
>>> executors have 60 partitions per executor (300 in total, a one-to-one
>>> mapping)? And if some Kafka partitions are empty, say the offset is
>>> unchanged since the last checkpoint for partition P123, will it still
>>> create an empty partition in the KafkaRDD? Should we then call coalesce on
>>> the KafkaRDD?
>>>
>>>
>>> And is there any incompatibility issue when I include spark-streaming_2.10
>>> (version 1.3) and spark-core_2.10 (version 1.3) in my application but my
>>> cluster has Spark version 1.2?
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Mon, Jun 29, 2015 at 7:56 PM, Shushant Arora <
>>> shushantaror...@gmail.com> wrote:
>>>
>>>> 1. Here you are basically creating 2 receivers and asking each of them
>>>> to consume 3 kafka partitions each.
>>>>
>>>> - In 1.2 we have high-level consumers, so how can we restrict the number
>>>> of Kafka partitions to consume from? Say I have 300 partitions in a Kafka
>>>> topic and, as above, I specify 2 receivers and 3 Kafka partitions. Does
>>>> that mean I will read from only 6 of the 300 partitions, and the data in
>>>> the remaining 294 partitions is lost?
>>>>
>>>>
>>>> 2. One more doubt: in Spark Streaming, how is it decided which part of
>>>> the driver's main function runs at each batch interval? Since the whole
>>>> code is written in one function (the driver's main function), how is it
>>>> determined that the Kafka stream receivers are not registered again in
>>>> each batch and that only the processing is done?
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Mon, Jun 29, 2015 at 7:35 PM, ayan guha <guha.a...@gmail.com> wrote:
>>>>
>>>>> Hi
>>>>>
>>>>> Let me take a shot at your questions. (I am sure people like Cody and
>>>>> TD will correct me if I am wrong.)
>>>>>
>>>>> 0. This is an exact copy from a similar question in a mail thread, from
>>>>> Akhil D:
>>>>> Since you set local[4] you will have 4 threads for your computation,
>>>>> and since you have 2 receivers, you are left with 2 threads to
>>>>> process ((0 + 2) <-- This 2 is your 2 threads.) And the other /2
>>>>> means you have 2 tasks in that stage (with id 0).
>>>>>
>>>>> 1. Here you are basically creating 2 receivers and asking each of them
>>>>> to consume 3 kafka partitions each.
>>>>> 2. How does that matter? It depends on how many receivers you have
>>>>> created to consume that data and whether you have repartitioned it.
>>>>> Remember, Spark is lazy and executors are related to the context.
>>>>> 3. I think in Java the factory method is fixed. You just pass around the
>>>>> contextFactory object. (I love Python :) see, the signature is so much
>>>>> cleaner :) )
>>>>> 4. Yes, if you use Spark checkpointing. You can use your own custom
>>>>> checkpointing too.
>>>>>
>>>>> Best
>>>>> Ayan
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Jun 29, 2015 at 4:02 AM, Shushant Arora <
>>>>> shushantaror...@gmail.com> wrote:
>>>>>
>>>>>> A few doubts:
>>>>>>
>>>>>> In 1.2 streaming, when I use a union of streams, my streaming
>>>>>> application sometimes hangs and nothing gets printed on the driver.
>>>>>>
>>>>>> [Stage 2:>                                          (0 + 2) / 2]
>>>>>>
>>>>>> What does (0 + 2) / 2 signify here?
>>>>>>
>>>>>> 1. Should the number in topicsMap.put("testSparkPartitioned", 3); be
>>>>>> the same as numStreams=2 in the unioned stream?
>>>>>>
>>>>>> 2. I launched the app on the YARN RM with num-executors as 5. It
>>>>>> created 2 receivers and 5 executors. In streaming, the receiver nodes
>>>>>> are fixed at the start of the app for its whole lifetime. Do executors
>>>>>> get allocated at the start of each job on the 1s batch interval? If
>>>>>> yes, how is it fast enough to allocate resources? I mean, if I increase
>>>>>> num-executors to 50, it will negotiate 50 executors from the YARN RM at
>>>>>> the start of each job, so does allocating executors take more time than
>>>>>> the batch interval (here 1s, or say 500ms)? Can I fix the processing
>>>>>> executors for the lifetime of the app as well?
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> SparkConf conf = new SparkConf().setAppName("SampleSparkStreamingApp");
>>>>>> JavaStreamingContext jssc =
>>>>>>     new JavaStreamingContext(conf, Durations.milliseconds(1000));
>>>>>>
>>>>>> // Parameters for the receiver-based (high-level consumer) Kafka stream.
>>>>>> Map<String, String> kafkaParams = new HashMap<String, String>();
>>>>>> kafkaParams.put("zookeeper.connect", "ipadd:2181");
>>>>>> kafkaParams.put("group.id", "testgroup");
>>>>>> kafkaParams.put("zookeeper.session.timeout.ms", "10000");
>>>>>>
>>>>>> Map<String, Integer> topicsMap = new HashMap<String, Integer>();
>>>>>> topicsMap.put("testSparkPartitioned", 3);
>>>>>>
>>>>>> // Create numStreams receiver-based streams.
>>>>>> int numStreams = 2;
>>>>>> List<JavaPairDStream<byte[], byte[]>> kafkaStreams =
>>>>>>     new ArrayList<JavaPairDStream<byte[], byte[]>>();
>>>>>> for (int i = 0; i < numStreams; i++) {
>>>>>>     kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class, byte[].class,
>>>>>>         kafka.serializer.DefaultDecoder.class,
>>>>>>         kafka.serializer.DefaultDecoder.class,
>>>>>>         kafkaParams, topicsMap, StorageLevel.MEMORY_ONLY()));
>>>>>> }
>>>>>>
>>>>>> // Union the per-receiver streams into a single DStream.
>>>>>> JavaPairDStream<byte[], byte[]> directKafkaStream =
>>>>>>     jssc.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
>>>>>>
>>>>>> JavaDStream<String> lines = directKafkaStream.map(
>>>>>>     new Function<Tuple2<byte[], byte[]>, String>() {
>>>>>>         public String call(Tuple2<byte[], byte[]> arg0) throws Exception {
>>>>>>             // ... processing
>>>>>>             return msg;
>>>>>>         }
>>>>>>     });
>>>>>> lines.print();
>>>>>> jssc.start();
>>>>>> jssc.awaitTermination();
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> -------------------------------------------------------------------------------------------------------------------------------------------------------
>>>>>> 3. To avoid data loss when we use checkpointing and a factory method
>>>>>> to create the SparkContext, is the method name fixed, or can we use any
>>>>>> name? And how do we set in the app which method name is to be used?
>>>>>>
>>>>>> 4. In 1.3 non-receiver-based streaming, the Kafka offset is not stored
>>>>>> in ZooKeeper. Is that because ZooKeeper is not efficient for high write
>>>>>> volumes and its reads are not strictly consistent? So
>>>>>>
>>>>>>  we use simple Kafka API that does not use Zookeeper and offsets
>>>>>> tracked only by Spark Streaming within its checkpoints. This
>>>>>> eliminates inconsistencies between Spark Streaming and Zookeeper/Kafka,
>>>>>> and so each record is received by Spark Streaming effectively exactly
>>>>>> once despite failures.
>>>>>>
>>>>>> So do we have to call context.checkpoint(hdfsdir)? Or is there an
>>>>>> implicit checkpoint location? That is, is HDFS used for such small data
>>>>>> (just offsets)?
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Sat, Jun 27, 2015 at 7:37 PM, Dibyendu Bhattacharya <
>>>>>> dibyendu.bhattach...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> There is another option to try: the receiver-based low-level Kafka
>>>>>>> consumer, which is part of Spark Packages (
>>>>>>> http://spark-packages.org/package/dibbhatt/kafka-spark-consumer) .
>>>>>>> This can be used with the WAL as well for end-to-end zero data loss.
>>>>>>>
>>>>>>> It is also a reliable receiver and commits offsets to ZK.  Given the
>>>>>>> number of Kafka partitions you have ( > 100), using the high-level
>>>>>>> Kafka API for the receiver-based approach may lead to issues related to
>>>>>>> consumer rebalancing, which is a major issue with the Kafka high-level
>>>>>>> API.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Dibyendu
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Sat, Jun 27, 2015 at 3:04 PM, Tathagata Das <t...@databricks.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> In the receiver based approach, if the receiver crashes for any
>>>>>>>> reason (receiver crashed or executor crashed), the receiver should get
>>>>>>>> restarted on another executor and should start reading data from the
>>>>>>>> offset present in ZooKeeper. There is some chance of data loss, which
>>>>>>>> can be alleviated using Write Ahead Logs (see the streaming
>>>>>>>> programming guide for more details, or see my talk [Slides PDF
>>>>>>>> <http://www.slideshare.net/SparkSummit/recipes-for-running-spark-streaming-apploications-in-production-tathagata-daspptx>
>>>>>>>> , Video
>>>>>>>> <https://www.youtube.com/watch?v=d5UJonrruHk&list=PL-x35fyliRwgfhffEpywn4q23ykotgQJ6&index=4>
>>>>>>>> ] from last Spark Summit 2015). But that approach can give
>>>>>>>> duplicate records. The direct approach gives exactly-once guarantees,
>>>>>>>> so you should try it out.
>>>>>>>>
>>>>>>>> TD
>>>>>>>>
>>>>>>>> On Fri, Jun 26, 2015 at 5:46 PM, Cody Koeninger <c...@koeninger.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Read the spark streaming guide and the kafka integration guide for
>>>>>>>>> a better understanding of how the receiver based stream works.
>>>>>>>>>
>>>>>>>>> Capacity planning is specific to your environment and what the job
>>>>>>>>> is actually doing; you'll need to determine it empirically.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Friday, June 26, 2015, Shushant Arora <
>>>>>>>>> shushantaror...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> In 1.2, how do I handle offset management in each job after the
>>>>>>>>>> streaming application starts? Should I commit the offset manually
>>>>>>>>>> after each job completes?
>>>>>>>>>>
>>>>>>>>>> And what is the recommended number of consumer threads? Say I have
>>>>>>>>>> 300 partitions in the Kafka cluster. The load is ~1 million events
>>>>>>>>>> per second. Each event is ~500 bytes. Are 5 receivers with 60
>>>>>>>>>> partitions per receiver sufficient for Spark Streaming to consume
>>>>>>>>>> this?
>>>>>>>>>>
>>>>>>>>>> On Fri, Jun 26, 2015 at 8:40 PM, Cody Koeninger <
>>>>>>>>>> c...@koeninger.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> The receiver-based kafka createStream in spark 1.2 uses
>>>>>>>>>>> zookeeper to store offsets.  If you want finer-grained control over
>>>>>>>>>>> offsets, you can update the values in zookeeper yourself before
>>>>>>>>>>> starting the job.
>>>>>>>>>>>
>>>>>>>>>>> createDirectStream in spark 1.3 is still marked as experimental,
>>>>>>>>>>> and subject to change.  That being said, it works better for me in
>>>>>>>>>>> production than the receiver based api.
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Jun 26, 2015 at 6:43 AM, Shushant Arora <
>>>>>>>>>>> shushantaror...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I am using spark streaming 1.2.
>>>>>>>>>>>>
>>>>>>>>>>>> If the processing executors crash, will the receiver reset the
>>>>>>>>>>>> offset back to the last processed offset?
>>>>>>>>>>>>
>>>>>>>>>>>> If the receiver itself crashes, is there a way to reset the
>>>>>>>>>>>> offset to something other than smallest or largest without
>>>>>>>>>>>> restarting the streaming application?
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Is Spark Streaming 1.3, which uses the low-level consumer API,
>>>>>>>>>>>> stable? And which is recommended for handling data loss, 1.2 or
>>>>>>>>>>>> 1.3?
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Best Regards,
>>>>> Ayan Guha
>>>>>
>>>>
>>>>
>>>
>>
>
>
> --
> Chen Song
>
>
