Re: spark streaming 1.3 doubts(force it to not consume anything)
On Tue, Aug 18, 2015, Shushant Arora wrote:

Getting a compilation error while overriding the compute method of
DirectKafkaInputDStream:

[ERROR] CustomDirectKafkaInputDstream.java:[51,83]
compute(org.apache.spark.streaming.Time) in CustomDirectKafkaInputDstream
cannot override compute(org.apache.spark.streaming.Time) in
org.apache.spark.streaming.dstream.DStream; attempting to use incompatible
return type
[ERROR] found   : scala.Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder, byte[][]>>
[ERROR] required: scala.Option<RDD<byte[][]>>

The class:

public class CustomDirectKafkaInputDstream extends
    DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
        kafka.serializer.DefaultDecoder, byte[][]> {

  @Override
  public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder, byte[][]>> compute(
      Time validTime) {
    int processed = processedCounter.value();
    int failed = failedProcessingsCounter.value();
    if (processed == failed) {
      System.out.println("backing off since its 100 % failure");
      return Option.empty();
    } else {
      System.out.println("starting the stream");
      return super.compute(validTime);
    }
  }
}

What should the return type of the compute method be? The superclass returns
Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder, byte[][]>>,
but the compiler expects scala.Option<RDD<byte[][]>> from the derived class.
Is there something wrong with the code?

On Mon, Aug 17, 2015 at 7:08 PM, Cody Koeninger wrote:

> Look at the definitions of the java-specific KafkaUtils.createDirectStream
> methods (the ones that take a JavaStreamingContext).

On Mon, Aug 17, 2015 at 5:13 AM, Shushant Arora wrote:

> How to create a ClassTag in Java? Also, the constructor of
> DirectKafkaInputDStream takes a Function1, not a Function, but
> KafkaUtils.createDirectStream allows a Function.
>
> I have the below as an overridden DirectKafkaInputDStream:
>
> public class CustomDirectKafkaInputDstream extends
>     DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
>         kafka.serializer.DefaultDecoder, byte[][]> {
>
>   public CustomDirectKafkaInputDstream(
>       StreamingContext ssc_,
>       Map<String, String> kafkaParams,
>       Map<TopicAndPartition, Long> fromOffsets,
>       Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> messageHandler,
>       ClassTag<byte[]> evidence$1, ClassTag<byte[]> evidence$2,
>       ClassTag<DefaultDecoder> evidence$3, ClassTag<DefaultDecoder> evidence$4,
>       ClassTag<byte[][]> evidence$5) {
>     super(ssc_, kafkaParams, fromOffsets, messageHandler, evidence$1,
>         evidence$2, evidence$3, evidence$4, evidence$5);
>   }
>
>   @Override
>   public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder, byte[][]>> compute(
>       Time validTime) {
>     int processed = processedCounter.value();
>     int failed = failedProcessingsCounter.value();
>     if (processed == failed) {
>       System.out.println("backing off since its 100 % failure");
>       return Option.empty();
>     } else {
>       System.out.println("starting the stream");
>       return super.compute(validTime);
>     }
>   }
> }
>
> To create this stream I am using:
>
> scala.collection.immutable.Map<String, String> scalakafkaParams =
>     JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala()
>         .toMap(Predef.<Tuple2<String, String>>conforms());
> scala.collection.immutable.Map<TopicAndPartition, Long> scalaktopicOffsetMap =
>     JavaConverters.mapAsScalaMapConverter(topicOffsetMap).asScala()
>         .toMap(Predef.<Tuple2<TopicAndPartition, Long>>conforms());
>
> scala.Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> handler =
>     new Function<MessageAndMetadata<byte[], byte[]>, byte[][]>() {
> ..});
> JavaDStream<byte[][]> directKafkaStream = new CustomDirectKafkaInputDstream(
>     jssc, scalakafkaParams, scalaktopicOffsetMap, handler,
>     byte[].class, byte[].class,
>     kafka.serializer.DefaultDecoder.class,
>     kafka.serializer.DefaultDecoder.class, byte[][].class);
>
> How to pass a ClassTag to the constructor in CustomDirectKafkaInputDstream?
> And how to use Function instead of Function1?

On Thu, Aug 13, 2015 at 12:16 AM, Cody Koeninger wrote:

> I'm not aware of an existing api per se, but you could create your own
> subclass of the DStream that returns None for compute() under certain
> conditions.

On Wed, Aug 12, 2015 at 1:03 PM, Shushant Arora wrote:

> Hi Cody,
>
> Can you help here if streaming 1.3 has any api for not consuming any
> message in the next few runs?
>
> Thanks
>
> ---------- Forwarded message ----------
> From: Shushant Arora
> Date: Wed, Aug 12, 2015 at 11:23 PM
> Subject: spark streaming 1.3 doubts (force it to not consume anything)
> To: user
>
> I can't make my stream application's batch interval change at run time.
> It is always fixed: it always creates jobs at the specified batch interval
> and enqueues them if the earlier batch is not finished.
>
> My requirement is to process the events and post them to an external
> server, and if the external server is down I want to increase the batch
> time. That is not possible, but can I make it not consume any messages in,
> say, the next 5 successive runs?
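The compute() gate being discussed above can be sketched in plain Scala, without the Spark or Kafka dependencies (BatchGate and its counters are illustrative stand-ins for the subclass and its accumulators, not Spark API):

```scala
// Stand-in for the overridden compute(): return None to make the batch
// consume nothing, or Some(work) to proceed. Note that the thread's
// processed == failed test also fires when both counters are still zero.
object BatchGate {
  var processed = 0 // records processed so far (stand-in for an accumulator)
  var failed = 0    // records whose downstream post failed

  def compute(batch: => Seq[Array[Byte]]): Option[Seq[Array[Byte]]] =
    if (processed == failed) {
      println("backing off since its 100 % failure")
      None // the DStream contract: None means "no RDD for this interval"
    } else {
      println("starting the stream")
      Some(batch)
    }
}
```

Returning None from compute() is exactly how an InputDStream signals "produce nothing for this batch", which is what makes Cody's subclassing suggestion work.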
Re: spark streaming 1.3 doubts(force it to not consume anything)
The superclass method in DStream is defined as returning an Option[RDD[T]].

On Tue, Aug 18, 2015 at 8:07 AM, Shushant Arora wrote:

> Getting a compilation error while overriding the compute method of
> DirectKafkaInputDStream:
>
> [ERROR] CustomDirectKafkaInputDstream.java:[51,83]
> compute(org.apache.spark.streaming.Time) in CustomDirectKafkaInputDstream
> cannot override compute(org.apache.spark.streaming.Time) in
> org.apache.spark.streaming.dstream.DStream; attempting to use incompatible
> return type
>
> What should the return type of the compute method be?
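On the ClassTag question raised earlier in the thread: a ClassTag can be materialized implicitly, or built from a runtime Class via ClassTag.apply; the latter is the form reachable from Java through the ClassTag$.MODULE$ forwarder. A sketch:

```scala
import scala.reflect.{ClassTag, classTag}

// Implicit materialization (what Scala call sites get for free):
val bytesTag: ClassTag[Array[Byte]] = classTag[Array[Byte]]

// Built from a runtime Class; from Java this same call is written as
//   scala.reflect.ClassTag$.MODULE$.apply(byte[].class)
val bytesTag2: ClassTag[Array[Byte]] = ClassTag(classOf[Array[Byte]])

// For the Function1-vs-Function issue: from Java, a scala.Function1 is
// usually implemented by extending scala.runtime.AbstractFunction1,
// which fills in the compose/andThen plumbing.
```

Either form can supply the evidence$N constructor parameters of DirectKafkaInputDStream.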
Re: spark streaming 1.3 doubts(force it to not consume anything)
Getting a compilation error while overriding the compute method of
DirectKafkaInputDStream:

[ERROR] CustomDirectKafkaInputDstream.java:[51,83]
compute(org.apache.spark.streaming.Time) in CustomDirectKafkaInputDstream
cannot override compute(org.apache.spark.streaming.Time) in
org.apache.spark.streaming.dstream.DStream; attempting to use incompatible
return type
[ERROR] found   : scala.Option<KafkaRDD<byte[], byte[],
kafka.serializer.DefaultDecoder, kafka.serializer.DefaultDecoder, byte[][]>>
[ERROR] required: scala.Option<RDD<byte[][]>>

class:

public class CustomDirectKafkaInputDstream extends
    DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
        kafka.serializer.DefaultDecoder, byte[][]> {

  @Override
  public Option<KafkaRDD<byte[], byte[], kafka.serializer.DefaultDecoder,
      kafka.serializer.DefaultDecoder, byte[][]>> compute(Time validTime) {
    int processed = processedCounter.value();
    int failed = failedProcessingsCounter.value();
    if (processed == failed) {
      System.out.println("backing off since its 100% failure");
      return Option.empty();
    } else {
      System.out.println("starting the stream");
      return super.compute(validTime);
    }
  }
}

What should the return type of the compute method be? The super class returns
Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder, byte[][]>>,
but the compiler expects scala.Option<RDD<byte[][]>> from the derived class.
Is there something wrong with the code?

On Mon, Aug 17, 2015 at 7:08 PM, Cody Koeninger wrote:

> Look at the definitions of the java-specific KafkaUtils.createDirectStream
> methods (the ones that take a JavaStreamingContext)
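The incompatible-return-type error above is a Java-vs-Scala variance issue: Scala's Option is covariant, so the Scala superclass can narrow compute's result to Option[KafkaRDD[...]], but Java generics are invariant, so a Java subclass must declare the base signature, Option<RDD<byte[][]>>. A stdlib-only sketch of the same situation, with mock classes standing in for the Spark types (all names here are hypothetical):

```java
import java.util.Optional;

// Hypothetical stand-ins for Spark's types, to show the variance rule.
class RDD<T> {}

class KafkaRDD<T> extends RDD<T> {}

abstract class MockDStream<T> {
    // Base contract, mirroring DStream.compute returning Option[RDD[T]].
    abstract Optional<RDD<T>> compute(long validTime);
}

class CustomStream extends MockDStream<byte[][]> {
    @Override
    Optional<RDD<byte[][]>> compute(long validTime) {
        // Declaring Optional<KafkaRDD<byte[][]>> as the return type here would
        // fail with "incompatible return type": in Java,
        // Optional<KafkaRDD<byte[][]>> is not a subtype of
        // Optional<RDD<byte[][]>>, even though KafkaRDD is a subtype of RDD.
        // Returning a KafkaRDD *instance* under the base type is fine.
        return Optional.of(new KafkaRDD<byte[][]>());
    }
}
```

So in the Java subclass the override should be declared to return scala.Option<RDD<byte[][]>>; the value produced by super.compute(validTime) may then need an unchecked cast to that type.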
Re: spark streaming 1.3 doubts(force it to not consume anything)
Look at the definitions of the java-specific KafkaUtils.createDirectStream
methods (the ones that take a JavaStreamingContext)

On Mon, Aug 17, 2015 at 5:13 AM, Shushant Arora wrote:

> How to create a classtag in java? Also the constructor of
> DirectKafkaInputDStream takes Function1, not Function, but
> kafkautils.createDirectStream allows a function.
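The java-specific createDirectStream overloads Cody points at take Spark's org.apache.spark.api.java.function.Function and adapt it to the scala.Function1 the underlying constructor wants. The adaptation is just a wrapper; here is a stdlib-only sketch of that pattern, with mock interfaces standing in for the real Spark/Scala types (JFunction, SFunction1, and Functions are hypothetical names):

```java
// Hypothetical stand-in for Spark's Java-friendly Function interface.
interface JFunction<A, B> {
    B call(A input) throws Exception;
}

// Hypothetical stand-in for the shape of scala.Function1.
abstract class SFunction1<A, B> {
    public abstract B apply(A input);
}

final class Functions {
    // Wrap a Java-style Function as a Function1-style object - conceptually
    // what the JavaStreamingContext overloads of createDirectStream do.
    static <A, B> SFunction1<A, B> toScala(final JFunction<A, B> f) {
        return new SFunction1<A, B>() {
            @Override
            public B apply(A input) {
                try {
                    return f.call(input);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        };
    }
}
```

With the real types the same anonymous-class adapter applies; for the ClassTag arguments, a ClassTag is normally obtained from a Class object (e.g. scala.reflect.ClassTag$.MODULE$.apply(byte[].class)) rather than built by hand.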
Re: spark streaming 1.3 doubts(force it to not consume anything)
How to create a classtag in java? Also the constructor of
DirectKafkaInputDStream takes Function1, not Function, but
kafkautils.createDirectStream allows a function.

I have the below as an overridden DirectKafkaInputDStream:

public class CustomDirectKafkaInputDstream extends
    DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
        kafka.serializer.DefaultDecoder, byte[][]> {

  public CustomDirectKafkaInputDstream(
      StreamingContext ssc_,
      Map<String, String> kafkaParams,
      Map<TopicAndPartition, Long> fromOffsets,
      Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> messageHandler,
      ClassTag<byte[]> evidence$1, ClassTag<byte[]> evidence$2,
      ClassTag<DefaultDecoder> evidence$3, ClassTag<DefaultDecoder> evidence$4,
      ClassTag<byte[][]> evidence$5) {
    super(ssc_, kafkaParams, fromOffsets, messageHandler, evidence$1,
        evidence$2, evidence$3, evidence$4, evidence$5);
  }

  @Override
  public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder,
      byte[][]>> compute(Time validTime) {
    int processed = processedCounter.value();
    int failed = failedProcessingsCounter.value();
    if (processed == failed) {
      System.out.println("backing off since its 100% failure");
      return Option.empty();
    } else {
      System.out.println("starting the stream");
      return super.compute(validTime);
    }
  }
}

To create this stream I am using:

scala.collection.immutable.Map<String, String> scalakafkaParams =
    JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala()
        .toMap(Predef.<Tuple2<String, String>>conforms());
scala.collection.immutable.Map<TopicAndPartition, Long> scalaktopicOffsetMap =
    JavaConverters.mapAsScalaMapConverter(topicOffsetMap).asScala()
        .toMap(Predef.<Tuple2<TopicAndPartition, Long>>conforms());

scala.Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> handler =
    new Function<MessageAndMetadata<byte[], byte[]>, byte[][]>() { .. });
JavaDStream<byte[][]> directKafkaStream = new CustomDirectKafkaInputDstream(
    jssc, scalakafkaParams, scalaktopicOffsetMap, handler,
    byte[].class, byte[].class, kafka.serializer.DefaultDecoder.class,
    kafka.serializer.DefaultDecoder.class, byte[][].class);

How to pass a ClassTag to the constructor in CustomDirectKafkaInputDstream?
And how to use Function instead of Function1?

On Thu, Aug 13, 2015 at 12:16 AM, Cody Koeninger wrote:

> I'm not aware of an existing api per se, but you could create your own
> subclass of the DStream that returns None for compute() under certain
> conditions.
> On Wed, Aug 12, 2015 at 1:03 PM, Shushant Arora wrote:
>
>> Hi Cody
>>
>> Can you help here if streaming 1.3 has any api for not consuming any
>> message in the next few runs?
>>
>> Thanks
>>
>> ---------- Forwarded message ----------
>> From: Shushant Arora
>> Date: Wed, Aug 12, 2015 at 11:23 PM
>> Subject: spark streaming 1.3 doubts(force it to not consume anything)
>> To: user
>>
>> I can't make my stream application's batch interval change at run time.
>> It is always fixed, and it always creates jobs at the specified batch
>> interval and enqueues them if the earlier batch is not finished.
>>
>> My requirement is to process the events and post them to an external
>> server, and if the external server is down I want to increase the batch
>> time - that is not possible, but can I make it not consume any messages
>> in, say, the next 5 successive runs?
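Cody's suggestion - a subclass whose compute() returns None under certain conditions - needs a little bookkeeping to express "after a fully-failed batch, skip the next 5 runs". A minimal stdlib-only sketch of that gate (the BatchBackoff name and its methods are hypothetical); it also guards against an idle batch (0 processed, 0 failed) being mistaken for 100% failure, which a bare processed == failed check would trip on:

```java
// Hypothetical backoff gate: after a batch where every processed message
// failed, skip the next N batches.
class BatchBackoff {
    private final int skipBatches;
    private int remainingSkips = 0;

    BatchBackoff(int skipBatches) {
        this.skipBatches = skipBatches;
    }

    // Call once per batch interval with the previous batch's counters.
    // Returns true if this compute() call should return the empty Option
    // instead of delegating to super.compute(validTime).
    boolean shouldSkip(int processed, int failed) {
        if (remainingSkips > 0) {
            remainingSkips--;
            return true;
        }
        // Only a non-empty batch that failed completely arms the backoff;
        // an idle batch (0 == 0) is not a failure.
        if (processed > 0 && processed == failed) {
            remainingSkips = skipBatches;
        }
        return false;
    }
}
```

Inside the overridden compute(), this becomes roughly: if (backoff.shouldSkip(processed, failed)) return Option.empty(); else return super.compute(validTime);.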