To correct myself: KafkaRDD[K, V, U, T, R] is a subclass of RDD[R], but
Option<KafkaRDD[K, V, U, T, R]> is not a subclass of Option<RDD[R]> in Java.

In Scala, C[T'] can be a subclass of C[T] (covariance, see
https://twitter.github.io/scala_school/type-basics.html), but Java generics
do not allow this. So is there any workaround to achieve this in Java when
overriding DirectKafkaInputDStream?
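One possible workaround (a sketch only, not tested against Spark 1.3): declare
the Java override with the raw Option type. Java treats a raw return type as
override-compatible with any parameterization of it (JLS 8.4.8.3, via an
unchecked conversion), so a single method can override both declarations of
compute. The shouldBackOff() predicate below is a hypothetical placeholder;
the constructor just forwards to super, mirroring the code quoted further down
this thread.

    import kafka.common.TopicAndPartition;
    import kafka.message.MessageAndMetadata;
    import kafka.serializer.DefaultDecoder;
    import org.apache.spark.streaming.StreamingContext;
    import org.apache.spark.streaming.Time;
    import org.apache.spark.streaming.kafka.DirectKafkaInputDStream;
    import scala.Function1;
    import scala.Option;
    import scala.collection.immutable.Map;
    import scala.reflect.ClassTag;

    public class RawOptionDstream extends DirectKafkaInputDStream<
        byte[], byte[], DefaultDecoder, DefaultDecoder, byte[][]> {

      public RawOptionDstream(
          StreamingContext ssc,
          Map<String, String> kafkaParams,
          Map<TopicAndPartition, Object> fromOffsets,
          Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> messageHandler,
          ClassTag<byte[]> kTag, ClassTag<byte[]> vTag,
          ClassTag<DefaultDecoder> kdTag, ClassTag<DefaultDecoder> vdTag,
          ClassTag<byte[][]> rTag) {
        super(ssc, kafkaParams, fromOffsets, messageHandler,
            kTag, vTag, kdTag, vdTag, rTag);
      }

      // Raw Option is the erasure of both Option<RDD<byte[][]>> (DStream) and
      // Option<KafkaRDD<...>> (DirectKafkaInputDStream), so this one method
      // overrides both declarations, at the cost of an unchecked warning.
      @SuppressWarnings({"rawtypes", "unchecked"})
      @Override
      public Option compute(Time validTime) {
        if (shouldBackOff()) {            // hypothetical predicate
          return Option.empty();          // consume nothing this batch
        }
        return super.compute(validTime);
      }

      private boolean shouldBackOff() {   // placeholder: wire in real logic
        return false;
      }
    }

Losing the KafkaRDD parameterization here should be harmless in practice: the
framework only ever calls compute through the DStream interface, which expects
Option[RDD[T]] anyway.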
On Wed, Aug 19, 2015 at 12:45 AM, Shushant Arora <shushantaror...@gmail.com> wrote:

> But KafkaRDD[K, V, U, T, R] is not a subclass of RDD[R] as per Java:
> generic inheritance is not supported, so a derived class cannot return a
> differently parameterized subclass from an overridden method.
>
> On Wed, Aug 19, 2015 at 12:02 AM, Cody Koeninger <c...@koeninger.org> wrote:
>
>> Option is covariant and KafkaRDD is a subclass of RDD
>>
>> On Tue, Aug 18, 2015 at 1:12 PM, Shushant Arora <shushantaror...@gmail.com> wrote:
>>
>>> Is it that in Scala a derived class is allowed to have any return type?
>>>
>>> And the streaming jar is originally written in Scala, so
>>> DirectKafkaInputDStream is allowed to return Option[KafkaRDD[K, V, U, T, R]]
>>> from its compute method?
>>>
>>> On Tue, Aug 18, 2015 at 8:36 PM, Shushant Arora <shushantaror...@gmail.com> wrote:
>>>
>>>> Looking at the source code of
>>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream:
>>>>
>>>> override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
>>>>   val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
>>>>   val rdd = KafkaRDD[K, V, U, T, R](
>>>>     context.sparkContext, kafkaParams, currentOffsets, untilOffsets,
>>>>     messageHandler)
>>>>
>>>>   currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
>>>>   Some(rdd)
>>>> }
>>>>
>>>> But in DStream it is: def compute(validTime: Time): Option[RDD[T]].
>>>>
>>>> So what should the return type be for a custom DStream that extends
>>>> DirectKafkaInputDStream? I want the behaviour to be the same as
>>>> DirectKafkaInputDStream in normal scenarios, and to return None in one
>>>> specific scenario.
>>>>
>>>> And why did the same error not come up when DirectKafkaInputDStream
>>>> itself extended InputDStream? Its return type Option[KafkaRDD[K, V, U, T, R]]
>>>> is not a subclass of Option[RDD[T]], so it should have failed too.
>>>>
>>>> On Tue, Aug 18, 2015 at 7:28 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>
>>>>> The superclass method in DStream is defined as returning an
>>>>> Option[RDD[T]]
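The mismatch can be reproduced with plain JDK types; it is ordinary Java
generic invariance, nothing Spark-specific. A self-contained illustration:

    import java.util.ArrayList;
    import java.util.List;

    public class InvarianceDemo {
      public static void main(String[] args) {
        List<Integer> ints = new ArrayList<>();

        // Does not compile, even though Integer is a subtype of Number:
        // List<Number> nums = ints;

        // In Java, variance is opt-in at the use site via wildcards:
        List<? extends Number> nums = ints;
        System.out.println(nums.size());

        // Scala instead declares variance at the definition site
        // (Option is declared Option[+A]), which is why the Scala
        // override of compute is legal while the Java one is rejected.
      }
    }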
>>>>> On Tue, Aug 18, 2015 at 8:07 AM, Shushant Arora <shushantaror...@gmail.com> wrote:
>>>>>
>>>>>> Getting a compilation error while overriding the compute method of
>>>>>> DirectKafkaInputDStream:
>>>>>>
>>>>>> [ERROR] CustomDirectKafkaInputDstream.java:[51,83]
>>>>>> compute(org.apache.spark.streaming.Time) in CustomDirectKafkaInputDstream
>>>>>> cannot override compute(org.apache.spark.streaming.Time) in
>>>>>> org.apache.spark.streaming.dstream.DStream; attempting to use incompatible
>>>>>> return type
>>>>>> [ERROR] found   : scala.Option<org.apache.spark.streaming.kafka.KafkaRDD<byte[],byte[],kafka.serializer.DefaultDecoder,kafka.serializer.DefaultDecoder,byte[][]>>
>>>>>> [ERROR] required: scala.Option<org.apache.spark.rdd.RDD<byte[][]>>
>>>>>>
>>>>>> The class:
>>>>>>
>>>>>> public class CustomDirectKafkaInputDstream extends
>>>>>>     DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
>>>>>>         kafka.serializer.DefaultDecoder, byte[][]> {
>>>>>>
>>>>>>   @Override
>>>>>>   public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder,
>>>>>>       byte[][]>> compute(Time validTime) {
>>>>>>     int processed = processedCounter.value();
>>>>>>     int failed = failedProcessingsCounter.value();
>>>>>>     if (processed == failed) {
>>>>>>       System.out.println("backing off since its 100 % failure");
>>>>>>       return Option.empty();
>>>>>>     } else {
>>>>>>       System.out.println("starting the stream ");
>>>>>>       return super.compute(validTime);
>>>>>>     }
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> What should the return type of the compute method be? The superclass
>>>>>> returns Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder,
>>>>>> byte[][]>>, but the compiler expects
>>>>>> scala.Option<org.apache.spark.rdd.RDD<byte[][]>> from the derived class.
>>>>>> Is there something wrong with the code?
>>>>>>
>>>>>> On Mon, Aug 17, 2015 at 7:08 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>>>
>>>>>>> Look at the definitions of the java-specific
>>>>>>> KafkaUtils.createDirectStream methods (the ones that take a
>>>>>>> JavaStreamingContext)
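For reference, one way those Java-friendly overloads can obtain ClassTags is
to call apply on the ClassTag companion object, which is reachable from Java
as ClassTag$.MODULE$. A small sketch (assumes Scala 2.10.x on the classpath):

    import scala.reflect.ClassTag;
    import scala.reflect.ClassTag$;

    public class ClassTagDemo {
      public static void main(String[] args) {
        // apply(Class) builds a ClassTag for the given runtime class.
        ClassTag<byte[]> bytesTag = ClassTag$.MODULE$.apply(byte[].class);
        ClassTag<byte[][]> payloadTag = ClassTag$.MODULE$.apply(byte[][].class);
        System.out.println(bytesTag + " / " + payloadTag);
      }
    }

Tags built this way can be passed straight to constructors that take the
evidence$N ClassTag parameters shown below.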
>>>>>>> On Mon, Aug 17, 2015 at 5:13 AM, Shushant Arora <shushantaror...@gmail.com> wrote:
>>>>>>>
>>>>>>>> How to create a ClassTag in Java? Also, the constructor of
>>>>>>>> DirectKafkaInputDStream takes a Function1, not a Function, whereas
>>>>>>>> KafkaUtils.createDirectStream accepts a Function.
>>>>>>>>
>>>>>>>> I have the following as the overridden DirectKafkaInputDStream:
>>>>>>>>
>>>>>>>> public class CustomDirectKafkaInputDstream extends
>>>>>>>>     DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
>>>>>>>>         kafka.serializer.DefaultDecoder, byte[][]> {
>>>>>>>>
>>>>>>>>   public CustomDirectKafkaInputDstream(
>>>>>>>>       StreamingContext ssc_,
>>>>>>>>       Map<String, String> kafkaParams,
>>>>>>>>       Map<TopicAndPartition, Object> fromOffsets,
>>>>>>>>       Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> messageHandler,
>>>>>>>>       ClassTag<byte[]> evidence$1, ClassTag<byte[]> evidence$2,
>>>>>>>>       ClassTag<DefaultDecoder> evidence$3, ClassTag<DefaultDecoder> evidence$4,
>>>>>>>>       ClassTag<byte[][]> evidence$5) {
>>>>>>>>     super(ssc_, kafkaParams, fromOffsets, messageHandler, evidence$1,
>>>>>>>>         evidence$2, evidence$3, evidence$4, evidence$5);
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   @Override
>>>>>>>>   public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder,
>>>>>>>>       byte[][]>> compute(Time validTime) {
>>>>>>>>     int processed = processedCounter.value();
>>>>>>>>     int failed = failedProcessingsCounter.value();
>>>>>>>>     if (processed == failed) {
>>>>>>>>       System.out.println("backing off since its 100 % failure");
>>>>>>>>       return Option.empty();
>>>>>>>>     } else {
>>>>>>>>       System.out.println("starting the stream ");
>>>>>>>>       return super.compute(validTime);
>>>>>>>>     }
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>> To create this stream I am using:
>>>>>>>>
>>>>>>>> scala.collection.immutable.Map<String, String> scalakafkaParams =
>>>>>>>>     JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala()
>>>>>>>>         .toMap(Predef.<Tuple2<String, String>>conforms());
>>>>>>>> scala.collection.immutable.Map<TopicAndPartition, Long> scalaktopicOffsetMap =
>>>>>>>>     JavaConverters.mapAsScalaMapConverter(topicOffsetMap).asScala()
>>>>>>>>         .toMap(Predef.<Tuple2<TopicAndPartition, Long>>conforms());
>>>>>>>>
>>>>>>>> scala.Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> handler =
>>>>>>>>     new Function<MessageAndMetadata<byte[], byte[]>, byte[][]>() {
>>>>>>>>       ...
>>>>>>>>     });
>>>>>>>>
>>>>>>>> JavaDStream<byte[][]> directKafkaStream = new CustomDirectKafkaInputDstream(
>>>>>>>>     jssc, scalakafkaParams, scalaktopicOffsetMap, handler,
>>>>>>>>     byte[].class, byte[].class, kafka.serializer.DefaultDecoder.class,
>>>>>>>>     kafka.serializer.DefaultDecoder.class, byte[][].class);
>>>>>>>>
>>>>>>>> How do I pass a ClassTag to the constructor of
>>>>>>>> CustomDirectKafkaInputDstream? And how do I use Function instead of
>>>>>>>> Function1?
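One way to satisfy the Function1 parameter from Java is to extend
scala.runtime.AbstractFunction1, which supplies Function1's plumbing and
leaves only apply() to implement; making it Serializable lets Spark ship it
around. A sketch (the payload packing is purely illustrative):

    import java.io.Serializable;
    import kafka.message.MessageAndMetadata;
    import scala.runtime.AbstractFunction1;

    public class MessageHandler
        extends AbstractFunction1<MessageAndMetadata<byte[], byte[]>, byte[][]>
        implements Serializable {

      @Override
      public byte[][] apply(MessageAndMetadata<byte[], byte[]> mmd) {
        // Illustration only: hand back the raw message payload.
        return new byte[][] { mmd.message() };
      }
    }

An instance of MessageHandler can then go wherever the constructor expects a
Function1<MessageAndMetadata<byte[], byte[]>, byte[][]>. The Java-specific
KafkaUtils.createDirectStream overloads appear to do similar adapting
internally, which is why they can accept a plain Function.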
>>>>>>>> On Thu, Aug 13, 2015 at 12:16 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>>>>>
>>>>>>>>> I'm not aware of an existing api per se, but you could create your
>>>>>>>>> own subclass of the DStream that returns None for compute() under
>>>>>>>>> certain conditions.
>>>>>>>>>
>>>>>>>>> On Wed, Aug 12, 2015 at 1:03 PM, Shushant Arora <shushantaror...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Cody
>>>>>>>>>>
>>>>>>>>>> Can you help here: does streaming 1.3 have any api for not
>>>>>>>>>> consuming any messages in the next few runs?
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>>
>>>>>>>>>> ---------- Forwarded message ----------
>>>>>>>>>> From: Shushant Arora <shushantaror...@gmail.com>
>>>>>>>>>> Date: Wed, Aug 12, 2015 at 11:23 PM
>>>>>>>>>> Subject: spark streaming 1.3 doubts (force it to not consume anything)
>>>>>>>>>> To: user <user@spark.apache.org>
>>>>>>>>>>
>>>>>>>>>> I can't make my streaming application's batch interval change at
>>>>>>>>>> run time. It is always fixed: jobs are created at the specified
>>>>>>>>>> batch interval and enqueued if the earlier batch is not finished.
>>>>>>>>>>
>>>>>>>>>> My requirement is to process the events and post them to an
>>>>>>>>>> external server, and if the external server is down I want to
>>>>>>>>>> increase the batch time. That is not possible, but can I make it
>>>>>>>>>> not consume any messages in, say, the next 5 successive runs?
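Putting the thread's pieces together, the "skip the next 5 runs" behaviour can
live entirely inside the overridden compute(): count down and return None
while backing off. A sketch of just that method, to be placed in the custom
subclass shown earlier in the thread (field names are hypothetical, and
something on the driver, such as a batch-completion listener, is assumed to
set needBackoff when a post to the external server fails):

    // Inside the custom DirectKafkaInputDStream subclass:
    private static final int BACKOFF_BATCHES = 5;
    private int skipsRemaining = 0;
    private volatile boolean needBackoff = false;  // hypothetical failure flag

    @SuppressWarnings({"rawtypes", "unchecked"})
    @Override
    public Option compute(Time validTime) {
      if (needBackoff) {
        skipsRemaining = BACKOFF_BATCHES;  // start a fresh backoff window
        needBackoff = false;
      }
      if (skipsRemaining > 0) {
        skipsRemaining--;                  // consume nothing this batch
        return Option.empty();
      }
      return super.compute(validTime);
    }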