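Option is covariant, and KafkaRDD[K, V, U, T, R] is a subclass of RDD[R], so
Option[KafkaRDD[K, V, U, T, R]] conforms to Option[RDD[R]] and the Scala
override compiles.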
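
To make the covariance point concrete from the Java side, here is a minimal
sketch. It assumes nothing from Spark: Rdd, KafkaLikeRdd, BaseStream and
BackoffStream are hypothetical stand-ins, and java.util.Optional plays the role
of scala.Option. Scala declares Option as covariant, while a Java generic type
is invariant unless a wildcard is written, which is the kind of mismatch javac
reports in the error quoted below.

```java
import java.util.Optional;

public class VarianceSketch {
    // Hypothetical stand-ins for RDD and KafkaRDD; these are not the Spark classes.
    static class Rdd {
        String describe() { return "rdd"; }
    }

    static class KafkaLikeRdd extends Rdd {
        @Override
        String describe() { return "kafka-like rdd"; }
    }

    // Plays the role of DStream. The wildcard makes the return type covariant
    // in its element type, which is what Scala's Option gives you by declaration.
    static class BaseStream {
        Optional<? extends Rdd> compute(long validTime) {
            return Optional.of(new Rdd());
        }
    }

    // Plays the role of a custom stream that sometimes skips a batch.
    static class BackoffStream extends BaseStream {
        private final boolean backOff;

        BackoffStream(boolean backOff) {
            this.backOff = backOff;
        }

        // Compiles because Optional<KafkaLikeRdd> is a subtype of
        // Optional<? extends Rdd>. If BaseStream.compute were declared as
        // Optional<Rdd>, javac would reject this override with an
        // "incompatible return type" error.
        @Override
        Optional<KafkaLikeRdd> compute(long validTime) {
            return backOff ? Optional.empty() : Optional.of(new KafkaLikeRdd());
        }
    }

    // Calls compute through the base type, as a scheduler would.
    static String run(boolean backOff) {
        BaseStream stream = new BackoffStream(backOff);
        return stream.compute(0L).map(Rdd::describe).orElse("none");
    }

    public static void main(String[] args) {
        System.out.println(run(false));
        System.out.println(run(true));
    }
}
```

This only illustrates the typing rule. It does not change how Spark's
signatures appear to javac, so writing the subclass in Scala is likely the
simpler route.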

On Tue, Aug 18, 2015 at 1:12 PM, Shushant Arora <shushantaror...@gmail.com>
wrote:

> Is it that in Scala a derived class is allowed to have any return type?
>
> And since the streaming jar was originally written in Scala, is that why
> DirectKafkaInputDStream is allowed to return Option[KafkaRDD[K, V, U, T, R]]
> from its compute method?
>
> On Tue, Aug 18, 2015 at 8:36 PM, Shushant Arora <shushantaror...@gmail.com>
> wrote:
>
>> Looking at the source code of
>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream:
>>
>> override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
>>   val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
>>   val rdd = KafkaRDD[K, V, U, T, R](
>>     context.sparkContext, kafkaParams, currentOffsets, untilOffsets,
>>     messageHandler)
>>
>>   currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
>>   Some(rdd)
>> }
>>
>>
>> But in DStream it is def compute(validTime: Time): Option[RDD[T]].
>>
>> So what should be the return type of a custom DStream that extends
>> DirectKafkaInputDStream? I want the behaviour to be the same as
>> DirectKafkaInputDStream in normal scenarios and to return None in a
>> specific scenario.
>>
>> And why did the same error not occur when DirectKafkaInputDStream was
>> extended from InputDStream? Since the new return type
>> Option[KafkaRDD[K, V, U, T, R]] is not a subclass of Option[RDD[T]], it
>> should have failed?
>>
>>
>>
>>
>> On Tue, Aug 18, 2015 at 7:28 PM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>
>>> The superclass method in DStream is defined as returning an
>>> Option[RDD[T]]
>>>
>>> On Tue, Aug 18, 2015 at 8:07 AM, Shushant Arora <
>>> shushantaror...@gmail.com> wrote:
>>>
>>>> Getting a compilation error while overriding the compute method of
>>>> DirectKafkaInputDStream.
>>>>
>>>>
>>>> [ERROR] CustomDirectKafkaInputDstream.java:[51,83]
>>>> compute(org.apache.spark.streaming.Time) in CustomDirectKafkaInputDstream
>>>> cannot override compute(org.apache.spark.streaming.Time) in
>>>> org.apache.spark.streaming.dstream.DStream; attempting to use incompatible
>>>> return type
>>>>
>>>> [ERROR] found   :
>>>> scala.Option<org.apache.spark.streaming.kafka.KafkaRDD<byte[],byte[],kafka.serializer.DefaultDecoder,kafka.serializer.DefaultDecoder,byte[][]>>
>>>>
>>>> [ERROR] required: scala.Option<org.apache.spark.rdd.RDD<byte[][]>>
>>>>
>>>>
>>>> class:
>>>>
>>>> public class CustomDirectKafkaInputDstream extends
>>>>     DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
>>>>         kafka.serializer.DefaultDecoder, byte[][]> {
>>>>
>>>>   @Override
>>>>   public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder,
>>>>       byte[][]>> compute(Time validTime) {
>>>>
>>>>     int processed = processedCounter.value();
>>>>     int failed = failedProcessingsCounter.value();
>>>>     if (processed == failed) {
>>>>       System.out.println("backing off since its 100 % failure");
>>>>       return Option.empty();
>>>>     } else {
>>>>       System.out.println("starting the stream ");
>>>>       return super.compute(validTime);
>>>>     }
>>>>   }
>>>> }
>>>>
>>>>
>>>> What should be the return type of the compute method? The superclass
>>>> returns Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder,
>>>> byte[][]>>, but the compiler expects
>>>> scala.Option<org.apache.spark.rdd.RDD<byte[][]>> from the derived class. Is
>>>> there something wrong with the code?
>>>>
>>>> On Mon, Aug 17, 2015 at 7:08 PM, Cody Koeninger <c...@koeninger.org>
>>>> wrote:
>>>>
>>>>> Look at the definitions of the java-specific
>>>>> KafkaUtils.createDirectStream methods (the ones that take a
>>>>> JavaStreamingContext)
>>>>>
>>>>> On Mon, Aug 17, 2015 at 5:13 AM, Shushant Arora <
>>>>> shushantaror...@gmail.com> wrote:
>>>>>
>>>>>> How do I create a ClassTag in Java? Also, the constructor
>>>>>> of DirectKafkaInputDStream takes a Function1, not a Function, but
>>>>>> KafkaUtils.createDirectStream allows a Function.
>>>>>>
>>>>>> Below is my overridden DirectKafkaInputDStream.
>>>>>>
>>>>>>
>>>>>> public class CustomDirectKafkaInputDstream extends
>>>>>>     DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
>>>>>>         kafka.serializer.DefaultDecoder, byte[][]> {
>>>>>>
>>>>>>   public CustomDirectKafkaInputDstream(
>>>>>>       StreamingContext ssc_,
>>>>>>       Map<String, String> kafkaParams,
>>>>>>       Map<TopicAndPartition, Object> fromOffsets,
>>>>>>       Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> messageHandler,
>>>>>>       ClassTag<byte[]> evidence$1, ClassTag<byte[]> evidence$2,
>>>>>>       ClassTag<DefaultDecoder> evidence$3,
>>>>>>       ClassTag<DefaultDecoder> evidence$4, ClassTag<byte[][]> evidence$5) {
>>>>>>     super(ssc_, kafkaParams, fromOffsets, messageHandler, evidence$1,
>>>>>>         evidence$2, evidence$3, evidence$4, evidence$5);
>>>>>>   }
>>>>>>
>>>>>>   @Override
>>>>>>   public Option<KafkaRDD<byte[], byte[], DefaultDecoder,
>>>>>>       DefaultDecoder, byte[][]>> compute(Time validTime) {
>>>>>>     int processed = processedCounter.value();
>>>>>>     int failed = failedProcessingsCounter.value();
>>>>>>     if (processed == failed) {
>>>>>>       System.out.println("backing off since its 100 % failure");
>>>>>>       return Option.empty();
>>>>>>     } else {
>>>>>>       System.out.println("starting the stream ");
>>>>>>       return super.compute(validTime);
>>>>>>     }
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>>
>>>>>>
>>>>>> To create this stream I am using:
>>>>>>
>>>>>> scala.collection.immutable.Map<String, String> scalakafkaParams =
>>>>>>     JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala()
>>>>>>         .toMap(Predef.<Tuple2<String, String>>conforms());
>>>>>> scala.collection.immutable.Map<TopicAndPartition, Long> scalaktopicOffsetMap =
>>>>>>     JavaConverters.mapAsScalaMapConverter(topicOffsetMap).asScala()
>>>>>>         .toMap(Predef.<Tuple2<TopicAndPartition, Long>>conforms());
>>>>>>
>>>>>> scala.Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> handler =
>>>>>>     new Function<MessageAndMetadata<byte[], byte[]>, byte[][]>() {
>>>>>>         ..});
>>>>>> JavaDStream<byte[][]> directKafkaStream = new CustomDirectKafkaInputDstream(
>>>>>>     jssc, scalakafkaParams, scalaktopicOffsetMap, handler,
>>>>>>     byte[].class, byte[].class, kafka.serializer.DefaultDecoder.class,
>>>>>>     kafka.serializer.DefaultDecoder.class, byte[][].class);
>>>>>>
>>>>>>
>>>>>>
>>>>>> How do I pass a ClassTag to the constructor of
>>>>>> CustomDirectKafkaInputDstream? And how can I use Function instead of
>>>>>> Function1?
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, Aug 13, 2015 at 12:16 AM, Cody Koeninger <c...@koeninger.org>
>>>>>> wrote:
>>>>>>
>>>>>>> I'm not aware of an existing API per se, but you could create your
>>>>>>> own subclass of the DStream that returns None for compute() under
>>>>>>> certain conditions.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Aug 12, 2015 at 1:03 PM, Shushant Arora <
>>>>>>> shushantaror...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Cody
>>>>>>>>
>>>>>>>> Can you help here? Does streaming 1.3 have any API for not consuming
>>>>>>>> any messages in the next few runs?
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>>
>>>>>>>> ---------- Forwarded message ----------
>>>>>>>> From: Shushant Arora <shushantaror...@gmail.com>
>>>>>>>> Date: Wed, Aug 12, 2015 at 11:23 PM
>>>>>>>> Subject: spark streaming 1.3 doubts(force it to not consume
>>>>>>>> anything)
>>>>>>>> To: user <user@spark.apache.org>
>>>>>>>>
>>>>>>>>
>>>>>>>> I can't make my streaming application's batch interval change at run
>>>>>>>> time. It is always fixed, and it always creates jobs at the specified
>>>>>>>> batch interval and enqueues them if an earlier batch has not finished.
>>>>>>>>
>>>>>>>> My requirement is to process the events and post them to some
>>>>>>>> external server, and if the external server is down I want to increase
>>>>>>>> the batch time. That is not possible, but can I make it not consume any
>>>>>>>> messages in, say, the next 5 successive runs?
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
