Re: spark streaming 1.3 doubts(force it to not consume anything)

2015-08-19 Thread Shushant Arora


Re: spark streaming 1.3 doubts(force it to not consume anything)

2015-08-19 Thread Cody Koeninger


Re: spark streaming 1.3 doubts(force it to not consume anything)

2015-08-19 Thread Shushant Arora


Re: spark streaming 1.3 doubts(force it to not consume anything)

2015-08-18 Thread Shushant Arora


Re: spark streaming 1.3 doubts(force it to not consume anything)

2015-08-18 Thread Cody Koeninger


Re: spark streaming 1.3 doubts(force it to not consume anything)

2015-08-18 Thread Shushant Arora


Re: spark streaming 1.3 doubts(force it to not consume anything)

2015-08-18 Thread Shushant Arora


Re: spark streaming 1.3 doubts(force it to not consume anything)

2015-08-18 Thread Cody Koeninger
The superclass method in DStream is defined as returning an Option[RDD[T]]
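The point about the return type can be illustrated without Spark on the classpath. The following is a minimal sketch, assuming hypothetical stand-in types (`Rdd`, `KafkaRdd`, `Stream`, `GatedStream`) and using `java.util.Optional` in place of `scala.Option`; none of these are Spark classes. It only shows that Java treats generic types as invariant, so an override has to declare the same `Optional<Rdd<...>>` return type as the base method. Whether the same change is sufficient against the real `DirectKafkaInputDStream` is not confirmed here.

```java
import java.util.Optional;

public class ComputeReturnTypeSketch {

    /** Hypothetical stand-in for org.apache.spark.rdd.RDD. */
    static class Rdd<T> {}

    /** Hypothetical stand-in for KafkaRDD, a more specific kind of RDD. */
    static class KafkaRdd<T> extends Rdd<T> {}

    /** Hypothetical stand-in for DStream: compute is declared with the general Rdd type. */
    abstract static class Stream<T> {
        abstract Optional<Rdd<T>> compute(long validTime);
    }

    /** Returns an empty result when every processed record failed, as in the thread. */
    static class GatedStream extends Stream<byte[][]> {
        private final int processed;
        private final int failed;

        GatedStream(int processed, int failed) {
            this.processed = processed;
            this.failed = failed;
        }

        // Declaring Optional<KafkaRdd<byte[][]>> here would be rejected by javac as an
        // incompatible return type: Optional<KafkaRdd<X>> is not a subtype of
        // Optional<Rdd<X>>, even though KafkaRdd<X> is a subtype of Rdd<X>.
        @Override
        Optional<Rdd<byte[][]>> compute(long validTime) {
            if (processed == failed) {
                return Optional.empty(); // back off: produce nothing for this batch
            }
            Rdd<byte[][]> rdd = new KafkaRdd<>();
            return Optional.of(rdd);
        }
    }

    public static void main(String[] args) {
        System.out.println(new GatedStream(3, 3).compute(0L).isPresent());
        System.out.println(new GatedStream(5, 1).compute(0L).isPresent());
    }
}
```

Note that the condition `processed == failed` is also true when both counters are zero, so a real implementation may want to require `processed > 0` before backing off.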

On Tue, Aug 18, 2015 at 8:07 AM, Shushant Arora 
wrote:

> Getting compilation error while overriding compute method of
> DirectKafkaInputDStream.
>
>
> [ERROR] CustomDirectKafkaInputDstream.java:[51,83]
> compute(org.apache.spark.streaming.Time) in CustomDirectKafkaInputDstream
> cannot override compute(org.apache.spark.streaming.Time) in
> org.apache.spark.streaming.dstream.DStream; attempting to use incompatible
> return type
>
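> [ERROR] found   :
> scala.Option<org.apache.spark.streaming.kafka.KafkaRDD<byte[],byte[],kafka.serializer.DefaultDecoder,kafka.serializer.DefaultDecoder,byte[][]>>
>
> [ERROR] required: scala.Option<org.apache.spark.rdd.RDD<byte[][]>>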
>
>
> class :
>
> public class CustomDirectKafkaInputDstream extends
>     DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
>         kafka.serializer.DefaultDecoder, byte[][]> {
>
>   @Override
>   public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder,
>       byte[][]>> compute(Time validTime) {
>
>     int processed = processedCounter.value();
>     int failed = failedProcessingsCounter.value();
>     if (processed == failed) {
>       System.out.println("backing off since its 100 % failure");
>       return Option.empty();
>     } else {
>       System.out.println("starting the stream ");
>       return super.compute(validTime);
>     }
>   }
> }
>
>
> What should be the return type of the compute method? The superclass
> returns Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder,
> byte[][]>>, but the compiler expects scala.Option<RDD<byte[][]>> from the
> derived class. Is there something wrong with the code?
>
> On Mon, Aug 17, 2015 at 7:08 PM, Cody Koeninger 
> wrote:
>
>> Look at the definitions of the java-specific
>> KafkaUtils.createDirectStream methods (the ones that take a
>> JavaStreamingContext)
>>
>> On Mon, Aug 17, 2015 at 5:13 AM, Shushant Arora <
>> shushantaror...@gmail.com> wrote:
>>
>>> How do I create a ClassTag in Java? Also, the constructor
>>> of DirectKafkaInputDStream takes a Function1, not a Function, but
>>> KafkaUtils.createDirectStream accepts a Function.
>>>
>>> Below is my overridden DirectKafkaInputDStream.
>>>
>>>
>>> public class CustomDirectKafkaInputDstream extends
>>>     DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
>>>         kafka.serializer.DefaultDecoder, byte[][]> {
>>>
>>>   public CustomDirectKafkaInputDstream(
>>>       StreamingContext ssc_,
>>>       Map<String, String> kafkaParams,
>>>       Map<TopicAndPartition, Long> fromOffsets,
>>>       Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> messageHandler,
>>>       ClassTag<byte[]> evidence$1, ClassTag<byte[]> evidence$2,
>>>       ClassTag<DefaultDecoder> evidence$3,
>>>       ClassTag<DefaultDecoder> evidence$4, ClassTag<byte[][]> evidence$5) {
>>>     super(ssc_, kafkaParams, fromOffsets, messageHandler, evidence$1,
>>>         evidence$2, evidence$3, evidence$4, evidence$5);
>>>   }
>>>
>>>   @Override
>>>   public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder,
>>>       byte[][]>> compute(Time validTime) {
>>>     int processed = processedCounter.value();
>>>     int failed = failedProcessingsCounter.value();
>>>     if (processed == failed) {
>>>       System.out.println("backing off since its 100 % failure");
>>>       return Option.empty();
>>>     } else {
>>>       System.out.println("starting the stream ");
>>>       return super.compute(validTime);
>>>     }
>>>   }
>>> }
>>>
>>>
>>>
>>> To create this stream I am using:
>>>
>>> scala.collection.immutable.Map<String, String> scalakafkaParams =
>>> JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala().toMap(Predef.<Tuple2<String, String>>conforms());
>>> scala.collection.immutable.Map<TopicAndPartition, Long>
>>> scalaktopicOffsetMap=
>>> JavaConverters.mapAsScalaMapConverter(topicOffsetMap).asScala().toMap(Predef.<Tuple2<TopicAndPartition, Long>>conforms());
>>>
>>> scala.Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> handler =
>>> new Function<MessageAndMetadata<byte[], byte[]>, byte[][]>() {
>>> ..});
>>> JavaDStream directKafkaStream = new
>>> CustomDirectKafkaInputDstream(jssc,scalakafkaParams ,scalaktopicOffsetMap,
>>> handler,byte[].class,byte[].class, kafka.serializer.DefaultDecoder.class,
>>> kafka.serializer.DefaultDecoder.class,byte[][].class);
>>>
>>>
>>>
>>> How do I pass a ClassTag to the constructor of
>>> CustomDirectKafkaInputDstream? And how can I use Function instead of Function1?
>>>
>>>
>>>
>>> On Thu, Aug 13, 2015 at 12:16 AM, Cody Koeninger 
>>> wrote:
>>>
>>>> I'm not aware of an existing api per se, but you could create your own
>>>> subclass of the DStream that returns None for compute() under certain
>>>> conditions.
>>>>
>>>>
>>>>
>>>> On Wed, Aug 12, 2015 at 1:03 PM, Shushant Arora <
>>>> shushantaror...@gmail.com> wrote:
>>>>
>>>>> Hi Cody
>>>>>
>>>>> Can you help here? Does Spark Streaming 1.3 have any API for not
>>>>> consuming any messages in the next few runs?
>>>>>
>>>>> Thanks
>>>>>
>>>>> -- Forwarded message --
>>>>> From: Shushant Arora 
>>>>> Date: Wed, Aug 12, 2015 at 11:23 PM
>>>>> Subject: spark streaming 1.3 doubts(force it to not consume anything)
>>>>> To: user 
>>>>>
>>>>>
>>>>> I can't change my streaming application's batch interval at run
>>>>> time. It is always fixed: jobs are always created at the specified batch
>>>>> interval and enqueued if the earlier batch has not finished.
>>>>>
>>>>> My requirement is to process the events and post them to an external
>>>>> server. If the external server is down, I want to increase the batch
>>>>> time. That is not possible, but can I make it not consume any messages
>>>>> for, say, the next 5 successive runs?
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>


Re: spark streaming 1.3 doubts(force it to not consume anything)

2015-08-18 Thread Shushant Arora
I am getting a compilation error while overriding the compute method of
DirectKafkaInputDStream.


[ERROR] CustomDirectKafkaInputDstream.java:[51,83]
compute(org.apache.spark.streaming.Time) in CustomDirectKafkaInputDstream
cannot override compute(org.apache.spark.streaming.Time) in
org.apache.spark.streaming.dstream.DStream; attempting to use incompatible
return type

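[ERROR] found   :
scala.Option<org.apache.spark.streaming.kafka.KafkaRDD<byte[],byte[],kafka.serializer.DefaultDecoder,kafka.serializer.DefaultDecoder,byte[][]>>

[ERROR] required: scala.Option<org.apache.spark.rdd.RDD<byte[][]>>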


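Class:

public class CustomDirectKafkaInputDstream extends
    DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
        kafka.serializer.DefaultDecoder, byte[][]> {

  @Override
  public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder,
      byte[][]>> compute(Time validTime) {

    int processed = processedCounter.value();
    int failed = failedProcessingsCounter.value();
    if (processed == failed) {
      // Every processed message failed: produce no RDD for this batch.
      System.out.println("backing off since its 100 % failure");
      return Option.empty();
    } else {
      System.out.println("starting the stream ");
      return super.compute(validTime);
    }
  }
}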


What should the return type of the compute method be? The superclass returns
Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder, byte[][]>>,
but the compiler expects scala.Option<RDD<byte[][]>> from the derived class.
Is there something wrong with the code?
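
A note on the back-off condition in the compute() override above: the check
processed==failed is also true when both counters are still zero, and if the
counters are only updated while batches are processed, the condition would
stay true once the stream stops consuming. Below is a minimal sketch of one way
to bound the back-off to a fixed number of batches, as in the original
requirement of skipping the next 5 runs. BackoffGate, recordBatch and
shouldSkip are hypothetical names, not Spark APIs, and the sketch assumes
per-batch counts are available on the driver.

```java
/**
 * Hypothetical helper (not part of Spark): decides whether the next batch
 * should be skipped, based on the outcome of earlier batches.
 */
public class BackoffGate {
    private final int batchesToSkip; // how many batches to skip after a total failure
    private int remainingSkips = 0;  // skips still pending

    public BackoffGate(int batchesToSkip) {
        if (batchesToSkip < 0) {
            throw new IllegalArgumentException("batchesToSkip must be >= 0");
        }
        this.batchesToSkip = batchesToSkip;
    }

    /** Record the per-batch counts of the batch that just finished. */
    public synchronized void recordBatch(long processed, long failed) {
        // Back off only if the batch did some work and all of it failed.
        // An empty batch (0 processed, 0 failed) does not trigger back-off.
        if (processed > 0 && processed == failed) {
            remainingSkips = batchesToSkip;
        }
    }

    /** Call once per batch interval; true means "emit no RDD for this batch". */
    public synchronized boolean shouldSkip() {
        if (remainingSkips > 0) {
            remainingSkips--;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        BackoffGate gate = new BackoffGate(2);
        System.out.println(gate.shouldSkip()); // false: nothing has failed yet
        gate.recordBatch(10, 10);              // a batch where every message failed
        System.out.println(gate.shouldSkip()); // true
        System.out.println(gate.shouldSkip()); // true
        System.out.println(gate.shouldSkip()); // false: back-off window is over
    }
}
```

Under these assumptions, the overridden compute() would return Option.empty()
when shouldSkip() is true and call super.compute(validTime) otherwise, so
consumption resumes by itself after the configured number of batches.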



Re: spark streaming 1.3 doubts(force it to not consume anything)

2015-08-17 Thread Cody Koeninger
Look at the definitions of the java-specific KafkaUtils.createDirectStream
methods (the ones that take a JavaStreamingContext)



Re: spark streaming 1.3 doubts(force it to not consume anything)

2015-08-17 Thread Shushant Arora
How do I create a ClassTag in Java? Also, the constructor of
DirectKafkaInputDStream takes a Function1, not a Function, but
KafkaUtils.createDirectStream accepts a Function.

Below is my overridden DirectKafkaInputDStream.


public class CustomDirectKafkaInputDstream extends
    DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
        kafka.serializer.DefaultDecoder, byte[][]> {

  public CustomDirectKafkaInputDstream(
      StreamingContext ssc_,
      Map<String, String> kafkaParams,
      Map<TopicAndPartition, Long> fromOffsets,
      Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> messageHandler,
      ClassTag<byte[]> evidence$1, ClassTag<byte[]> evidence$2,
      ClassTag<DefaultDecoder> evidence$3,
      ClassTag<DefaultDecoder> evidence$4, ClassTag<byte[][]> evidence$5) {
    super(ssc_, kafkaParams, fromOffsets, messageHandler, evidence$1,
        evidence$2, evidence$3, evidence$4, evidence$5);
  }

  @Override
  public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder,
      byte[][]>> compute(Time validTime) {
    int processed = processedCounter.value();
    int failed = failedProcessingsCounter.value();
    if (processed == failed) {
      // Every processed message failed: produce no RDD for this batch.
      System.out.println("backing off since its 100 % failure");
      return Option.empty();
    } else {
      System.out.println("starting the stream ");
      return super.compute(validTime);
    }
  }
}



To create this stream I am using:

scala.collection.immutable.Map<String, String> scalakafkaParams =
    JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala()
        .toMap(Predef.<Tuple2<String, String>>conforms());
scala.collection.immutable.Map<TopicAndPartition, Long> scalaktopicOffsetMap =
    JavaConverters.mapAsScalaMapConverter(topicOffsetMap).asScala()
        .toMap(Predef.<Tuple2<TopicAndPartition, Long>>conforms());

scala.Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> handler =
    new Function<MessageAndMetadata<byte[], byte[]>, byte[][]>() {
      ..});
JavaDStream<byte[][]> directKafkaStream = new
    CustomDirectKafkaInputDstream(jssc, scalakafkaParams, scalaktopicOffsetMap,
        handler, byte[].class, byte[].class, kafka.serializer.DefaultDecoder.class,
        kafka.serializer.DefaultDecoder.class, byte[][].class);



How do I pass a ClassTag to the constructor of CustomDirectKafkaInputDstream?
And how can I use Function instead of Function1?



On Thu, Aug 13, 2015 at 12:16 AM, Cody Koeninger  wrote:

> I'm not aware of an existing api per se, but you could create your own
> subclass of the DStream that returns None for compute() under certain
> conditions.
>
>
>
> On Wed, Aug 12, 2015 at 1:03 PM, Shushant Arora wrote:
>
>> Hi Cody
>>
>> Can you help here? Does Spark Streaming 1.3 have any API for not
>> consuming any messages in the next few runs?
>>
>> Thanks
>>
>> ------ Forwarded message ----------
>> From: Shushant Arora 
>> Date: Wed, Aug 12, 2015 at 11:23 PM
>> Subject: spark streaming 1.3 doubts(force it to not consume anything)
>> To: user 
>>
>>
>> I can't change my streaming application's batch interval at run time.
>> It is always fixed: jobs are always created at the specified batch interval
>> and enqueued if the earlier batch has not finished.
>>
>> My requirement is to process the events and post them to an external
>> server. If the external server is down, I want to increase the batch time.
>> That is not possible, but can I make it not consume any messages for, say,
>> the next 5 successive runs?
>>
>>
>>
>>
>