You have access to the offset ranges for a given rdd in the stream by typecasting to HasOffsetRanges. You can then store the offsets wherever you need to.
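A minimal sketch of that pattern, against the Spark 1.3 spark-streaming-kafka Java API (the `saveOffsets` helper is hypothetical, standing in for whatever store you choose; this fragment assumes a surrounding class and a direct stream named `directKafkaStream`):

```java
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.OffsetRange;

directKafkaStream.foreachRDD(new Function<JavaPairRDD<byte[], byte[]>, Void>() {
    public Void call(JavaPairRDD<byte[], byte[]> rdd) {
        // Cast the underlying RDD to HasOffsetRanges to read the offsets
        // this batch covers, one OffsetRange per Kafka partition.
        OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
        for (OffsetRange o : offsets) {
            // saveOffsets is hypothetical: write to ZK, a DB, a REST endpoint, etc.
            saveOffsets(o.topic(), o.partition(), o.fromOffset(), o.untilOffset());
        }
        return null;
    }
});
```

Exposing what `saveOffsets` records (e.g. via a small REST service over the store) is one way to monitor consumption progress without relying on Spark's checkpoint files.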
On Tue, Jul 14, 2015 at 5:00 PM, Chen Song <chen.song...@gmail.com> wrote:

> A follow-up question.
>
> When using the createDirectStream approach, the offsets are checkpointed to
> HDFS and are understood by the Spark Streaming job. Is there a way to
> expose the offsets via a REST API to end users? Or alternatively, is there
> a way to have offsets committed to the Kafka Offset Manager so users can
> query them programmatically from a consumer?
>
> Essentially, all I need to do is monitor the progress of data consumption
> of the Kafka topic.
>
> On Tue, Jun 30, 2015 at 9:39 AM, Cody Koeninger <c...@koeninger.org> wrote:
>
>> You can't use different versions of Spark in your application vs. your
>> cluster.
>>
>> For the direct stream, it's not 60 partitions per executor, it's 300
>> partitions, and executors work on them as they are scheduled. Yes, if you
>> have no messages you will get an empty partition. It's up to you whether
>> it's worthwhile to call coalesce or not.
>>
>> On Tue, Jun 30, 2015 at 2:45 AM, Shushant Arora <shushantaror...@gmail.com> wrote:
>>
>>> Is this 3 the number of parallel consumer threads per receiver? That would
>>> mean in total we have 2*3=6 consumers in the same consumer group consuming
>>> from all 300 partitions.
>>> 3 is just parallelism on the same receiver, and the recommendation is to
>>> use 1 per receiver, since consuming from Kafka is not CPU-bound but rather
>>> NIC (network) bound; increasing consumer threads on one receiver won't make
>>> it parallel in the ideal sense?
>>>
>>> In the non-receiver-based consumer in Spark 1.3, if I use 5 executors and
>>> the Kafka topic has 300 partitions, will the KafkaRDD created on the 5
>>> executors have 60 partitions per executor (300 total, one-to-one mapping)?
>>> And if some Kafka partitions are empty, say the offset from the last
>>> checkpoint to current is the same for partition P123, will it still create
>>> an empty partition in the KafkaRDD? So should we call coalesce on the
>>> KafkaRDD?
>>>
>>> And is there any incompatibility issue when I include spark-streaming_2.10
>>> (version 1.3) and spark-core_2.10 (version 1.3) in my application but my
>>> cluster has Spark version 1.2?
>>>
>>> On Mon, Jun 29, 2015 at 7:56 PM, Shushant Arora <shushantaror...@gmail.com> wrote:
>>>
>>>> 1. Here you are basically creating 2 receivers and asking each of them
>>>> to consume 3 kafka partitions each.
>>>>
>>>> - In 1.2 we have high-level consumers, so how can we restrict the number
>>>> of Kafka partitions to consume from? Say I have 300 Kafka partitions in
>>>> the topic and, as above, I gave 2 receivers and 3 Kafka partitions. Does
>>>> that mean I will read from only 6 of the 300 partitions, and the data for
>>>> the other 294 partitions is lost?
>>>>
>>>> 2. One more doubt: in Spark Streaming, how is it decided which part of
>>>> the main function of the driver will run at each batch interval? Since the
>>>> whole code is written in one function (the main function in the driver),
>>>> how is it determined that the Kafka stream receivers are not to be
>>>> registered in each batch, only the processing to be done?
>>>>
>>>> On Mon, Jun 29, 2015 at 7:35 PM, ayan guha <guha.a...@gmail.com> wrote:
>>>>
>>>>> Hi
>>>>>
>>>>> Let me take a shot at your questions. (I am sure people like Cody and
>>>>> TD will correct me if I am wrong.)
>>>>>
>>>>> 0. This is an exact copy from the similar question in the mail thread
>>>>> from Akhil D:
>>>>> Since you set local[4] you will have 4 threads for your computation,
>>>>> and since you are having 2 receivers, you are left with 2 threads to
>>>>> process ((0 + 2) <-- This 2 is your 2 threads.) And the other /2
>>>>> means you are having 2 tasks in that stage (with id 0).
>>>>>
>>>>> 1. Here you are basically creating 2 receivers and asking each of them
>>>>> to consume 3 Kafka partitions each.
>>>>> 2. How does that matter? It depends on how many receivers you have
>>>>> created to consume that data and whether you have repartitioned it.
>>>>> Remember, Spark is lazy and executors are related to the context.
>>>>> 3. I think in Java the factory method is fixed. You just pass around the
>>>>> contextFactory object. (I love Python :) see, the signature is so much
>>>>> cleaner :) )
>>>>> 4. Yes, if you use Spark checkpointing. You can use your custom
>>>>> checkpointing too.
>>>>>
>>>>> Best
>>>>> Ayan
>>>>>
>>>>> On Mon, Jun 29, 2015 at 4:02 AM, Shushant Arora <shushantaror...@gmail.com> wrote:
>>>>>
>>>>>> A few doubts:
>>>>>>
>>>>>> In 1.2 streaming, when I use a union of streams, my streaming
>>>>>> application sometimes hangs and nothing gets printed on the driver.
>>>>>>
>>>>>> [Stage 2:>                                                  (0 + 2) / 2]
>>>>>>
>>>>>> What does (0 + 2) / 2 signify here?
>>>>>>
>>>>>> 1. Does the number of streams in topicsMap.put("testSparkPartitioned", 3)
>>>>>> have to be the same as numStreams=2 in the unioned stream?
>>>>>>
>>>>>> 2. I launched the app on the YARN RM with num-executors as 5. It created
>>>>>> 2 receivers and 5 executors. As with a stream, receiver nodes get fixed
>>>>>> at the start of the app for its whole lifetime. Do executors get
>>>>>> allocated at the start of each job at the 1s batch interval? If yes, how
>>>>>> is it fast enough to allocate resources? I mean, if I increase
>>>>>> num-executors to 50, it will negotiate 50 executors from the YARN RM at
>>>>>> the start of each job, so doesn't allocating executors take more time
>>>>>> than the batch interval (here 1s, or say 500ms)? Can I also fix the
>>>>>> processing executors for the lifetime of the app?
>>>>>>
>>>>>> SparkConf conf = new SparkConf().setAppName("SampleSparkStreamingApp");
>>>>>> JavaStreamingContext jssc =
>>>>>>     new JavaStreamingContext(conf, Durations.milliseconds(1000));
>>>>>>
>>>>>> Map<String, String> kafkaParams = new HashMap<String, String>();
>>>>>> kafkaParams.put("zookeeper.connect", "ipadd:2181");
>>>>>> kafkaParams.put("group.id", "testgroup");
>>>>>> kafkaParams.put("zookeeper.session.timeout.ms", "10000");
>>>>>> Map<String, Integer> topicsMap = new HashMap<String, Integer>();
>>>>>> topicsMap.put("testSparkPartitioned", 3);
>>>>>> int numStreams = 2;
>>>>>> List<JavaPairDStream<byte[], byte[]>> kafkaStreams =
>>>>>>     new ArrayList<JavaPairDStream<byte[], byte[]>>();
>>>>>> for (int i = 0; i < numStreams; i++) {
>>>>>>     kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class,
>>>>>>         byte[].class, kafka.serializer.DefaultDecoder.class,
>>>>>>         kafka.serializer.DefaultDecoder.class,
>>>>>>         kafkaParams, topicsMap, StorageLevel.MEMORY_ONLY()));
>>>>>> }
>>>>>> JavaPairDStream<byte[], byte[]> directKafkaStream =
>>>>>>     jssc.union(kafkaStreams.get(0),
>>>>>>         kafkaStreams.subList(1, kafkaStreams.size()));
>>>>>> JavaDStream<String> lines = directKafkaStream.map(
>>>>>>     new Function<Tuple2<byte[], byte[]>, String>() {
>>>>>>         public String call(Tuple2<byte[], byte[]> arg0) throws Exception {
>>>>>>             ...processing
>>>>>>             ..return msg;
>>>>>>         }
>>>>>>     });
>>>>>> lines.print();
>>>>>> jssc.start();
>>>>>> jssc.awaitTermination();
>>>>>>
>>>>>> -------------------------------------------------------------------
>>>>>>
>>>>>> 3. For avoiding data loss when we use checkpointing and a factory
>>>>>> method to create the SparkContext, is the method name fixed, or can we
>>>>>> use any name? And how do we set in the app which method name is to be
>>>>>> used?
>>>>>>
>>>>>> 4. In 1.3 non-receiver-based streaming, the Kafka offset is not stored
>>>>>> in ZooKeeper. Is that because ZooKeeper is not efficient for high write
>>>>>> rates and its reads are not strictly consistent? So
>>>>>>
>>>>>> we use the simple Kafka API that does not use ZooKeeper, and offsets are
>>>>>> tracked only by Spark Streaming within its checkpoints. This eliminates
>>>>>> inconsistencies between Spark Streaming and ZooKeeper/Kafka, and so each
>>>>>> record is received by Spark Streaming effectively exactly once despite
>>>>>> failures.
>>>>>>
>>>>>> So do we have to call context.checkpoint(hdfsdir)? Or is there an
>>>>>> implicit checkpoint location? That is, is HDFS used for small data
>>>>>> (just offsets)?
>>>>>>
>>>>>> On Sat, Jun 27, 2015 at 7:37 PM, Dibyendu Bhattacharya <dibyendu.bhattach...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> There is another option to try: the Receiver-Based Low Level Kafka
>>>>>>> Consumer, which is part of Spark-Packages
>>>>>>> (http://spark-packages.org/package/dibbhatt/kafka-spark-consumer).
>>>>>>> This can be used with the WAL as well for end-to-end zero data loss.
>>>>>>>
>>>>>>> This is also a Reliable Receiver, and it commits offsets to ZK. Given
>>>>>>> the number of Kafka partitions you have (> 100), using the High Level
>>>>>>> Kafka API for the receiver-based approach may lead to issues with
>>>>>>> consumer re-balancing, which is a major issue of the Kafka High Level
>>>>>>> API.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Dibyendu
>>>>>>>
>>>>>>> On Sat, Jun 27, 2015 at 3:04 PM, Tathagata Das <t...@databricks.com> wrote:
>>>>>>>
>>>>>>>> In the receiver-based approach, if the receiver crashes for any
>>>>>>>> reason (receiver crashed or executor crashed), the receiver should get
>>>>>>>> restarted on another executor and should start reading data from the
>>>>>>>> offset present in ZooKeeper.
>>>>>>>> There is some chance of data loss, which can be alleviated using
>>>>>>>> Write Ahead Logs (see the streaming programming guide for more
>>>>>>>> details, or see my talk [Slides PDF
>>>>>>>> <http://www.slideshare.net/SparkSummit/recipes-for-running-spark-streaming-apploications-in-production-tathagata-daspptx>
>>>>>>>> , Video
>>>>>>>> <https://www.youtube.com/watch?v=d5UJonrruHk&list=PL-x35fyliRwgfhffEpywn4q23ykotgQJ6&index=4>
>>>>>>>> ] from the last Spark Summit 2015). But that approach can give
>>>>>>>> duplicate records. The direct approach gives exactly-once guarantees,
>>>>>>>> so you should try it out.
>>>>>>>>
>>>>>>>> TD
>>>>>>>>
>>>>>>>> On Fri, Jun 26, 2015 at 5:46 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>>>>>
>>>>>>>>> Read the Spark streaming guide and the Kafka integration guide for
>>>>>>>>> a better understanding of how the receiver-based stream works.
>>>>>>>>>
>>>>>>>>> Capacity planning is specific to your environment and what the job
>>>>>>>>> is actually doing; you'll need to determine it empirically.
>>>>>>>>>
>>>>>>>>> On Friday, June 26, 2015, Shushant Arora <shushantaror...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> In 1.2, how do I handle offset management after the stream
>>>>>>>>>> application starts, in each job? Should I commit the offset manually
>>>>>>>>>> after job completion?
>>>>>>>>>>
>>>>>>>>>> And what is the recommended number of consumer threads? Say I have
>>>>>>>>>> 300 partitions in the Kafka cluster. Load is ~1 million events per
>>>>>>>>>> second; each event is ~500 bytes. Is having 5 receivers with 60
>>>>>>>>>> partitions per receiver sufficient for Spark Streaming to consume?
>>>>>>>>>>
>>>>>>>>>> On Fri, Jun 26, 2015 at 8:40 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> The receiver-based Kafka createStream in Spark 1.2 uses
>>>>>>>>>>> ZooKeeper to store offsets.
>>>>>>>>>>> If you want finer-grained control over offsets, you can update the
>>>>>>>>>>> values in ZooKeeper yourself before starting the job.
>>>>>>>>>>>
>>>>>>>>>>> createDirectStream in Spark 1.3 is still marked as experimental,
>>>>>>>>>>> and subject to change. That being said, it works better for me in
>>>>>>>>>>> production than the receiver-based API.
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Jun 26, 2015 at 6:43 AM, Shushant Arora <shushantaror...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I am using Spark Streaming 1.2.
>>>>>>>>>>>>
>>>>>>>>>>>> If the processing executors crash, will the receiver reset the
>>>>>>>>>>>> offset back to the last processed offset?
>>>>>>>>>>>>
>>>>>>>>>>>> If the receiver itself crashed, is there a way to reset the
>>>>>>>>>>>> offset without restarting the streaming application, other than
>>>>>>>>>>>> smallest or largest?
>>>>>>>>>>>>
>>>>>>>>>>>> Is Spark Streaming 1.3, which uses the low-level consumer API,
>>>>>>>>>>>> stable? And which is recommended for handling data loss, 1.2
>>>>>>>>>>>> or 1.3?
>>>>>
>>>>> --
>>>>> Best Regards,
>>>>> Ayan Guha
>
> --
> Chen Song
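Regarding questions 3 and 4 in the thread above (checkpointing and the context-factory method name): in the Java API the factory is an object implementing JavaStreamingContextFactory, so the method you implement is always create(); the object itself can be named anything, and the checkpoint directory is set explicitly. A hedged sketch against the Spark 1.2/1.3-era Java API (the HDFS path and app name are placeholders, and the fragment assumes a surrounding class):

```java
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;

final String checkpointDir = "hdfs://namenode/checkpoints/app"; // placeholder

JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
    public JavaStreamingContext create() {
        SparkConf conf = new SparkConf().setAppName("SampleSparkStreamingApp");
        JavaStreamingContext jssc =
            new JavaStreamingContext(conf, Durations.milliseconds(1000));
        // Set up DStreams here, then enable checkpointing explicitly;
        // it is not implicit, you must call checkpoint() yourself.
        jssc.checkpoint(checkpointDir);
        return jssc;
    }
};

// On a clean start this calls factory.create(); on restart it rebuilds the
// context (including direct-stream offsets) from the checkpoint instead.
JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(checkpointDir, factory);
jssc.start();
jssc.awaitTermination();
```

The checkpoint data written here is small (metadata plus offsets for the direct stream), but it does need a fault-tolerant filesystem such as HDFS, which matches the "HDFS used for small data" reading of question 4.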