3. You need to use your own method, because that's where you set up your job.
Read the checkpoint documentation.
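
For example, roughly (the checkpoint path and app name here are just
placeholders):

JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
    // Only called when no checkpoint exists yet; on restart the context
    // is rebuilt from the checkpoint data and this method is skipped.
    public JavaStreamingContext create() {
        SparkConf conf = new SparkConf().setAppName("SampleSparkStreamingApp");
        JavaStreamingContext jssc =
            new JavaStreamingContext(conf, Durations.milliseconds(1000));
        jssc.checkpoint("hdfs:///some/checkpoint/dir"); // placeholder path
        // ...set up your DStreams here before returning...
        return jssc;
    }
};
JavaStreamingContext jssc =
    JavaStreamingContext.getOrCreate("hdfs:///some/checkpoint/dir", factory);
jssc.start();
jssc.awaitTermination();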

4.  Yes, if you want to checkpoint, you need to specify a URL to store the
checkpoint at (S3 or HDFS).  Yes, for the direct stream checkpoint it's
just offsets, not all the messages.
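
With the 1.3 direct stream that looks roughly like this (broker list, topic
and checkpoint path are placeholders):

Map<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", "broker1:9092,broker2:9092"); // placeholder brokers

Set<String> topics = new HashSet<String>(Arrays.asList("testSparkPartitioned"));

JavaPairDStream<byte[], byte[]> stream = KafkaUtils.createDirectStream(
    jssc,
    byte[].class, byte[].class,
    kafka.serializer.DefaultDecoder.class, kafka.serializer.DefaultDecoder.class,
    kafkaParams, topics);

// The checkpoint holds offset ranges and DStream metadata, not the messages,
// so the data written to HDFS/S3 stays small.
jssc.checkpoint("hdfs:///some/checkpoint/dir"); // placeholder path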

On Sun, Jun 28, 2015 at 1:02 PM, Shushant Arora <shushantaror...@gmail.com>
wrote:

> Few doubts :
>
> In 1.2 streaming, when I use a union of streams, my streaming application
> sometimes hangs and nothing gets printed on the driver.
>
>
> [Stage 2:>                                                      (0 + 2) / 2]
>
> What does the (0 + 2) / 2 here signify?
>
>
>
> 1. Does the number of streams in topicsMap.put("testSparkPartitioned", 3) have
> to be the same as numStreams = 2 in the unioned stream?
>
> 2. I launched the app on the YARN RM with num-executors = 5. It created 2
> receivers and 5 executors. Since receiver nodes are fixed at the start of the
> app for its whole lifetime, do the processing executors get allocated at the
> start of each job on the 1s batch interval? If so, how can the allocation be
> fast enough? I mean, if I increase num-executors to 50, it would have to
> negotiate 50 executors from the YARN RM at the start of each job, so wouldn't
> allocating executors take longer than the batch interval (here 1s, or say
> 500ms)? Can I also fix the processing executors for the lifetime of the app?
>
>
>
>
> SparkConf conf = new SparkConf().setAppName("SampleSparkStreamingApp");
> JavaStreamingContext jssc =
>     new JavaStreamingContext(conf, Durations.milliseconds(1000));
>
> Map<String, String> kafkaParams = new HashMap<String, String>();
> kafkaParams.put("zookeeper.connect", "ipadd:2181");
> kafkaParams.put("group.id", "testgroup");
> kafkaParams.put("zookeeper.session.timeout.ms", "10000");
>
> Map<String, Integer> topicsMap = new HashMap<String, Integer>();
> topicsMap.put("testSparkPartitioned", 3);
>
> int numStreams = 2;
> List<JavaPairDStream<byte[], byte[]>> kafkaStreams =
>     new ArrayList<JavaPairDStream<byte[], byte[]>>();
> for (int i = 0; i < numStreams; i++) {
>     kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class, byte[].class,
>         kafka.serializer.DefaultDecoder.class, kafka.serializer.DefaultDecoder.class,
>         kafkaParams, topicsMap, StorageLevel.MEMORY_ONLY()));
> }
>
> JavaPairDStream<byte[], byte[]> directKafkaStream =
>     jssc.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
>
> JavaDStream<String> lines = directKafkaStream.map(
>     new Function<Tuple2<byte[], byte[]>, String>() {
>         public String call(Tuple2<byte[], byte[]> arg0) throws Exception {
>             ...processing
>             ..return msg;
>         }
>     });
> lines.print();
> jssc.start();
> jssc.awaitTermination();
>
>
>
>
> -------------------------------------------------------------------------------------------------------------------------------------------------------
> 3. For avoiding data loss when we use checkpointing and a factory method to
> create the spark context, is the method name fixed, or can we use any name?
> And how do we tell the app which method to use?
>
> 4. In 1.3 non-receiver-based streaming, the Kafka offset is not stored in
> Zookeeper. Is that because Zookeeper is not efficient for high write rates and
> its reads are not strictly consistent? So
>
> we use the simple Kafka API that does not use Zookeeper, and offsets are
> tracked only by Spark Streaming within its checkpoints. This eliminates
> inconsistencies between Spark Streaming and Zookeeper/Kafka, and so each
> record is received by Spark Streaming effectively exactly once despite
> failures.
>
> So do we have to call context.checkpoint(hdfsdir), or is there an implicit
> checkpoint location? That is, would HDFS be used only for small data (just
> the offsets)?
>
> On Sat, Jun 27, 2015 at 7:37 PM, Dibyendu Bhattacharya <
> dibyendu.bhattach...@gmail.com> wrote:
>
>> Hi,
>>
>> There is another option to try: a receiver-based low-level Kafka consumer
>> which is part of Spark Packages (
>> http://spark-packages.org/package/dibbhatt/kafka-spark-consumer). This can
>> be used with the WAL as well for end-to-end zero data loss.
>>
>> This is also a reliable receiver and it commits offsets to ZK. Given the
>> number of Kafka partitions you have (> 100), using the high-level Kafka API
>> for the receiver-based approach may lead to consumer re-balancing issues,
>> which are a major problem with the Kafka high-level API.
>>
>> Regards,
>> Dibyendu
>>
>>
>>
>> On Sat, Jun 27, 2015 at 3:04 PM, Tathagata Das <t...@databricks.com>
>> wrote:
>>
>>> In the receiver-based approach, if the receiver crashes for any reason (the
>>> receiver crashed or the executor crashed), it should get restarted on
>>> another executor and should start reading data from the offset present in
>>> Zookeeper. There is some chance of data loss, which can be alleviated using
>>> Write Ahead Logs (see the streaming programming guide for more details, or
>>> see my talk [Slides PDF
>>> <http://www.slideshare.net/SparkSummit/recipes-for-running-spark-streaming-apploications-in-production-tathagata-daspptx>
>>> , Video
>>> <https://www.youtube.com/watch?v=d5UJonrruHk&list=PL-x35fyliRwgfhffEpywn4q23ykotgQJ6&index=4>
>>> ] from the last Spark Summit 2015). But that approach can give duplicate
>>> records. The direct approach gives exactly-once guarantees, so you should
>>> try it out.
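>>>
>>> A rough sketch of turning the WAL on (the checkpoint path is a placeholder;
>>> a checkpoint directory has to be set so the log has somewhere to live):
>>>
>>> SparkConf conf = new SparkConf()
>>>     .setAppName("SampleSparkStreamingApp")
>>>     // write received blocks to the write ahead log in the checkpoint dir
>>>     .set("spark.streaming.receiver.writeAheadLog.enable", "true");
>>> JavaStreamingContext jssc =
>>>     new JavaStreamingContext(conf, Durations.milliseconds(1000));
>>> jssc.checkpoint("hdfs:///some/checkpoint/dir"); // placeholder path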
>>>
>>> TD
>>>
>>> On Fri, Jun 26, 2015 at 5:46 PM, Cody Koeninger <c...@koeninger.org>
>>> wrote:
>>>
>>>> Read the Spark streaming guide and the Kafka integration guide for a
>>>> better understanding of how the receiver-based stream works.
>>>>
>>>> Capacity planning is specific to your environment and what the job is
>>>> actually doing; you'll need to determine it empirically.
>>>>
>>>>
>>>> On Friday, June 26, 2015, Shushant Arora <shushantaror...@gmail.com>
>>>> wrote:
>>>>
>>>>> In 1.2, how do I handle offset management in each job after the streaming
>>>>> application starts? Should I commit the offset manually after job
>>>>> completion?
>>>>>
>>>>> And what is the recommended number of consumer threads? Say I have 300
>>>>> partitions in the Kafka cluster. The load is ~1 million events per second,
>>>>> and each event is ~500 bytes. Are 5 receivers with 60 partitions per
>>>>> receiver sufficient for Spark streaming to consume?
>>>>>
>>>>> On Fri, Jun 26, 2015 at 8:40 PM, Cody Koeninger <c...@koeninger.org>
>>>>> wrote:
>>>>>
>>>>>> The receiver-based Kafka createStream in Spark 1.2 uses Zookeeper to
>>>>>> store offsets.  If you want finer-grained control over offsets, you can
>>>>>> update the values in Zookeeper yourself before starting the job.
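>>>>>>
>>>>>> Just as an illustration (the group, topic, partition and offset below are
>>>>>> placeholders; the znode path is the one the high-level consumer uses):
>>>>>>
>>>>>> import org.apache.zookeeper.ZooKeeper;
>>>>>>
>>>>>> ZooKeeper zk = new ZooKeeper("ipadd:2181", 10000, null);
>>>>>> // high-level consumer offsets live under
>>>>>> // /consumers/<group>/offsets/<topic>/<partition>, stored as plain strings
>>>>>> String path = "/consumers/testgroup/offsets/testSparkPartitioned/0";
>>>>>> zk.setData(path, "12345".getBytes(), -1); // -1 = any version
>>>>>> zk.close();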
>>>>>>
>>>>>> createDirectStream in Spark 1.3 is still marked as experimental, and
>>>>>> subject to change.  That being said, it works better for me in production
>>>>>> than the receiver-based API.
>>>>>>
>>>>>> On Fri, Jun 26, 2015 at 6:43 AM, Shushant Arora <
>>>>>> shushantaror...@gmail.com> wrote:
>>>>>>
>>>>>>> I am using Spark streaming 1.2.
>>>>>>>
>>>>>>> If the processing executors crash, will the receiver reset the offset
>>>>>>> back to the last processed offset?
>>>>>>>
>>>>>>> If the receiver itself crashes, is there a way to reset the offset
>>>>>>> without restarting the streaming application, other than to smallest or
>>>>>>> largest?
>>>>>>>
>>>>>>> Is Spark streaming 1.3, which uses the low-level consumer API, stable?
>>>>>>> And which is recommended for handling data loss, 1.2 or 1.3?
>>>>>>>
>>>>>>
>>>>>
>>>
>>
>
