The superclass method in DStream is defined as returning an Option[RDD[T]].

On Tue, Aug 18, 2015 at 8:07 AM, Shushant Arora <shushantaror...@gmail.com> wrote:
> I'm getting a compilation error while overriding the compute method of
> DirectKafkaInputDStream.
>
> [ERROR] CustomDirectKafkaInputDstream.java:[51,83]
> compute(org.apache.spark.streaming.Time) in CustomDirectKafkaInputDstream
> cannot override compute(org.apache.spark.streaming.Time) in
> org.apache.spark.streaming.dstream.DStream; attempting to use incompatible
> return type
>
> [ERROR] found :
> scala.Option<org.apache.spark.streaming.kafka.KafkaRDD<byte[],byte[],kafka.serializer.DefaultDecoder,kafka.serializer.DefaultDecoder,byte[][]>>
>
> [ERROR] required: scala.Option<org.apache.spark.rdd.RDD<byte[][]>>
>
> The class:
>
> public class CustomDirectKafkaInputDstream extends
>     DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
>         kafka.serializer.DefaultDecoder, byte[][]> {
>
>   @Override
>   public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder,
>       byte[][]>> compute(Time validTime) {
>
>     int processed = processedCounter.value();
>     int failed = failedProcessingsCounter.value();
>     if (processed == failed) {
>       System.out.println("backing off since its 100 % failure");
>       return Option.empty();
>     } else {
>       System.out.println("starting the stream ");
>
>       return super.compute(validTime);
>     }
>   }
> }
>
> What should the return type of the compute method be? The superclass
> returns Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder,
> byte[][]>>, but the compiler expects
> scala.Option<org.apache.spark.rdd.RDD<byte[][]>> from the derived class. Is
> there something wrong with the code?
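The compiler error above comes from Java generics being invariant: KafkaRDD is a subclass of RDD, but Option<KafkaRDD<...>> is not a subtype of Option<RDD<byte[][]>>. The JDK-only sketch below reproduces that rule by analogy, using hypothetical stand-in classes (BaseStream, BackoffStream) and java.util.Optional in place of DStream and scala.Option; it is not Spark code. The override repeats the base declaration's exact return type, widens a narrower element explicitly, and returns empty while backing off. One caveat, stated as an assumption: for the real DirectKafkaInputDStream, whose Scala override already narrows the type to Option<KafkaRDD<...>>, javac may check a Java override against both declarations, so writing the subclass in Scala (where Option is covariant) may be the simpler route.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for DStream: compute() promises Optional<List<String>>,
// much as DStream.compute promises Option[RDD[T]].
class BaseStream {
    Optional<List<String>> compute(long batchTime) {
        // A narrower element type, analogous to KafkaRDD vs. RDD.
        Optional<ArrayList<String>> narrow =
                Optional.of(new ArrayList<>(Arrays.asList("a", "b")));
        // "return narrow;" would not compile: Optional<ArrayList<String>> is not
        // an Optional<List<String>>, because Java type arguments are invariant.
        // Widening the element explicitly does compile:
        return narrow.map(list -> (List<String>) list);
    }
}

// Hypothetical stand-in for the custom subclass in the thread.
class BackoffStream extends BaseStream {
    private final boolean backingOff;

    BackoffStream(boolean backingOff) {
        this.backingOff = backingOff;
    }

    // The override repeats the base declaration's exact return type.
    // Declaring it as Optional<ArrayList<String>> instead would be rejected as an
    // incompatible return type, the same kind of error as in the thread.
    @Override
    Optional<List<String>> compute(long batchTime) {
        if (backingOff) {
            return Optional.empty(); // plays the role of Option.empty() / None
        }
        return super.compute(batchTime);
    }
}
```

In Scala the explicit widening step is unnecessary, because Option[KafkaRDD[...]] already conforms to Option[RDD[R]].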
>
> On Mon, Aug 17, 2015 at 7:08 PM, Cody Koeninger <c...@koeninger.org>
> wrote:
>
>> Look at the definitions of the Java-specific
>> KafkaUtils.createDirectStream methods (the ones that take a
>> JavaStreamingContext).
>>
>> On Mon, Aug 17, 2015 at 5:13 AM, Shushant Arora <
>> shushantaror...@gmail.com> wrote:
>>
>>> How do I create a ClassTag in Java? Also, the constructor
>>> of DirectKafkaInputDStream takes a Function1, not a Function, but
>>> KafkaUtils.createDirectStream accepts a Function.
>>>
>>> Below is my overridden DirectKafkaInputDStream:
>>>
>>> public class CustomDirectKafkaInputDstream extends
>>>     DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
>>>         kafka.serializer.DefaultDecoder, byte[][]> {
>>>
>>>   public CustomDirectKafkaInputDstream(
>>>       StreamingContext ssc_,
>>>       Map<String, String> kafkaParams,
>>>       Map<TopicAndPartition, Object> fromOffsets,
>>>       Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> messageHandler,
>>>       ClassTag<byte[]> evidence$1, ClassTag<byte[]> evidence$2,
>>>       ClassTag<DefaultDecoder> evidence$3,
>>>       ClassTag<DefaultDecoder> evidence$4, ClassTag<byte[][]> evidence$5) {
>>>     super(ssc_, kafkaParams, fromOffsets, messageHandler, evidence$1,
>>>         evidence$2, evidence$3, evidence$4, evidence$5);
>>>   }
>>>
>>>   @Override
>>>   public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder,
>>>       byte[][]>> compute(Time validTime) {
>>>     int processed = processedCounter.value();
>>>     int failed = failedProcessingsCounter.value();
>>>     if (processed == failed) {
>>>       System.out.println("backing off since its 100 % failure");
>>>       return Option.empty();
>>>     } else {
>>>       System.out.println("starting the stream ");
>>>
>>>       return super.compute(validTime);
>>>     }
>>>   }
>>> }
>>>
>>> To create this stream I am using:
>>>
>>> scala.collection.immutable.Map<String, String> scalakafkaParams =
>>>     JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala()
>>>         .toMap(Predef.<Tuple2<String, String>>conforms());
>>>
>>> scala.collection.immutable.Map<TopicAndPartition, Long> scalaktopicOffsetMap =
>>>     JavaConverters.mapAsScalaMapConverter(topicOffsetMap).asScala()
>>>         .toMap(Predef.<Tuple2<TopicAndPartition, Long>>conforms());
>>>
>>> scala.Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> handler =
>>>     new Function<MessageAndMetadata<byte[], byte[]>, byte[][]>() {
>>>       ..});
>>>
>>> JavaDStream<byte[][]> directKafkaStream = new
>>>     CustomDirectKafkaInputDstream(jssc, scalakafkaParams, scalaktopicOffsetMap,
>>>         handler, byte[].class, byte[].class, kafka.serializer.DefaultDecoder.class,
>>>         kafka.serializer.DefaultDecoder.class, byte[][].class);
>>>
>>> How do I pass a ClassTag to the constructor of CustomDirectKafkaInputDstream?
>>> And how can I use a Function instead of a Function1?
>>>
>>> On Thu, Aug 13, 2015 at 12:16 AM, Cody Koeninger <c...@koeninger.org>
>>> wrote:
>>>
>>>> I'm not aware of an existing API per se, but you could create your own
>>>> subclass of the DStream that returns None for compute() under certain
>>>> conditions.
>>>>
>>>> On Wed, Aug 12, 2015 at 1:03 PM, Shushant Arora <
>>>> shushantaror...@gmail.com> wrote:
>>>>
>>>>> Hi Cody,
>>>>>
>>>>> Can you help here? Does Spark Streaming 1.3 have any API for not
>>>>> consuming any messages in the next few runs?
>>>>>
>>>>> Thanks
>>>>>
>>>>> ---------- Forwarded message ----------
>>>>> From: Shushant Arora <shushantaror...@gmail.com>
>>>>> Date: Wed, Aug 12, 2015 at 11:23 PM
>>>>> Subject: spark streaming 1.3 doubts(force it to not consume anything)
>>>>> To: user <user@spark.apache.org>
>>>>>
>>>>> I can't make my streaming application's batch interval change at run
>>>>> time. It's always fixed, and it always creates jobs at the specified batch
>>>>> interval and enqueues them if the earlier batch is not finished.
>>>>>
>>>>> My requirement is to process the events and post them to an external
>>>>> server. If the external server is down, I want to increase the batch
>>>>> time. That is not possible, but can I make the stream not consume any
>>>>> messages in, say, the next 5 successive runs?
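Cody's suggestion in this thread (a DStream subclass whose compute() returns None under certain conditions) can cover the "skip the next 5 runs" requirement if the subclass keeps a small countdown. The sketch below assumes a hypothetical helper class, BatchBackoff, which is not part of Spark; it only tracks how many upcoming batch intervals to skip after a failure is recorded.

```java
// Hypothetical helper (not a Spark API): after recordFailure(), the next
// `skipBatches` calls to shouldSkip() return true, then normal batches resume.
final class BatchBackoff {
    private final int skipBatches;
    private int remaining = 0;

    BatchBackoff(int skipBatches) {
        if (skipBatches < 0) {
            throw new IllegalArgumentException("skipBatches must be >= 0");
        }
        this.skipBatches = skipBatches;
    }

    // Call when the external server is detected as down; restarts the window.
    synchronized void recordFailure() {
        remaining = skipBatches;
    }

    // Call once per batch interval, e.g. at the top of compute().
    // Returns true if this batch should be skipped and counts it against the window.
    synchronized boolean shouldSkip() {
        if (remaining > 0) {
            remaining--;
            return true;
        }
        return false;
    }
}
```

In the custom DStream, compute() would call shouldSkip() first and return an empty Option when it is true, otherwise delegate to super.compute(validTime). compute() runs on the driver, so the failure signal has to reach the driver, much like the counters read in the thread's compute(); the methods are synchronized in case recordFailure() is called from a different driver thread. Whether skipped messages are deferred to a later batch or dropped depends on how the parent class tracks offsets, which is worth confirming in the DirectKafkaInputDStream source.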