3. You need to use your own method, because that method is what sets up your
job. Read the checkpoint documentation.
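[To make point 3 concrete: a minimal sketch of the factory pattern the checkpoint documentation describes, against the Spark 1.3 Java API. The class and variable names here are arbitrary and the DStream setup is elided; only JavaStreamingContext.getOrCreate and the JavaStreamingContextFactory interface come from Spark. The factory is passed as an object, so the method/class names are entirely yours.]

```java
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;

public class CheckpointedApp {
    public static void main(String[] args) {
        // Hypothetical checkpoint URL; use your own HDFS/S3 location.
        final String checkpointDir = "hdfs://namenode:8020/spark/checkpoints";

        // The factory is passed as an object, so its name is up to you.
        // What matters is that ALL stream setup happens inside create().
        JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
            public JavaStreamingContext create() {
                SparkConf conf = new SparkConf().setAppName("SampleSparkStreamingApp");
                JavaStreamingContext jssc =
                    new JavaStreamingContext(conf, Durations.milliseconds(1000));
                // ... define your DStreams here ...
                jssc.checkpoint(checkpointDir);
                return jssc;
            }
        };

        // Recreates the context from the checkpoint if one exists;
        // otherwise calls factory.create().
        JavaStreamingContext jssc =
            JavaStreamingContext.getOrCreate(checkpointDir, factory);
        jssc.start();
        jssc.awaitTermination();
    }
}
```

[This requires a Spark cluster and the spark-streaming jars on the classpath, so it is a structural sketch rather than a runnable sample.]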
4. Yes, if you want to checkpoint, you need to specify a URL to store the
checkpoint at (S3 or HDFS). And yes, for the direct stream the checkpoint is
just offsets, not all the messages.

On Sun, Jun 28, 2015 at 1:02 PM, Shushant Arora <shushantaror...@gmail.com>
wrote:

> A few doubts:
>
> In 1.2 streaming, when I use a union of streams, my streaming application
> sometimes hangs and nothing gets printed on the driver:
>
> [Stage 2:>                                                    (0 + 2) / 2]
>
> What does (0 + 2) / 2 signify here?
>
> 1. Does the number of streams in topicsMap.put("testSparkPartitioned", 3)
> need to be the same as numStreams = 2 for the unioned stream?
>
> 2. I launched the app on the YARN RM with num-executors set to 5. It
> created 2 receivers and 5 executors. Since receiver nodes are fixed at the
> start of the app for its whole lifetime, do executors get allocated at the
> start of each job on the 1s batch interval? If yes, how can resource
> allocation be that fast? I mean, if I increase num-executors to 50, it
> will negotiate 50 executors from the YARN RM at the start of each job, so
> won't allocating executors take longer than the batch interval (here 1s,
> or say 500ms)? Can I also fix the processing executors for the whole
> lifetime of the app?
>
> SparkConf conf = new SparkConf().setAppName("SampleSparkStreamingApp");
> JavaStreamingContext jssc =
>     new JavaStreamingContext(conf, Durations.milliseconds(1000));
>
> Map<String, String> kafkaParams = new HashMap<String, String>();
> kafkaParams.put("zookeeper.connect", "ipadd:2181");
> kafkaParams.put("group.id", "testgroup");
> kafkaParams.put("zookeeper.session.timeout.ms", "10000");
>
> Map<String, Integer> topicsMap = new HashMap<String, Integer>();
> topicsMap.put("testSparkPartitioned", 3);
>
> int numStreams = 2;
> List<JavaPairDStream<byte[], byte[]>> kafkaStreams =
>     new ArrayList<JavaPairDStream<byte[], byte[]>>();
> for (int i = 0; i < numStreams; i++) {
>     kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class,
>         byte[].class, kafka.serializer.DefaultDecoder.class,
>         kafka.serializer.DefaultDecoder.class,
>         kafkaParams, topicsMap, StorageLevel.MEMORY_ONLY()));
> }
>
> JavaPairDStream<byte[], byte[]> directKafkaStream =
>     jssc.union(kafkaStreams.get(0),
>         kafkaStreams.subList(1, kafkaStreams.size()));
>
> JavaDStream<String> lines = directKafkaStream.map(
>     new Function<Tuple2<byte[], byte[]>, String>() {
>         public String call(Tuple2<byte[], byte[]> arg0) throws Exception {
>             // ... processing ...
>             return msg;
>         }
>     });
> lines.print();
>
> jssc.start();
> jssc.awaitTermination();
>
> -----------------------------------------------------------------------
>
> 3. For avoiding data loss when we use checkpointing and a factory method
> to create the SparkContext: is the method name fixed, or can we use any
> name? And how do we tell the app which method to use?
>
> 4. In 1.3 non-receiver-based streaming, the Kafka offset is not stored in
> ZooKeeper. Is that because ZooKeeper is not efficient for high write
> rates and its reads are not strictly consistent? So we use the simple
> Kafka API that does not use ZooKeeper, and offsets are tracked only by
> Spark Streaming within its checkpoints.
> This eliminates inconsistencies between Spark Streaming and
> ZooKeeper/Kafka, and so each record is received by Spark Streaming
> effectively exactly once despite failures.
>
> So do we have to call context.checkpoint(hdfsdir), or is there an
> implicit checkpoint location? That is, would HDFS only be used for small
> data (just the offsets)?
>
> On Sat, Jun 27, 2015 at 7:37 PM, Dibyendu Bhattacharya <
> dibyendu.bhattach...@gmail.com> wrote:
>
>> Hi,
>>
>> There is another option to try: the Receiver Based Low Level Kafka
>> Consumer, which is part of Spark Packages (
>> http://spark-packages.org/package/dibbhatt/kafka-spark-consumer). This
>> can be used with the WAL as well, for end-to-end zero data loss.
>>
>> This is also a reliable receiver, and it commits offsets to ZK. Given
>> the number of Kafka partitions you have (> 100), using the high-level
>> Kafka API for the receiver-based approach may lead to consumer
>> rebalancing issues, which are a major problem with the Kafka high-level
>> API.
>>
>> Regards,
>> Dibyendu
>>
>> On Sat, Jun 27, 2015 at 3:04 PM, Tathagata Das <t...@databricks.com>
>> wrote:
>>
>>> In the receiver-based approach, if the receiver crashes for any reason
>>> (the receiver itself or its executor crashed), it should get restarted
>>> on another executor and start reading data from the offset present in
>>> ZooKeeper. There is some chance of data loss, which can be alleviated
>>> using Write Ahead Logs (see the streaming programming guide for more
>>> details, or see my talk [Slides PDF
>>> <http://www.slideshare.net/SparkSummit/recipes-for-running-spark-streaming-apploications-in-production-tathagata-daspptx>,
>>> Video
>>> <https://www.youtube.com/watch?v=d5UJonrruHk&list=PL-x35fyliRwgfhffEpywn4q23ykotgQJ6&index=4>]
>>> from the last Spark Summit, 2015). But that approach can give
>>> duplicate records. The direct approach gives exactly-once guarantees,
>>> so you should try it out.
>>>
>>> TD
>>>
>>> On Fri, Jun 26, 2015 at 5:46 PM, Cody Koeninger <c...@koeninger.org>
>>> wrote:
>>>
>>>> Read the Spark streaming guide and the Kafka integration guide for a
>>>> better understanding of how the receiver-based stream works.
>>>>
>>>> Capacity planning is specific to your environment and what the job is
>>>> actually doing; you'll need to determine it empirically.
>>>>
>>>> On Friday, June 26, 2015, Shushant Arora <shushantaror...@gmail.com>
>>>> wrote:
>>>>
>>>>> In 1.2, how do I handle offset management after the stream
>>>>> application starts, in each job? Should I commit the offset manually
>>>>> after job completion?
>>>>>
>>>>> And what is the recommended number of consumer threads? Say I have
>>>>> 300 partitions in the Kafka cluster. Load is ~1 million events per
>>>>> second, and each event is ~500 bytes. Are 5 receivers, with 60
>>>>> partitions per receiver, sufficient for Spark streaming to consume?
>>>>>
>>>>> On Fri, Jun 26, 2015 at 8:40 PM, Cody Koeninger <c...@koeninger.org>
>>>>> wrote:
>>>>>
>>>>>> The receiver-based Kafka createStream in Spark 1.2 uses ZooKeeper
>>>>>> to store offsets. If you want finer-grained control over offsets,
>>>>>> you can update the values in ZooKeeper yourself before starting
>>>>>> the job.
>>>>>>
>>>>>> createDirectStream in Spark 1.3 is still marked as experimental
>>>>>> and subject to change. That being said, it works better for me in
>>>>>> production than the receiver-based API.
>>>>>>
>>>>>> On Fri, Jun 26, 2015 at 6:43 AM, Shushant Arora <
>>>>>> shushantaror...@gmail.com> wrote:
>>>>>>
>>>>>>> I am using Spark streaming 1.2.
>>>>>>>
>>>>>>> If the processing executors crash, will the receiver reset the
>>>>>>> offset back to the last processed offset?
>>>>>>>
>>>>>>> If the receiver itself crashed, is there a way to reset the offset
>>>>>>> without restarting the streaming application, other than smallest
>>>>>>> or largest?
>>>>>>>
>>>>>>> Is Spark streaming 1.3, which uses the low-level consumer API,
>>>>>>> stable?
>>>>>>> And which is recommended for handling data loss, 1.2 or 1.3?
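
[To make Cody's suggestion above concrete — "update the values in zookeeper
yourself" — Kafka 0.8's high-level consumer reads its starting offsets from a
fixed znode layout in ZooKeeper. The helper below only constructs those paths;
the class name is made up for illustration, and only the /consumers/...
convention is Kafka's. A ZooKeeper client such as ZkClient would then write
the desired offset, as a string, to each partition's znode before the job
starts.]

```java
// Illustrative helper: where Kafka 0.8's high-level consumer keeps
// per-partition offsets in ZooKeeper. The class name is hypothetical;
// the znode layout is Kafka's documented 0.8 convention.
public final class OffsetPaths {
    public static String offsetPath(String group, String topic, int partition) {
        return "/consumers/" + group + "/offsets/" + topic + "/" + partition;
    }

    public static void main(String[] args) {
        // For the group and topic used in the code earlier in this thread,
        // partition 0's offset would live at:
        System.out.println(offsetPath("testgroup", "testSparkPartitioned", 0));
        // prints /consumers/testgroup/offsets/testSparkPartitioned/0
    }
}
```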