A follow-up question.

When using the createDirectStream approach, the offsets are checkpointed to
HDFS in a form that the Spark Streaming job understands. Is there a way to
expose the offsets to end users via a REST API? Alternatively, is there
a way to have the offsets committed to the Kafka offset manager so that users
can query them programmatically from a consumer?

Essentially, all I need to do is monitor how far the job has progressed in
consuming the Kafka topic.
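
As a minimal sketch of the kind of monitoring meant here: as far as I
understand, each batch RDD from createDirectStream can be cast to
HasOffsetRanges to read the offset range it covers per partition. Assuming
the job reports those "consumed until" offsets somewhere, and that the latest
offset per partition is obtained separately from Kafka, the lag could be
computed roughly as below. The class and method names are hypothetical (not
part of Spark or Kafka), the sample offsets are made up, and a partition that
has not been reported yet is simply treated as consumed up to offset 0.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper for illustration only; not part of Spark or Kafka.
class OffsetProgress {

    // Per-partition lag: latest offset in Kafka minus the offset the job has
    // consumed up to. Keys are Kafka partition ids.
    static Map<Integer, Long> lagPerPartition(Map<Integer, Long> consumedUntil,
                                              Map<Integer, Long> latestInKafka) {
        Map<Integer, Long> lag = new TreeMap<Integer, Long>();
        for (Map.Entry<Integer, Long> e : latestInKafka.entrySet()) {
            // Assumption: a partition with no reported offset counts as offset 0.
            Long reported = consumedUntil.get(e.getKey());
            long consumed = (reported == null) ? 0L : reported.longValue();
            // Clamp at 0 so a stale "latest" reading never yields a negative lag.
            lag.put(e.getKey(), Math.max(0L, e.getValue() - consumed));
        }
        return lag;
    }

    // Sum of the per-partition lags.
    static long totalLag(Map<Integer, Long> lag) {
        long sum = 0L;
        for (long v : lag.values()) {
            sum += v;
        }
        return sum;
    }

    public static void main(String[] args) {
        // Made-up sample data: offsets the job has consumed up to.
        Map<Integer, Long> consumed = new TreeMap<Integer, Long>();
        consumed.put(0, 1500L);
        consumed.put(1, 900L);

        // Made-up sample data: latest offsets available in Kafka.
        Map<Integer, Long> latest = new TreeMap<Integer, Long>();
        latest.put(0, 2000L);
        latest.put(1, 900L);
        latest.put(2, 40L);

        Map<Integer, Long> lag = lagPerPartition(consumed, latest);
        System.out.println("lag per partition: " + lag);
        System.out.println("total lag: " + totalLag(lag));
    }
}
```

With the sample data above, this prints a per-partition lag of
{0=500, 1=0, 2=40} and a total lag of 540. A REST endpoint or a dashboard
would only need to serve the output of lagPerPartition.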


On Tue, Jun 30, 2015 at 9:39 AM, Cody Koeninger <c...@koeninger.org> wrote:

> You can't use different versions of spark in your application vs your
> cluster.
>
> For the direct stream, it's not 60 partitions per executor, it's 300
> partitions, and executors work on them as they are scheduled.  Yes, if you
> have no messages you will get an empty partition.  It's up to you whether
> it's worthwhile to call coalesce or not.
>
> On Tue, Jun 30, 2015 at 2:45 AM, Shushant Arora <shushantaror...@gmail.com
> > wrote:
>
>> Is this 3 the number of parallel consumer threads per receiver, meaning that
>> in total we have 2*3=6 consumers in the same consumer group consuming from
>> all 300 partitions?
>> So 3 is just the parallelism within one receiver, and the recommendation is
>> to use 1 per receiver because consuming from Kafka is not CPU bound but
>> rather NIC (network) bound, so increasing the consumer threads on one
>> receiver won't make it parallel in the ideal sense?
>>
>> With the non-receiver-based consumer in Spark 1.3, if I use 5 executors and
>> the Kafka topic has 300 partitions, will the KafkaRDD created on 5 executors
>> have 60 partitions per executor (300 in total, a one-to-one mapping)? And if
>> some Kafka partitions are empty, say the offset is unchanged between the last
>> checkpoint and now for partition P123, will it still create an empty
>> partition in the KafkaRDD? So should we call coalesce on the KafkaRDD?
>>
>>
>> And is there any incompatibility issue if I include spark-streaming_2.10
>> (version 1.3) and spark-core_2.10 (version 1.3) in my application but my
>> cluster runs Spark version 1.2?
>>
>>
>>
>>
>>
>>
>> On Mon, Jun 29, 2015 at 7:56 PM, Shushant Arora <
>> shushantaror...@gmail.com> wrote:
>>
>>> 1. Here you are basically creating 2 receivers and asking each of them
>>> to consume 3 kafka partitions each.
>>>
>>> - In 1.2 we have high-level consumers, so how can we restrict the number of
>>> Kafka partitions to consume from? Say I have 300 partitions in the Kafka
>>> topic and, as above, I gave 2 receivers and 3 Kafka partitions. Does that
>>> mean I will read from only 6 of the 300 partitions, and the data in the
>>> remaining 294 partitions is lost?
>>>
>>>
>>> 2. One more doubt: in Spark Streaming, how is it decided which part of the
>>> driver's main function runs at each batch interval? Since the whole code is
>>> written in one function (the main function in the driver), how is it
>>> determined that the Kafka stream receivers are not registered again in each
>>> batch and only the processing is done?
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Mon, Jun 29, 2015 at 7:35 PM, ayan guha <guha.a...@gmail.com> wrote:
>>>
>>>> Hi
>>>>
>>>> Let me take a shot at your questions. (I am sure people like Cody and TD
>>>> will correct me if I am wrong.)
>>>>
>>>> 0. This is exact copy from the similar question in mail thread from
>>>> Akhil D:
>>>> Since you set local[4] you will have 4 threads for your computation,
>>>> and since you are having 2 receivers, you are left with 2 threads to
>>>> process ((0 + 2) <-- This 2 is your 2 threads.) And the other /2 means
>>>> you are having 2 tasks in that stage (with id 0).
>>>>
>>>> 1. Here you are basically creating 2 receivers and asking each of them
>>>> to consume 3 kafka partitions each.
>>>> 2. How does that matter? It depends on how many receivers you have
>>>> created to consume that data and whether you have repartitioned it.
>>>> Remember, Spark is lazy and executors are related to the context.
>>>> 3. I think in Java, the factory method is fixed. You just pass around the
>>>> contextFactory object. (I love Python :) see, the signature is so much
>>>> cleaner :) )
>>>> 4. Yes, if you use Spark checkpointing. You can use your own custom
>>>> checkpointing too.
>>>>
>>>> Best
>>>> Ayan
>>>>
>>>>
>>>>
>>>> On Mon, Jun 29, 2015 at 4:02 AM, Shushant Arora <
>>>> shushantaror...@gmail.com> wrote:
>>>>
>>>>> Few doubts :
>>>>>
>>>>> In 1.2 streaming, when I use a union of streams, my streaming
>>>>> application sometimes hangs and nothing gets printed on the driver.
>>>>>
>>>>>
>>>>> [Stage 2:>                                       (0 + 2) / 2]
>>>>>
>>>>> What does (0 + 2) / 2 signify here?
>>>>>
>>>>>
>>>>>
>>>>> 1. Does the number of streams in topicsMap.put("testSparkPartitioned", 3);
>>>>> have to be the same as numStreams = 2 in the unioned stream?
>>>>>
>>>>> 2. I launched the app on the YARN RM with num-executors set to 5. It
>>>>> created 2 receivers and 5 executors. In streaming, the receiver nodes are
>>>>> fixed at the start of the app for its whole lifetime. Do executors get
>>>>> allocated at the start of each job, on the 1s batch interval? If yes, how
>>>>> is it fast enough to allocate resources? I mean, if I increase
>>>>> num-executors to 50, it will negotiate 50 executors from the YARN RM at
>>>>> the start of each job, so does allocating executors take more time than
>>>>> the batch interval (here 1s, or say 500ms)? Can I also keep the processing
>>>>> executors fixed throughout the app?
>>>>>
>>>>>
>>>>>
>>>>>
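>>>>> import java.util.ArrayList;
>>>>> import java.util.HashMap;
>>>>> import java.util.List;
>>>>> import java.util.Map;
>>>>>
>>>>> import org.apache.spark.SparkConf;
>>>>> import org.apache.spark.api.java.function.Function;
>>>>> import org.apache.spark.storage.StorageLevel;
>>>>> import org.apache.spark.streaming.Durations;
>>>>> import org.apache.spark.streaming.api.java.JavaDStream;
>>>>> import org.apache.spark.streaming.api.java.JavaPairDStream;
>>>>> import org.apache.spark.streaming.api.java.JavaStreamingContext;
>>>>> import org.apache.spark.streaming.kafka.KafkaUtils;
>>>>> import scala.Tuple2;
>>>>>
>>>>> // The statements below run inside the driver's main method.
>>>>> SparkConf conf = new SparkConf().setAppName("SampleSparkStreamingApp");
>>>>> // 1s batch interval
>>>>> JavaStreamingContext jssc =
>>>>>     new JavaStreamingContext(conf, Durations.milliseconds(1000));
>>>>>
>>>>> Map<String, String> kafkaParams = new HashMap<String, String>();
>>>>> kafkaParams.put("zookeeper.connect", "ipadd:2181");
>>>>> kafkaParams.put("group.id", "testgroup");
>>>>> kafkaParams.put("zookeeper.session.timeout.ms", "10000");
>>>>>
>>>>> // topic name -> number of consumer threads per receiver
>>>>> Map<String, Integer> topicsMap = new HashMap<String, Integer>();
>>>>> topicsMap.put("testSparkPartitioned", 3);
>>>>>
>>>>> // one receiver-based stream (and so one receiver) per iteration
>>>>> int numStreams = 2;
>>>>> List<JavaPairDStream<byte[], byte[]>> kafkaStreams =
>>>>>     new ArrayList<JavaPairDStream<byte[], byte[]>>();
>>>>> for (int i = 0; i < numStreams; i++) {
>>>>>     kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class,
>>>>>         byte[].class, kafka.serializer.DefaultDecoder.class,
>>>>>         kafka.serializer.DefaultDecoder.class,
>>>>>         kafkaParams, topicsMap, StorageLevel.MEMORY_ONLY()));
>>>>> }
>>>>>
>>>>> // union of the receiver-based streams (despite the variable name)
>>>>> JavaPairDStream<byte[], byte[]> directKafkaStream =
>>>>>     jssc.union(kafkaStreams.get(0),
>>>>>         kafkaStreams.subList(1, kafkaStreams.size()));
>>>>>
>>>>> JavaDStream<String> lines = directKafkaStream.map(
>>>>>     new Function<Tuple2<byte[], byte[]>, String>() {
>>>>>         public String call(Tuple2<byte[], byte[]> arg0) throws Exception {
>>>>>             // ...processing (elided; it is expected to define msg)
>>>>>             return msg;
>>>>>         }
>>>>>     });
>>>>> lines.print();
>>>>> jssc.start();
>>>>> jssc.awaitTermination();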
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> -------------------------------------------------------------------------------------------------------------------------------------------------------
>>>>> 3. To avoid data loss when we use checkpointing and a factory method
>>>>> to create the Spark context, is the method name fixed, or can we use any
>>>>> name? And how do we set in the app which method name is to be used?
>>>>>
>>>>> 4. In 1.3 non-receiver-based streaming, the Kafka offset is not stored in
>>>>> ZooKeeper. Is that because ZooKeeper is not efficient for heavy writes and
>>>>> its reads are not strictly consistent? So
>>>>>
>>>>>  we use simple Kafka API that does not use Zookeeper and offsets
>>>>> tracked only by Spark Streaming within its checkpoints. This
>>>>> eliminates inconsistencies between Spark Streaming and Zookeeper/Kafka,
>>>>> and so each record is received by Spark Streaming effectively exactly once
>>>>> despite failures.
>>>>>
>>>>> So do we have to call context.checkpoint(hdfsdir)? Or is there an implicit
>>>>> checkpoint location? In other words, is HDFS used for such small data
>>>>> (just offsets)?
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Sat, Jun 27, 2015 at 7:37 PM, Dibyendu Bhattacharya <
>>>>> dibyendu.bhattach...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> There is another option to try: the receiver-based low-level Kafka
>>>>>> consumer, which is part of Spark Packages (
>>>>>> http://spark-packages.org/package/dibbhatt/kafka-spark-consumer) .
>>>>>> This can be used with the WAL as well for end-to-end zero data loss.
>>>>>>
>>>>>> This is also a reliable receiver and commits offsets to ZK. Given the
>>>>>> number of Kafka partitions you have (> 100), using the high-level Kafka
>>>>>> API for the receiver-based approach may lead to issues related to
>>>>>> consumer rebalancing, which is a major issue with the Kafka high-level
>>>>>> API.
>>>>>>
>>>>>> Regards,
>>>>>> Dibyendu
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Sat, Jun 27, 2015 at 3:04 PM, Tathagata Das <t...@databricks.com>
>>>>>> wrote:
>>>>>>
>>>>>>> In the receiver-based approach, if the receiver crashes for any
>>>>>>> reason (receiver crashed or executor crashed), the receiver should get
>>>>>>> restarted on another executor and should start reading data from the
>>>>>>> offset present in ZooKeeper. There is some chance of data loss, which
>>>>>>> can be alleviated using Write Ahead Logs (see the streaming programming
>>>>>>> guide for more details, or see my talk [Slides PDF
>>>>>>> <http://www.slideshare.net/SparkSummit/recipes-for-running-spark-streaming-apploications-in-production-tathagata-daspptx>
>>>>>>> , Video
>>>>>>> <https://www.youtube.com/watch?v=d5UJonrruHk&list=PL-x35fyliRwgfhffEpywn4q23ykotgQJ6&index=4>
>>>>>>> ] from last Spark Summit 2015). But that approach can give
>>>>>>> duplicate records. The direct approach gives exactly-once guarantees, so
>>>>>>> you should try it out.
>>>>>>>
>>>>>>> TD
>>>>>>>
>>>>>>> On Fri, Jun 26, 2015 at 5:46 PM, Cody Koeninger <c...@koeninger.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Read the spark streaming guide and the kafka integration guide for a
>>>>>>>> better understanding of how the receiver based stream works.
>>>>>>>>
>>>>>>>> Capacity planning is specific to your environment and what the job
>>>>>>>> is actually doing; you'll need to determine it empirically.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Friday, June 26, 2015, Shushant Arora <shushantaror...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> In 1.2, how do I handle offset management in each job after the
>>>>>>>>> streaming application starts? Should I commit the offset manually
>>>>>>>>> after job completion?
>>>>>>>>>
>>>>>>>>> And what is the recommended number of consumer threads? Say I have
>>>>>>>>> 300 partitions in the Kafka cluster. The load is ~1 million events
>>>>>>>>> per second. Each event is ~500 bytes. Are 5 receivers with 60
>>>>>>>>> partitions per receiver sufficient for Spark Streaming to consume?
>>>>>>>>>
>>>>>>>>> On Fri, Jun 26, 2015 at 8:40 PM, Cody Koeninger <
>>>>>>>>> c...@koeninger.org> wrote:
>>>>>>>>>
>>>>>>>>>> The receiver-based kafka createStream in spark 1.2 uses zookeeper
>>>>>>>>>> to store offsets.  If you want finer-grained control over offsets,
>>>>>>>>>> you can update the values in zookeeper yourself before starting
>>>>>>>>>> the job.
>>>>>>>>>>
>>>>>>>>>> createDirectStream in spark 1.3 is still marked as experimental,
>>>>>>>>>> and subject to change.  That being said, it works better for me in
>>>>>>>>>> production than the receiver based api.
>>>>>>>>>>
>>>>>>>>>> On Fri, Jun 26, 2015 at 6:43 AM, Shushant Arora <
>>>>>>>>>> shushantaror...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> I am using spark streaming 1.2.
>>>>>>>>>>>
>>>>>>>>>>> If the processing executors crash, will the receiver reset the
>>>>>>>>>>> offset back to the last processed offset?
>>>>>>>>>>>
>>>>>>>>>>> If the receiver itself crashes, is there a way to reset the
>>>>>>>>>>> offset without restarting the streaming application, other than
>>>>>>>>>>> to smallest or largest?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Is Spark Streaming 1.3, which uses the low-level consumer API,
>>>>>>>>>>> stable? And which is recommended for handling data loss, 1.2 or
>>>>>>>>>>> 1.3?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Best Regards,
>>>> Ayan Guha
>>>>
>>>
>>>
>>
>


-- 
Chen Song
