A follow-up question: when using the createDirectStream approach, the offsets are checkpointed to HDFS and understood by the Spark Streaming job. Is there a way to expose the offsets to end users via a REST API? Or alternatively, is there a way to have the offsets committed to the Kafka Offset Manager so users can query them programmatically from a consumer?
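[Editor's note] In the direct-stream approach the processed offsets are available inside the job itself: each batch's RDD can be cast to HasOffsetRanges to read the per-partition from/until offsets, and nothing stops the job from pushing those into a store that a REST endpoint serves. The progress number such an endpoint would expose is just the broker's log-end offset minus the last processed offset. A minimal, self-contained sketch of that bookkeeping (class and method names are hypothetical; wiring it to offsetRanges and an HTTP layer is omitted):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical bookkeeping a monitoring endpoint could serve. In a real job,
// recordProcessed would be fed the untilOffset of each batch's OffsetRanges,
// and logEndOffset would come from broker metadata.
class OffsetProgress {
    private final Map<Integer, Long> lastProcessed = new HashMap<Integer, Long>();

    // Remember the highest offset the streaming job has finished for a partition.
    void recordProcessed(int partition, long untilOffset) {
        Long prev = lastProcessed.get(partition);
        if (prev == null || untilOffset > prev) {
            lastProcessed.put(partition, untilOffset);
        }
    }

    // Lag = broker log-end offset minus last processed offset
    // (an unseen partition counts as fully lagging).
    long lag(int partition, long logEndOffset) {
        Long done = lastProcessed.get(partition);
        long processed = (done == null) ? 0L : done;
        return Math.max(0L, logEndOffset - processed);
    }
}
```

This is only the computation; committing the same numbers to Kafka's offset manager instead would let any plain consumer-group tool read them.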
Essentially, all I need to do is monitor the progress of data consumption of the Kafka topic.

On Tue, Jun 30, 2015 at 9:39 AM, Cody Koeninger <c...@koeninger.org> wrote:

> You can't use different versions of Spark in your application vs. your
> cluster.
>
> For the direct stream, it's not 60 partitions per executor, it's 300
> partitions, and executors work on them as they are scheduled. Yes, if you
> have no messages you will get an empty partition. It's up to you whether
> it's worthwhile to call coalesce or not.
>
> On Tue, Jun 30, 2015 at 2:45 AM, Shushant Arora <shushantaror...@gmail.com> wrote:
>
>> Is this 3 the number of parallel consumer threads per receiver, i.e. in
>> total we have 2*3 = 6 consumers in the same consumer group consuming from
>> all 300 partitions? Is 3 just parallelism on the same receiver, and the
>> recommendation is to use 1 per receiver, since consuming from Kafka is
>> not CPU bound but NIC (network) bound, so increasing consumer threads on
>> one receiver won't make it parallel in the ideal sense?
>>
>> In the non-receiver-based consumer in Spark 1.3: if I use 5 executors
>> and the Kafka topic has 300 partitions, will the KafkaRDD created on the
>> 5 executors have 60 partitions per executor (300 in total, mapped one to
>> one)? And if some Kafka partitions are empty, say the offset from the
>> last checkpoint to current is unchanged for partition P123, will it
>> still create an empty partition in the KafkaRDD? So should we call
>> coalesce on the KafkaRDD?
>>
>> And is there any incompatibility issue if I include spark-streaming_2.10
>> (version 1.3) and spark-core_2.10 (version 1.3) in my application but my
>> cluster has Spark version 1.2?
>>
>> On Mon, Jun 29, 2015 at 7:56 PM, Shushant Arora <shushantaror...@gmail.com> wrote:
>>
>>> 1. "Here you are basically creating 2 receivers and asking each of them
>>> to consume 3 kafka partitions each."
>>>
>>> - In 1.2 we have high-level consumers, so how can we restrict the
>>> number of Kafka partitions to consume from?
>>> Say I have 300 Kafka partitions in a Kafka topic, and as above I give 2
>>> receivers and 3 Kafka partitions each. Does that mean I will read from
>>> only 6 of the 300 partitions, and the data in the remaining 294
>>> partitions is lost?
>>>
>>> 2. One more doubt: in Spark Streaming, how is it decided which part of
>>> the driver's main function runs at each batch interval? Since the whole
>>> code is written in one function (the main function in the driver), how
>>> is it determined that the Kafka stream receivers are not to be
>>> registered at each batch, and only the processing is to be done?
>>>
>>> On Mon, Jun 29, 2015 at 7:35 PM, ayan guha <guha.a...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Let me take a shot at your questions. (I am sure people like Cody and
>>>> TD will correct me if I am wrong.)
>>>>
>>>> 0. This is an exact copy from the similar question in the mail thread
>>>> from Akhil D: since you set local[4] you will have 4 threads for your
>>>> computation, and since you are having 2 receivers, you are left with 2
>>>> threads to process. ((0 + 2) <-- This 2 is your 2 threads.) And the
>>>> other /2 means you are having 2 tasks in that stage (with id 0).
>>>>
>>>> 1. Here you are basically creating 2 receivers and asking each of them
>>>> to consume 3 Kafka partitions each.
>>>> 2. How does that matter? It depends on how many receivers you have
>>>> created to consume that data and whether you have repartitioned it.
>>>> Remember, Spark is lazy and executors are related to the context.
>>>> 3. I think in Java the factory method is fixed. You just pass around
>>>> the contextFactory object. (I love Python :) see, the signature is so
>>>> much cleaner :) )
>>>> 4. Yes, if you use Spark checkpointing. You can use your custom
>>>> checkpointing too.
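[Editor's note] The "custom checkpointing" in point 4 usually amounts to persisting each batch's partition offsets to a store you control and reading them back on restart. A minimal sketch of that round-trip (all names are hypothetical; a real job would write the encoded line to ZooKeeper, HDFS, or a database after each batch completes):

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical custom checkpoint: encode per-partition offsets as a single
// line that can be written to any store, and decode it on restart.
class OffsetCheckpoint {
    // e.g. {0 -> 42, 1 -> 7} becomes "0=42,1=7" (keys in ascending order).
    static String encode(Map<Integer, Long> offsets) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<Integer, Long> e : new TreeMap<Integer, Long>(offsets).entrySet()) {
            if (sb.length() > 0) sb.append(',');
            sb.append(e.getKey()).append('=').append(e.getValue());
        }
        return sb.toString();
    }

    static Map<Integer, Long> decode(String line) {
        Map<Integer, Long> offsets = new TreeMap<Integer, Long>();
        if (line.isEmpty()) return offsets;
        for (String part : line.split(",")) {
            String[] kv = part.split("=");
            offsets.put(Integer.parseInt(kv[0]), Long.parseLong(kv[1]));
        }
        return offsets;
    }
}
```

On restart, the decoded map would seed the fromOffsets of the next run instead of relying on Spark's own checkpoint directory.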
>>>>
>>>> Best,
>>>> Ayan
>>>>
>>>> On Mon, Jun 29, 2015 at 4:02 AM, Shushant Arora <shushantaror...@gmail.com> wrote:
>>>>
>>>>> A few doubts:
>>>>>
>>>>> In 1.2 streaming, when I use a union of streams, my streaming
>>>>> application sometimes hangs and nothing gets printed on the driver.
>>>>>
>>>>> [Stage 2:>                                            (0 + 2) / 2]
>>>>>
>>>>> What does (0 + 2) / 2 signify here?
>>>>>
>>>>> 1. Does the number of streams in topicsMap.put("testSparkPartitioned",
>>>>> 3) have to be the same as numStreams = 2 in the unioned stream?
>>>>>
>>>>> 2. I launched the app on the YARN RM with num-executors set to 5. It
>>>>> created 2 receivers and 5 executors. As with stream receivers, whose
>>>>> nodes are fixed at the start of the app for its whole lifetime, do
>>>>> executors get allocated at the start of each job on a 1 s batch
>>>>> interval? If yes, how can allocating resources be fast enough? I
>>>>> mean, if I increase num-executors to 50, it will negotiate 50
>>>>> executors from the YARN RM at the start of each job, so won't
>>>>> allocating executors take longer than the batch interval (here 1 s,
>>>>> or say 500 ms)? Can I also fix the processing executors for the
>>>>> lifetime of the app?
>>>>>
>>>>> SparkConf conf = new SparkConf().setAppName("SampleSparkStreamingApp");
>>>>> JavaStreamingContext jssc =
>>>>>     new JavaStreamingContext(conf, Durations.milliseconds(1000));
>>>>>
>>>>> Map<String, String> kafkaParams = new HashMap<String, String>();
>>>>> kafkaParams.put("zookeeper.connect", "ipadd:2181");
>>>>> kafkaParams.put("group.id", "testgroup");
>>>>> kafkaParams.put("zookeeper.session.timeout.ms", "10000");
>>>>>
>>>>> Map<String, Integer> topicsMap = new HashMap<String, Integer>();
>>>>> topicsMap.put("testSparkPartitioned", 3);
>>>>>
>>>>> int numStreams = 2;
>>>>> List<JavaPairDStream<byte[], byte[]>> kafkaStreams =
>>>>>     new ArrayList<JavaPairDStream<byte[], byte[]>>();
>>>>> for (int i = 0; i < numStreams; i++) {
>>>>>     kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class,
>>>>>         byte[].class, kafka.serializer.DefaultDecoder.class,
>>>>>         kafka.serializer.DefaultDecoder.class,
>>>>>         kafkaParams, topicsMap, StorageLevel.MEMORY_ONLY()));
>>>>> }
>>>>>
>>>>> JavaPairDStream<byte[], byte[]> directKafkaStream =
>>>>>     jssc.union(kafkaStreams.get(0),
>>>>>         kafkaStreams.subList(1, kafkaStreams.size()));
>>>>> JavaDStream<String> lines = directKafkaStream.map(
>>>>>     new Function<Tuple2<byte[], byte[]>, String>() {
>>>>>         public String call(Tuple2<byte[], byte[]> arg0) throws Exception {
>>>>>             // ... processing ...
>>>>>             return msg;
>>>>>         }
>>>>>     });
>>>>> lines.print();
>>>>>
>>>>> jssc.start();
>>>>> jssc.awaitTermination();
>>>>>
>>>>> -------------------------------------------------------------------
>>>>>
>>>>> 3. For avoiding data loss, when we use checkpointing and a factory
>>>>> method to create the SparkContext, is the method name fixed, or can
>>>>> we use any name? And how do we set in the app which method name is to
>>>>> be used?
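[Editor's note] On question 3: in the Java API the recovery entry point is JavaStreamingContext.getOrCreate(checkpointDir, factory). The factory is an object you pass in, implementing a fixed interface method (create() in the Spark 1.x Java API), so there is no method name to register; what is fixed is the interface, not your naming. The control flow can be sketched generically (the types below are stand-ins, not Spark classes):

```java
// Stand-in types illustrating the getOrCreate pattern: the factory's
// create() is invoked only when no checkpoint exists; otherwise the
// previously checkpointed context is restored as-is.
interface ContextFactory {
    String create(); // stand-in for "build a fresh streaming context"
}

class GetOrCreate {
    // checkpointed == null models "no checkpoint found at the directory".
    static String getOrCreate(String checkpointed, ContextFactory factory) {
        return (checkpointed != null) ? checkpointed : factory.create();
    }
}
```

The important consequence, which the real API shares, is that all stream setup must happen inside the factory: on recovery the factory body is skipped entirely.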
>>>>> 4. In the 1.3 non-receiver-based streaming, the Kafka offset is not
>>>>> stored in ZooKeeper. Is that because ZooKeeper is not efficient for
>>>>> high write rates and its reads are not strictly consistent? So
>>>>>
>>>>> we use a simple Kafka API that does not use ZooKeeper, and offsets
>>>>> are tracked only by Spark Streaming within its checkpoints. This
>>>>> eliminates inconsistencies between Spark Streaming and
>>>>> ZooKeeper/Kafka, and so each record is received by Spark Streaming
>>>>> effectively exactly once despite failures.
>>>>>
>>>>> So do we have to call context.checkpoint(hdfsdir)? Or is there an
>>>>> implicit checkpoint location? That is, is HDFS used for small data
>>>>> (just the offsets)?
>>>>>
>>>>> On Sat, Jun 27, 2015 at 7:37 PM, Dibyendu Bhattacharya <dibyendu.bhattach...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> There is another option to try: the receiver-based low-level Kafka
>>>>>> consumer, which is part of Spark Packages
>>>>>> (http://spark-packages.org/package/dibbhatt/kafka-spark-consumer).
>>>>>> This can be used with the WAL as well for end-to-end zero data loss.
>>>>>>
>>>>>> This is also a reliable receiver, and it commits offsets to ZK.
>>>>>> Given the number of Kafka partitions you have (> 100), using the
>>>>>> high-level Kafka API for the receiver-based approach may lead to
>>>>>> issues related to consumer re-balancing, which is a major issue of
>>>>>> the Kafka high-level API.
>>>>>>
>>>>>> Regards,
>>>>>> Dibyendu
>>>>>>
>>>>>> On Sat, Jun 27, 2015 at 3:04 PM, Tathagata Das <t...@databricks.com> wrote:
>>>>>>
>>>>>>> In the receiver-based approach, if the receiver crashes for any
>>>>>>> reason (receiver crashed or executor crashed), the receiver should
>>>>>>> get restarted on another executor and should start reading data
>>>>>>> from the offset present in ZooKeeper.
>>>>>>> There is some chance of data loss, which can be alleviated using
>>>>>>> Write Ahead Logs (see the streaming programming guide for more
>>>>>>> details, or see my talk [Slides PDF
>>>>>>> <http://www.slideshare.net/SparkSummit/recipes-for-running-spark-streaming-apploications-in-production-tathagata-daspptx>,
>>>>>>> Video
>>>>>>> <https://www.youtube.com/watch?v=d5UJonrruHk&list=PL-x35fyliRwgfhffEpywn4q23ykotgQJ6&index=4>]
>>>>>>> from the last Spark Summit 2015). But that approach can give
>>>>>>> duplicate records. The direct approach gives exactly-once
>>>>>>> guarantees, so you should try it out.
>>>>>>>
>>>>>>> TD
>>>>>>>
>>>>>>> On Fri, Jun 26, 2015 at 5:46 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>>>>
>>>>>>>> Read the Spark streaming guide and the Kafka integration guide for
>>>>>>>> a better understanding of how the receiver-based stream works.
>>>>>>>>
>>>>>>>> Capacity planning is specific to your environment and what the job
>>>>>>>> is actually doing; you'll need to determine it empirically.
>>>>>>>>
>>>>>>>> On Friday, June 26, 2015, Shushant Arora <shushantaror...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> In 1.2, how do I handle offset management after the stream
>>>>>>>>> application starts, in each job? Should I commit the offset
>>>>>>>>> manually after job completion?
>>>>>>>>>
>>>>>>>>> And what is the recommended number of consumer threads? Say I
>>>>>>>>> have 300 partitions in the Kafka cluster. Load is ~1 million
>>>>>>>>> events per second, each event ~500 bytes. Is having 5 receivers,
>>>>>>>>> with 60 partitions per receiver, sufficient for Spark Streaming
>>>>>>>>> to consume?
>>>>>>>>>
>>>>>>>>> On Fri, Jun 26, 2015 at 8:40 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>>>>>>
>>>>>>>>>> The receiver-based Kafka createStream in Spark 1.2 uses
>>>>>>>>>> ZooKeeper to store offsets.
>>>>>>>>>> If you want finer-grained control over offsets, you can update
>>>>>>>>>> the values in ZooKeeper yourself before starting the job.
>>>>>>>>>>
>>>>>>>>>> createDirectStream in Spark 1.3 is still marked as experimental
>>>>>>>>>> and subject to change. That being said, it works better for me
>>>>>>>>>> in production than the receiver-based API.
>>>>>>>>>>
>>>>>>>>>> On Fri, Jun 26, 2015 at 6:43 AM, Shushant Arora <shushantaror...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> I am using Spark Streaming 1.2.
>>>>>>>>>>>
>>>>>>>>>>> If the processing executors crash, will the receiver reset the
>>>>>>>>>>> offset back to the last processed offset?
>>>>>>>>>>>
>>>>>>>>>>> If the receiver itself crashes, is there a way to reset the
>>>>>>>>>>> offset without restarting the streaming application, other
>>>>>>>>>>> than to smallest or largest?
>>>>>>>>>>>
>>>>>>>>>>> Is Spark Streaming 1.3, which uses the low-level consumer API,
>>>>>>>>>>> stable? And which is recommended for handling data loss, 1.2
>>>>>>>>>>> or 1.3?

--
Chen Song