Option is covariant, and KafkaRDD is a subclass of RDD.
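In other words, the Scala override typechecks because Option[+A] is covariant: an Option[KafkaRDD[...]] is also an Option[RDD[T]]. Java generics are invariant, which is exactly why the Java subclass below trips over the incompatible-return-type error. A minimal illustration (VarianceDemo is just a scratch class for this sketch, not anything in Spark):

    import kafka.serializer.DefaultDecoder;
    import org.apache.spark.rdd.RDD;
    import org.apache.spark.streaming.kafka.KafkaRDD;
    import scala.Option;

    public class VarianceDemo {
      public static void main(String[] args) {
        Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder, byte[][]>>
            narrow = Option.empty();

        // Legal in Scala because Option[+A] is covariant, but rejected by
        // javac because Java generics are invariant:
        // Option<RDD<byte[][]>> wide = narrow;   // error: incompatible types

        // Use-site variance (a wildcard) recovers the relationship in Java:
        Option<? extends RDD<byte[][]>> wide = narrow;
        System.out.println(wide.isEmpty());
      }
    }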
On Tue, Aug 18, 2015 at 1:12 PM, Shushant Arora <shushantaror...@gmail.com> wrote:

Is it that Scala allows a derived class to have a different return type? And since the streaming jar is originally written in Scala, DirectKafkaInputDStream is allowed to return Option[KafkaRDD[K, V, U, T, R]] from its compute method?

On Tue, Aug 18, 2015 at 8:36 PM, Shushant Arora <shushantaror...@gmail.com> wrote:

Looking at the source code of org.apache.spark.streaming.kafka.DirectKafkaInputDStream:

    override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
      val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
      val rdd = KafkaRDD[K, V, U, T, R](
        context.sparkContext, kafkaParams, currentOffsets, untilOffsets,
        messageHandler)

      currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
      Some(rdd)
    }

But in DStream it is def compute(validTime: Time): Option[RDD[T]].

So what should the return type be for a custom DStream that extends DirectKafkaInputDStream? I want the behaviour to be the same as DirectKafkaInputDStream in normal scenarios and to return None in a specific scenario.

And why did the same error not occur where DirectKafkaInputDStream extends InputDStream? The new return type Option[KafkaRDD[K, V, U, T, R]] is not a subclass of Option[RDD[T]] either, so shouldn't that have failed as well?

On Tue, Aug 18, 2015 at 7:28 PM, Cody Koeninger <c...@koeninger.org> wrote:

The superclass method in DStream is defined as returning an Option[RDD[T]].

On Tue, Aug 18, 2015 at 8:07 AM, Shushant Arora <shushantaror...@gmail.com> wrote:

Getting a compilation error while overriding the compute method of DirectKafkaInputDStream:

    [ERROR] CustomDirectKafkaInputDstream.java:[51,83]
    compute(org.apache.spark.streaming.Time) in CustomDirectKafkaInputDstream
    cannot override compute(org.apache.spark.streaming.Time) in
    org.apache.spark.streaming.dstream.DStream; attempting to use incompatible
    return type

    [ERROR] found:
    scala.Option<org.apache.spark.streaming.kafka.KafkaRDD<byte[],byte[],kafka.serializer.DefaultDecoder,kafka.serializer.DefaultDecoder,byte[][]>>

    [ERROR] required: scala.Option<org.apache.spark.rdd.RDD<byte[][]>>

The class:

    public class CustomDirectKafkaInputDstream extends
        DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
            kafka.serializer.DefaultDecoder, byte[][]> {

      @Override
      public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder,
          byte[][]>> compute(Time validTime) {
        int processed = processedCounter.value();
        int failed = failedProcessingsCounter.value();
        if (processed == failed) {
          System.out.println("backing off since its 100 % failure");
          return Option.empty();
        } else {
          System.out.println("starting the stream ");
          return super.compute(validTime);
        }
      }
    }

What should the return type of the compute method be? The superclass returns Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder, byte[][]>>, but the compiler expects scala.Option<org.apache.spark.rdd.RDD<byte[][]>> from the derived class. Is there something wrong with the code?
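Because of that invariance, no parameterized return type in Java satisfies both inherited compute signatures at once: Option<KafkaRDD<...>> fails against DStream's declaration (the error above), and Option<RDD<byte[][]>> would fail against DirectKafkaInputDStream's. One workaround sketch (plain Java, not an official Spark API) is to fall back to the raw Option type, which javac accepts for both supertypes via unchecked conversion:

    // Inside CustomDirectKafkaInputDstream, replacing the parameterized
    // override above; the logic is unchanged.
    @SuppressWarnings({"unchecked", "rawtypes"})
    @Override
    public Option compute(Time validTime) {
      int processed = processedCounter.value();
      int failed = failedProcessingsCounter.value();
      if (processed == failed) {
        // All processed messages failed (also true when both counters are 0).
        System.out.println("backing off since its 100 % failure");
        return Option.empty();
      }
      System.out.println("starting the stream");
      return super.compute(validTime);
    }

The less noisy alternative is to write the subclass in Scala, where the covariant override can be expressed directly, as Cody's covariance answer implies.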
On Mon, Aug 17, 2015 at 7:08 PM, Cody Koeninger <c...@koeninger.org> wrote:

Look at the definitions of the java-specific KafkaUtils.createDirectStream methods (the ones that take a JavaStreamingContext).

On Mon, Aug 17, 2015 at 5:13 AM, Shushant Arora <shushantaror...@gmail.com> wrote:

How do I create a ClassTag in Java? Also, the constructor of DirectKafkaInputDStream takes a Function1, not a Function, but KafkaUtils.createDirectStream accepts a Function.

I have the following as the overridden DirectKafkaInputDStream:

    public class CustomDirectKafkaInputDstream extends
        DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
            kafka.serializer.DefaultDecoder, byte[][]> {

      public CustomDirectKafkaInputDstream(
          StreamingContext ssc_,
          Map<String, String> kafkaParams,
          Map<TopicAndPartition, Object> fromOffsets,
          Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> messageHandler,
          ClassTag<byte[]> evidence$1, ClassTag<byte[]> evidence$2,
          ClassTag<DefaultDecoder> evidence$3, ClassTag<DefaultDecoder> evidence$4,
          ClassTag<byte[][]> evidence$5) {
        super(ssc_, kafkaParams, fromOffsets, messageHandler, evidence$1,
            evidence$2, evidence$3, evidence$4, evidence$5);
      }

      @Override
      public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder,
          byte[][]>> compute(Time validTime) {
        int processed = processedCounter.value();
        int failed = failedProcessingsCounter.value();
        if (processed == failed) {
          System.out.println("backing off since its 100 % failure");
          return Option.empty();
        } else {
          System.out.println("starting the stream ");
          return super.compute(validTime);
        }
      }
    }

To create this stream I am using:

    scala.collection.immutable.Map<String, String> scalakafkaParams =
        JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala()
            .toMap(Predef.<Tuple2<String, String>>conforms());
    scala.collection.immutable.Map<TopicAndPartition, Long> scalaktopicOffsetMap =
        JavaConverters.mapAsScalaMapConverter(topicOffsetMap).asScala()
            .toMap(Predef.<Tuple2<TopicAndPartition, Long>>conforms());

    scala.Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> handler =
        new Function<MessageAndMetadata<byte[], byte[]>, byte[][]>() { .. });

    JavaDStream<byte[][]> directKafkaStream = new CustomDirectKafkaInputDstream(
        jssc, scalakafkaParams, scalaktopicOffsetMap, handler,
        byte[].class, byte[].class, kafka.serializer.DefaultDecoder.class,
        kafka.serializer.DefaultDecoder.class, byte[][].class);

How do I pass a ClassTag to the constructor of CustomDirectKafkaInputDstream? And how do I use a Function instead of a Function1?

On Thu, Aug 13, 2015 at 12:16 AM, Cody Koeninger <c...@koeninger.org> wrote:

I'm not aware of an existing api per se, but you could create your own subclass of the DStream that returns None for compute() under certain conditions.
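Picking up the ClassTag / Function1 question above: the Java-specific KafkaUtils.createDirectStream overloads that Cody points to assemble roughly these interop pieces from plain Class objects and Java callbacks. A sketch of the same idea (KafkaInteropSketch and KeyMessageHandler are made-up names for illustration):

    import kafka.message.MessageAndMetadata;
    import scala.Function1;
    import scala.reflect.ClassTag;
    import scala.reflect.ClassTag$;
    import scala.runtime.AbstractFunction1;

    public class KafkaInteropSketch {

      // A scala.Function1 written in Java by extending
      // scala.runtime.AbstractFunction1, which supplies compose/andThen.
      // Spark ships the handler to executors, so it is also Serializable.
      public static class KeyMessageHandler
          extends AbstractFunction1<MessageAndMetadata<byte[], byte[]>, byte[][]>
          implements java.io.Serializable {
        @Override
        public byte[][] apply(MessageAndMetadata<byte[], byte[]> mmd) {
          // Return the key and the message as a two-element array.
          return new byte[][] { mmd.key(), mmd.message() };
        }
      }

      public static void main(String[] args) {
        // ClassTags are built from Class objects via the ClassTag companion,
        // reachable from Java as ClassTag$.MODULE$:
        ClassTag<byte[]> bytesTag = ClassTag$.MODULE$.apply(byte[].class);
        ClassTag<byte[][]> payloadTag = ClassTag$.MODULE$.apply(byte[][].class);
        ClassTag<kafka.serializer.DefaultDecoder> decoderTag =
            ClassTag$.MODULE$.apply(kafka.serializer.DefaultDecoder.class);

        Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> handler =
            new KeyMessageHandler();
        System.out.println(bytesTag + " " + payloadTag + " " + decoderTag + " " + handler);
      }
    }

These ClassTags and the handler can then be passed straight to the CustomDirectKafkaInputDstream constructor in place of the bare Class objects shown above.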
On Wed, Aug 12, 2015 at 1:03 PM, Shushant Arora <shushantaror...@gmail.com> wrote:

Hi Cody

Can you help here: does streaming 1.3 have any api for not consuming any message in the next few runs?

Thanks

---------- Forwarded message ----------
From: Shushant Arora <shushantaror...@gmail.com>
Date: Wed, Aug 12, 2015 at 11:23 PM
Subject: spark streaming 1.3 doubts (force it to not consume anything)
To: user <user@spark.apache.org>

I can't make my streaming application's batch interval change at run time. It is always fixed; it always creates jobs at the specified batch interval and enqueues them if the earlier batch has not finished.

My requirement is to process the events and post them to an external server, and if the external server is down I want to increase the batch time. Since that is not possible, can I make it not consume any messages in, say, the next 5 successive runs?
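To tie the thread together, here is a minimal sketch of the "subclass that returns None for compute()" suggestion, specialized to skipping the next 5 runs. It reuses the raw Option return type from the earlier sketch; skipBatches and externalServerDown() are hypothetical names, not Spark API. Since super.compute() is what advances currentOffsets (see the source quoted above), returning Option.empty() generates no job for the interval and consumes no messages:

    // Additional state and override inside CustomDirectKafkaInputDstream:
    private int skipBatches = 0;

    @SuppressWarnings({"unchecked", "rawtypes"})
    @Override
    public Option compute(Time validTime) {
      if (skipBatches > 0) {
        skipBatches--;               // still backing off: no job, offsets unchanged
        return Option.empty();
      }
      if (externalServerDown()) {    // hypothetical health check for the sink
        skipBatches = 5;             // skip the next 5 batch intervals
        return Option.empty();
      }
      return super.compute(validTime);
    }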