But in Java, Option<KafkaRDD<K, V, U, T, R>> is not a subtype of Option<RDD<R>>: Java generics are invariant (there is no counterpart to Scala's covariant Option[+A]), so a derived class cannot return the more specific generic type from an overridden method.
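
The difference is easy to demonstrate with a minimal, self-contained Scala sketch (toy classes standing in for RDD and KafkaRDD, not the Spark types themselves):

// Toy stand-ins: SpecialRdd plays the role of KafkaRDD, BaseRdd of RDD.
class BaseRdd[T]
class SpecialRdd[T] extends BaseRdd[T]

abstract class Stream[T] {
  def compute(): Option[BaseRdd[T]]
}

class SpecialStream[T] extends Stream[T] {
  // Legal in Scala: Option is declared Option[+A] (covariant), so
  // SpecialRdd[T] <: BaseRdd[T] implies
  // Option[SpecialRdd[T]] <: Option[BaseRdd[T]].
  override def compute(): Option[SpecialRdd[T]] = Some(new SpecialRdd[T])
}

The equivalent Java override is rejected, because Option<SpecialRdd<T>> and Option<BaseRdd<T>> are unrelated types to javac; that invariance is exactly the "attempting to use incompatible return type" error quoted below.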
On Wed, Aug 19, 2015 at 12:02 AM, Cody Koeninger <c...@koeninger.org> wrote:

> Option is covariant and KafkaRDD is a subclass of RDD
>
> On Tue, Aug 18, 2015 at 1:12 PM, Shushant Arora <shushantaror...@gmail.com> wrote:
>
>> Is it that in Scala a derived class is allowed to narrow the return type
>> of an overridden method?
>>
>> And since the streaming jar is originally written in Scala, that is why
>> DirectKafkaInputDStream's compute method is allowed to return
>> Option[KafkaRDD[K, V, U, T, R]]?
>>
>> On Tue, Aug 18, 2015 at 8:36 PM, Shushant Arora <shushantaror...@gmail.com> wrote:
>>
>>> Looking at the source code of
>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream:
>>>
>>> override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
>>>   val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
>>>   val rdd = KafkaRDD[K, V, U, T, R](
>>>     context.sparkContext, kafkaParams, currentOffsets, untilOffsets,
>>>     messageHandler)
>>>   currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
>>>   Some(rdd)
>>> }
>>>
>>> But in DStream it is def compute(validTime: Time): Option[RDD[T]].
>>>
>>> So what should the return type be for a custom DStream that extends
>>> DirectKafkaInputDStream? I want the behaviour to match
>>> DirectKafkaInputDStream in normal scenarios and to return None in one
>>> specific scenario.
>>>
>>> And why did the same error not occur when DirectKafkaInputDStream
>>> itself extended InputDStream? Its new return type
>>> Option[KafkaRDD[K, V, U, T, R]] is not a subclass of Option[RDD[T]],
>>> so that should have failed too?
>>>
>>> On Tue, Aug 18, 2015 at 7:28 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>>
>>>> The superclass method in DStream is defined as returning an
>>>> Option[RDD[T]]
>>>>
>>>> On Tue, Aug 18, 2015 at 8:07 AM, Shushant Arora <shushantaror...@gmail.com> wrote:
>>>>
>>>>> Getting a compilation error while overriding the compute method of
>>>>> DirectKafkaInputDStream:
>>>>>
>>>>> [ERROR] CustomDirectKafkaInputDstream.java:[51,83]
>>>>> compute(org.apache.spark.streaming.Time) in CustomDirectKafkaInputDstream
>>>>> cannot override compute(org.apache.spark.streaming.Time) in
>>>>> org.apache.spark.streaming.dstream.DStream; attempting to use
>>>>> incompatible return type
>>>>> [ERROR] found: scala.Option<org.apache.spark.streaming.kafka.KafkaRDD<byte[],byte[],kafka.serializer.DefaultDecoder,kafka.serializer.DefaultDecoder,byte[][]>>
>>>>> [ERROR] required: scala.Option<org.apache.spark.rdd.RDD<byte[][]>>
>>>>>
>>>>> The class:
>>>>>
>>>>> public class CustomDirectKafkaInputDstream extends
>>>>>     DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
>>>>>         kafka.serializer.DefaultDecoder, byte[][]> {
>>>>>
>>>>>   @Override
>>>>>   public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder,
>>>>>       byte[][]>> compute(Time validTime) {
>>>>>     int processed = processedCounter.value();
>>>>>     int failed = failedProcessingsCounter.value();
>>>>>     if (processed == failed) {
>>>>>       System.out.println("backing off since its 100 % failure");
>>>>>       return Option.empty();
>>>>>     } else {
>>>>>       System.out.println("starting the stream ");
>>>>>       return super.compute(validTime);
>>>>>     }
>>>>>   }
>>>>> }
>>>>>
>>>>> What should the return type of the compute method be? The superclass
>>>>> returns Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder,
>>>>> byte[][]>>, but the compiler expects
>>>>> scala.Option<org.apache.spark.rdd.RDD<byte[][]>> from the derived class.
>>>>> Is there something wrong with the code?
>>>>>
>>>>> On Mon, Aug 17, 2015 at 7:08 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>>
>>>>>> Look at the definitions of the java-specific
>>>>>> KafkaUtils.createDirectStream methods (the ones that take a
>>>>>> JavaStreamingContext)
>>>>>>
>>>>>> On Mon, Aug 17, 2015 at 5:13 AM, Shushant Arora <shushantaror...@gmail.com> wrote:
>>>>>>
>>>>>>> How do I create a ClassTag in Java? Also, the constructor of
>>>>>>> DirectKafkaInputDStream takes a Function1, not a Function, but
>>>>>>> KafkaUtils.createDirectStream allows a Function.
>>>>>>>
>>>>>>> I have the below as the overridden DirectKafkaInputDStream:
>>>>>>>
>>>>>>> public class CustomDirectKafkaInputDstream extends
>>>>>>>     DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
>>>>>>>         kafka.serializer.DefaultDecoder, byte[][]> {
>>>>>>>
>>>>>>>   public CustomDirectKafkaInputDstream(
>>>>>>>       StreamingContext ssc_,
>>>>>>>       Map<String, String> kafkaParams,
>>>>>>>       Map<TopicAndPartition, Object> fromOffsets,
>>>>>>>       Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> messageHandler,
>>>>>>>       ClassTag<byte[]> evidence$1, ClassTag<byte[]> evidence$2,
>>>>>>>       ClassTag<DefaultDecoder> evidence$3, ClassTag<DefaultDecoder> evidence$4,
>>>>>>>       ClassTag<byte[][]> evidence$5) {
>>>>>>>     super(ssc_, kafkaParams, fromOffsets, messageHandler, evidence$1,
>>>>>>>         evidence$2, evidence$3, evidence$4, evidence$5);
>>>>>>>   }
>>>>>>>
>>>>>>>   @Override
>>>>>>>   public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder,
>>>>>>>       byte[][]>> compute(Time validTime) {
>>>>>>>     int processed = processedCounter.value();
>>>>>>>     int failed = failedProcessingsCounter.value();
>>>>>>>     if (processed == failed) {
>>>>>>>       System.out.println("backing off since its 100 % failure");
>>>>>>>       return Option.empty();
>>>>>>>     } else {
>>>>>>>       System.out.println("starting the stream ");
>>>>>>>       return super.compute(validTime);
>>>>>>>     }
>>>>>>>   }
>>>>>>> }
>>>>>>>
>>>>>>> To create this stream I am using:
>>>>>>>
>>>>>>> scala.collection.immutable.Map<String, String> scalakafkaParams =
>>>>>>>     JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala()
>>>>>>>         .toMap(Predef.<Tuple2<String, String>>conforms());
>>>>>>> scala.collection.immutable.Map<TopicAndPartition, Long> scalaktopicOffsetMap =
>>>>>>>     JavaConverters.mapAsScalaMapConverter(topicOffsetMap).asScala()
>>>>>>>         .toMap(Predef.<Tuple2<TopicAndPartition, Long>>conforms());
>>>>>>>
>>>>>>> scala.Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> handler =
>>>>>>>     new Function<MessageAndMetadata<byte[], byte[]>, byte[][]>() { ... };
>>>>>>>
>>>>>>> JavaDStream<byte[][]> directKafkaStream = new CustomDirectKafkaInputDstream(
>>>>>>>     jssc, scalakafkaParams, scalaktopicOffsetMap, handler,
>>>>>>>     byte[].class, byte[].class, kafka.serializer.DefaultDecoder.class,
>>>>>>>     kafka.serializer.DefaultDecoder.class, byte[][].class);
>>>>>>>
>>>>>>> How do I pass a ClassTag to the constructor of
>>>>>>> CustomDirectKafkaInputDstream? And how do I use Function instead of
>>>>>>> Function1?
>>>>>>>
>>>>>>> On Thu, Aug 13, 2015 at 12:16 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>>>>
>>>>>>>> I'm not aware of an existing api per se, but you could create your
>>>>>>>> own subclass of the DStream that returns None for compute() under
>>>>>>>> certain conditions.
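
On the Scala side the subclass Cody describes is straightforward, because the covariant return type makes the override legal. A minimal sketch, assuming the Spark 1.3 constructor signature used elsewhere in this thread; DirectKafkaInputDStream is declared private[streaming], so the sketch also assumes the file is compiled under the org.apache.spark.streaming.kafka package (the names BackingOffKafkaDStream and shouldBackOff are illustrative, not Spark API):

package org.apache.spark.streaming.kafka

import scala.reflect.ClassTag

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.Decoder

import org.apache.spark.streaming.{StreamingContext, Time}

// A direct Kafka stream that can decline to consume for a batch.
// shouldBackOff is a caller-supplied, driver-side hook.
class BackingOffKafkaDStream[
    K: ClassTag, V: ClassTag,
    U <: Decoder[K]: ClassTag, T <: Decoder[V]: ClassTag,
    R: ClassTag](
    ssc: StreamingContext,
    kafkaParams: Map[String, String],
    fromOffsets: Map[TopicAndPartition, Long],
    messageHandler: MessageAndMetadata[K, V] => R,
    shouldBackOff: () => Boolean)
  extends DirectKafkaInputDStream[K, V, U, T, R](
    ssc, kafkaParams, fromOffsets, messageHandler) {

  // Keeping the parent's return type compiles because Option[+A] is
  // covariant: Option[KafkaRDD[K, V, U, T, R]] <: Option[RDD[R]].
  override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
    if (shouldBackOff()) None            // consume nothing this batch
    else super.compute(validTime)        // normal direct-stream behaviour
  }
}

Returning None also leaves the parent's currentOffsets untouched (it is only advanced inside the parent's compute, as the source quoted above shows), so messages skipped during back-off are consumed by the next batch that does run.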
>>>>>>>> On Wed, Aug 12, 2015 at 1:03 PM, Shushant Arora <shushantaror...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Cody
>>>>>>>>>
>>>>>>>>> Can you help here? Does streaming 1.3 have any api for not
>>>>>>>>> consuming any messages in the next few runs?
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>>
>>>>>>>>> ---------- Forwarded message ----------
>>>>>>>>> From: Shushant Arora <shushantaror...@gmail.com>
>>>>>>>>> Date: Wed, Aug 12, 2015 at 11:23 PM
>>>>>>>>> Subject: spark streaming 1.3 doubts (force it to not consume anything)
>>>>>>>>> To: user <user@spark.apache.org>
>>>>>>>>>
>>>>>>>>> I can't make my streaming application's batch interval change at
>>>>>>>>> run time. It is always fixed: it creates jobs at the specified
>>>>>>>>> batch interval and enqueues them if the earlier batch has not
>>>>>>>>> finished.
>>>>>>>>>
>>>>>>>>> My requirement is to process the events and post them to an
>>>>>>>>> external server, and if the external server is down I want to
>>>>>>>>> increase the batch time. That is not possible, but can I make it
>>>>>>>>> not consume any messages in, say, the next 5 successive runs?
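
For the original "skip the next 5 runs" requirement, a sketch of how the hook above could be driven (BackOffPolicy and armFor are illustrative names, not an existing Spark 1.3 API). Since compute() runs on the driver, a plain driver-side counter is enough:

// Driver-side skip counter: armFor(5) makes the stream consume nothing
// for the next 5 batches, after which it resumes automatically.
object BackOffPolicy {
  private var batchesToSkip = 0

  def armFor(batches: Int): Unit = synchronized { batchesToSkip = batches }

  def shouldBackOff(): Boolean = synchronized {
    if (batchesToSkip > 0) { batchesToSkip -= 1; true } else false
  }
}

// Wiring, assuming the BackingOffKafkaDStream sketched earlier:
//   val stream = new BackingOffKafkaDStream[Array[Byte], Array[Byte],
//       DefaultDecoder, DefaultDecoder, Array[Array[Byte]]](
//     ssc, kafkaParams, fromOffsets, messageHandler,
//     () => BackOffPolicy.shouldBackOff())
//
// In the output action, when a post to the external server fails:
//   BackOffPolicy.armFor(5)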