But that does not help from Java: Java generics are invariant, so
Option<KafkaRDD<K, V, U, T, R>> is not a subtype of Option<RDD<R>> even
though KafkaRDD extends RDD, and an overriding method cannot return a
differently parameterized type than the method it overrides.
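
If the throttling subclass is written in Scala rather than Java, the
covariant override type-checks as-is. A minimal sketch follows; it is
untested, the shouldBackOff hook is a stand-in I introduced for the counter
check discussed below, and it assumes the file is compiled somewhere with
access to the non-public DirectKafkaInputDStream and KafkaRDD classes, as
the Java attempts below already presume:

package org.apache.spark.streaming.kafka

import scala.reflect.ClassTag
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.Decoder
import org.apache.spark.streaming.{StreamingContext, Time}

// Hypothetical throttling subclass: consumes nothing while shouldBackOff says so.
class BackoffDirectKafkaInputDStream[
    K: ClassTag, V: ClassTag,
    U <: Decoder[K]: ClassTag, T <: Decoder[V]: ClassTag, R: ClassTag](
    ssc: StreamingContext,
    kafkaParams: Map[String, String],
    fromOffsets: Map[TopicAndPartition, Long],
    messageHandler: MessageAndMetadata[K, V] => R,
    shouldBackOff: () => Boolean) // assumed hook standing in for the counter check
  extends DirectKafkaInputDStream[K, V, U, T, R](
    ssc, kafkaParams, fromOffsets, messageHandler) {

  override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] =
    if (shouldBackOff()) None       // None is an Option[Nothing], so it always fits
    else super.compute(validTime)   // same result type as the parent's compute
}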

On Wed, Aug 19, 2015 at 12:02 AM, Cody Koeninger <c...@koeninger.org> wrote:

> Option is covariant and KafkaRDD is a subclass of RDD
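>
> To make that concrete, here is a self-contained sketch with toy stand-ins
> (not the real Spark types):
>
> class Rdd
> class KafkaRdd extends Rdd
>
> val narrow: Option[KafkaRdd] = Some(new KafkaRdd)
> val widened: Option[Rdd] = narrow   // compiles: Option[+A] is covariant in A
>
> class Box[A](val value: A)          // invariant, like every Java generic type
> val boxed: Box[KafkaRdd] = new Box(new KafkaRdd)
> // val bad: Box[Rdd] = boxed        // would not compile: Box[KafkaRdd] is not a Box[Rdd]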
>
> On Tue, Aug 18, 2015 at 1:12 PM, Shushant Arora <shushantaror...@gmail.com
> > wrote:
>
>> Is it that Scala allows a derived class to narrow the return type?
>>
>> And since the streaming jar is originally written in Scala, is
>> DirectKafkaInputDStream allowed to return Option[KafkaRDD[K, V, U, T, R]]
>> from its compute method?
>>
>> On Tue, Aug 18, 2015 at 8:36 PM, Shushant Arora <
>> shushantaror...@gmail.com> wrote:
>>
>>> Looking at the source code of
>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream:
>>>
>>> override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
>>>   val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
>>>   val rdd = KafkaRDD[K, V, U, T, R](
>>>     context.sparkContext, kafkaParams, currentOffsets, untilOffsets,
>>>     messageHandler)
>>>
>>>   currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
>>>   Some(rdd)
>>> }
>>>
>>>
>>> But in DStream it is def compute(validTime: Time): Option[RDD[T]].
>>>
>>> So what should the return type be for a custom DStream that extends
>>> DirectKafkaInputDStream? I want the behaviour to be the same as
>>> DirectKafkaInputDStream in normal scenarios, and to return None in one
>>> specific scenario.
>>>
>>> And why did the same error not come up when DirectKafkaInputDStream
>>> itself extended InputDStream? Its return type Option[KafkaRDD[K, V, U, T, R]]
>>> is not a subclass of Option[RDD[T]], so it should have failed too?
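>>>
>>> A toy version of that hierarchy shows what Scala permits; this is a
>>> sketch with made-up names, not the real classes:
>>>
>>> class Rdd
>>> class KafkaRdd extends Rdd
>>>
>>> abstract class Stream {
>>>   def compute(): Option[Rdd]
>>> }
>>>
>>> class DirectStream extends Stream {
>>>   // Legal in Scala: the override narrows Option[Rdd] to Option[KafkaRdd],
>>>   // which is a subtype because Option is covariant.
>>>   override def compute(): Option[KafkaRdd] = Some(new KafkaRdd)
>>> }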
>>>
>>>
>>>
>>>
>>> On Tue, Aug 18, 2015 at 7:28 PM, Cody Koeninger <c...@koeninger.org>
>>> wrote:
>>>
>>>> The superclass method in DStream is defined as returning an
>>>> Option[RDD[T]]
>>>>
>>>> On Tue, Aug 18, 2015 at 8:07 AM, Shushant Arora <
>>>> shushantaror...@gmail.com> wrote:
>>>>
>>>>> I am getting a compilation error while overriding the compute method of
>>>>> DirectKafkaInputDStream:
>>>>>
>>>>>
>>>>> [ERROR] CustomDirectKafkaInputDstream.java:[51,83]
>>>>> compute(org.apache.spark.streaming.Time) in CustomDirectKafkaInputDstream
>>>>> cannot override compute(org.apache.spark.streaming.Time) in
>>>>> org.apache.spark.streaming.dstream.DStream; attempting to use incompatible
>>>>> return type
>>>>>
>>>>> [ERROR] found   :
>>>>> scala.Option<org.apache.spark.streaming.kafka.KafkaRDD<byte[],byte[],kafka.serializer.DefaultDecoder,kafka.serializer.DefaultDecoder,byte[][]>>
>>>>>
>>>>> [ERROR] required: scala.Option<org.apache.spark.rdd.RDD<byte[][]>>
>>>>>
>>>>>
>>>>> The class:
>>>>>
>>>>> public class CustomDirectKafkaInputDstream extends
>>>>>     DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
>>>>>     kafka.serializer.DefaultDecoder, byte[][]> {
>>>>>
>>>>>   @Override
>>>>>   public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder,
>>>>>       byte[][]>> compute(Time validTime) {
>>>>>     int processed = processedCounter.value();
>>>>>     int failed = failedProcessingsCounter.value();
>>>>>     if (processed == failed) {
>>>>>       System.out.println("backing off since its 100 % failure");
>>>>>       return Option.empty();
>>>>>     } else {
>>>>>       System.out.println("starting the stream ");
>>>>>       return super.compute(validTime);
>>>>>     }
>>>>>   }
>>>>> }
>>>>>
>>>>>
>>>>> What should the return type of the compute method be? The superclass
>>>>> returns Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder,
>>>>> byte[][]>>, but the compiler expects
>>>>> scala.Option<org.apache.spark.rdd.RDD<byte[][]>> from the derived class.
>>>>> Is there something wrong with the code?
>>>>>
>>>>> On Mon, Aug 17, 2015 at 7:08 PM, Cody Koeninger <c...@koeninger.org>
>>>>> wrote:
>>>>>
>>>>>> Look at the definitions of the java-specific
>>>>>> KafkaUtils.createDirectStream methods (the ones that take a
>>>>>> JavaStreamingContext)
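>>>>>>
>>>>>> Those Java-friendly overloads build the implicit evidence from plain
>>>>>> Class objects. Roughly, in Scala terms (a sketch, not the exact Spark
>>>>>> source):
>>>>>>
>>>>>> import scala.reflect.ClassTag
>>>>>>
>>>>>> // ClassTag.apply(Class) is what the Java-facing helpers lean on;
>>>>>> // from Java the same call is ClassTag$.MODULE$.apply(byte[].class).
>>>>>> val keyTag: ClassTag[Array[Byte]] = ClassTag(classOf[Array[Byte]])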
>>>>>>
>>>>>> On Mon, Aug 17, 2015 at 5:13 AM, Shushant Arora <
>>>>>> shushantaror...@gmail.com> wrote:
>>>>>>
>>>>>>> How do I create a ClassTag in Java? Also, the constructor of
>>>>>>> DirectKafkaInputDStream takes a Function1, not a Function, but
>>>>>>> KafkaUtils.createDirectStream accepts a Function.
>>>>>>>
>>>>>>> I have the following as my overridden DirectKafkaInputDStream:
>>>>>>>
>>>>>>>
>>>>>>> public class CustomDirectKafkaInputDstream extends
>>>>>>>     DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
>>>>>>>     kafka.serializer.DefaultDecoder, byte[][]> {
>>>>>>>
>>>>>>>   public CustomDirectKafkaInputDstream(
>>>>>>>       StreamingContext ssc_,
>>>>>>>       Map<String, String> kafkaParams,
>>>>>>>       Map<TopicAndPartition, Object> fromOffsets,
>>>>>>>       Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> messageHandler,
>>>>>>>       ClassTag<byte[]> evidence$1, ClassTag<byte[]> evidence$2,
>>>>>>>       ClassTag<DefaultDecoder> evidence$3,
>>>>>>>       ClassTag<DefaultDecoder> evidence$4, ClassTag<byte[][]> evidence$5) {
>>>>>>>     super(ssc_, kafkaParams, fromOffsets, messageHandler, evidence$1,
>>>>>>>         evidence$2, evidence$3, evidence$4, evidence$5);
>>>>>>>   }
>>>>>>>
>>>>>>>   @Override
>>>>>>>   public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder,
>>>>>>>       byte[][]>> compute(Time validTime) {
>>>>>>>     int processed = processedCounter.value();
>>>>>>>     int failed = failedProcessingsCounter.value();
>>>>>>>     if (processed == failed) {
>>>>>>>       System.out.println("backing off since its 100 % failure");
>>>>>>>       return Option.empty();
>>>>>>>     } else {
>>>>>>>       System.out.println("starting the stream ");
>>>>>>>       return super.compute(validTime);
>>>>>>>     }
>>>>>>>   }
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> To create this stream I am using:
>>>>>>>
>>>>>>> scala.collection.immutable.Map<String, String> scalakafkaParams =
>>>>>>>     JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala()
>>>>>>>         .toMap(Predef.<Tuple2<String, String>>conforms());
>>>>>>> scala.collection.immutable.Map<TopicAndPartition, Long> scalaktopicOffsetMap =
>>>>>>>     JavaConverters.mapAsScalaMapConverter(topicOffsetMap).asScala()
>>>>>>>         .toMap(Predef.<Tuple2<TopicAndPartition, Long>>conforms());
>>>>>>>
>>>>>>> scala.Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> handler =
>>>>>>>     new Function<MessageAndMetadata<byte[], byte[]>, byte[][]>() {
>>>>>>>         ..});
>>>>>>>
>>>>>>> JavaDStream<byte[][]> directKafkaStream = new CustomDirectKafkaInputDstream(
>>>>>>>     jssc, scalakafkaParams, scalaktopicOffsetMap, handler,
>>>>>>>     byte[].class, byte[].class,
>>>>>>>     kafka.serializer.DefaultDecoder.class,
>>>>>>>     kafka.serializer.DefaultDecoder.class, byte[][].class);
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> How do I pass a ClassTag to the CustomDirectKafkaInputDstream
>>>>>>> constructor? And how do I use a Function instead of a Function1?
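>>>>>>>
>>>>>>> For the Function vs Function1 part, a small adapter is one option;
>>>>>>> a Scala sketch (javaHandler below is a stand-in name):
>>>>>>>
>>>>>>> import org.apache.spark.api.java.function.{Function => JFunction}
>>>>>>>
>>>>>>> // Adapt a Java-API Function to the scala.Function1 the constructor expects.
>>>>>>> def toScalaF1[A, B](f: JFunction[A, B]): A => B = a => f.call(a)
>>>>>>>
>>>>>>> Then toScalaF1(javaHandler) can be passed wherever a Function1 is
>>>>>>> expected.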
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Aug 13, 2015 at 12:16 AM, Cody Koeninger <c...@koeninger.org
>>>>>>> > wrote:
>>>>>>>
>>>>>>>> I'm not aware of an existing api per se, but you could create your
>>>>>>>> own subclass of the DStream that returns None for compute() under 
>>>>>>>> certain
>>>>>>>> conditions.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Aug 12, 2015 at 1:03 PM, Shushant Arora <
>>>>>>>> shushantaror...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Cody
>>>>>>>>>
>>>>>>>>> Can you help here: does streaming 1.3 have any API for not consuming
>>>>>>>>> any messages for the next few runs?
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>>
>>>>>>>>> ---------- Forwarded message ----------
>>>>>>>>> From: Shushant Arora <shushantaror...@gmail.com>
>>>>>>>>> Date: Wed, Aug 12, 2015 at 11:23 PM
>>>>>>>>> Subject: spark streaming 1.3 doubts(force it to not consume
>>>>>>>>> anything)
>>>>>>>>> To: user <user@spark.apache.org>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> I can't change my streaming application's batch interval at run
>>>>>>>>> time. It is always fixed: jobs are created at the specified batch
>>>>>>>>> interval and queued up if the earlier batch has not finished.
>>>>>>>>>
>>>>>>>>> My requirement is to process the events and post them to an external
>>>>>>>>> server. If the external server is down I want to increase the batch
>>>>>>>>> time; that is not possible, but can I make it not consume any
>>>>>>>>> messages in, say, the next 5 successive runs?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
