To correct myself: KafkaRDD[K, V, U, T, R] is a subclass of RDD[R], but in Java
Option<KafkaRDD<K, V, U, T, R>> is not a subtype of Option<RDD<R>>.

In Scala, C[T'] is a subtype of C[T] when T' is a subtype of T and C is
declared covariant (C[+T]), as per
https://twitter.github.io/scala_school/type-basics.html
but Java generics are invariant, so this is not allowed in Java.
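
To make the difference concrete, here is a minimal sketch of the Java side.
It assumes stand-in classes: Rdd, KafkaRdd, BaseStream and BackoffStream are
hypothetical names, not Spark classes, and java.util.Optional stands in for
scala.Option. It only illustrates the variance rule (a wildcard gives
use-site covariance, and an unchecked cast can widen an immutable container);
it is not a verified fix for the Spark override.

```java
import java.util.Optional;

// All class names here are hypothetical stand-ins, not Spark classes.
final class VarianceSketch {

    static class Rdd<T> {}                      // plays the role of RDD[T]
    static class KafkaRdd<T> extends Rdd<T> {}  // plays the role of KafkaRDD

    // Java generics are invariant: Optional<KafkaRdd<T>> is not an
    // Optional<Rdd<T>>. A wildcard gives use-site covariance, so this
    // assignment compiles without a cast.
    static <T> Optional<? extends Rdd<T>> asCovariant(Optional<KafkaRdd<T>> opt) {
        return opt;
    }

    // Unchecked widening to the exact type Optional<Rdd<T>>. This is safe
    // only because Optional is immutable: nothing can be written through it.
    @SuppressWarnings("unchecked")
    static <T> Optional<Rdd<T>> widen(Optional<? extends Rdd<T>> opt) {
        return (Optional<Rdd<T>>) opt;
    }

    // Stand-in for a base stream whose compute declares the wide type.
    static class BaseStream<T> {
        Optional<Rdd<T>> compute(long validTime) {
            return Optional.empty();
        }
    }

    // The override keeps the declared return type and widens the narrower
    // value, returning an empty Optional while backing off.
    static class BackoffStream<T> extends BaseStream<T> {
        private final boolean backOff;

        BackoffStream(boolean backOff) {
            this.backOff = backOff;
        }

        @Override
        Optional<Rdd<T>> compute(long validTime) {
            if (backOff) {
                return Optional.empty();
            }
            Optional<KafkaRdd<T>> rdd = Optional.of(new KafkaRdd<T>());
            return widen(rdd);
        }
    }

    public static void main(String[] args) {
        System.out.println(new BackoffStream<String>(false).compute(0L).isPresent());
        System.out.println(new BackoffStream<String>(true).compute(0L).isPresent());
    }
}
```

In Scala none of this is needed, because Option is declared as Option[+A].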

So is there any workaround in Java for overriding compute in a subclass of
DirectKafkaInputDStream?


On Wed, Aug 19, 2015 at 12:45 AM, Shushant Arora <shushantaror...@gmail.com>
wrote:

> But KafkaRDD[K, V, U, T, R] is not a subclass of RDD[R], since Java does not
> support generic inheritance, so a derived class cannot return a differently
> typed generic subclass from an overridden method.
>
> On Wed, Aug 19, 2015 at 12:02 AM, Cody Koeninger <c...@koeninger.org>
> wrote:
>
>> Option is covariant and KafkaRDD is a subclass of RDD
>>
>> On Tue, Aug 18, 2015 at 1:12 PM, Shushant Arora <
>> shushantaror...@gmail.com> wrote:
>>
>>> Is it that in Scala a derived class is allowed to have any return type?
>>>
>>> And since the streaming jar was originally written in Scala, is that why
>>> the compute method of DirectKafkaInputDStream is allowed to return
>>> Option[KafkaRDD[K, V, U, T, R]]?
>>>
>>> On Tue, Aug 18, 2015 at 8:36 PM, Shushant Arora <
>>> shushantaror...@gmail.com> wrote:
>>>
>>>> Looking at the source code of
>>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream:
>>>>
>>>>   override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
>>>>     val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
>>>>     val rdd = KafkaRDD[K, V, U, T, R](
>>>>       context.sparkContext, kafkaParams, currentOffsets, untilOffsets,
>>>>       messageHandler)
>>>>
>>>>     currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
>>>>     Some(rdd)
>>>>   }
>>>>
>>>>
>>>> But in DStream it is def compute(validTime: Time): Option[RDD[T]].
>>>>
>>>> So what should be the return type of compute in a custom DStream that
>>>> extends DirectKafkaInputDStream? I want the behaviour to be the same as
>>>> DirectKafkaInputDStream in normal scenarios and to return None in a
>>>> specific scenario.
>>>>
>>>> And why did the same error not occur when DirectKafkaInputDStream itself
>>>> extends InputDStream? The new return type Option[KafkaRDD[K, V, U, T, R]]
>>>> is not a subclass of Option[RDD[T]], so shouldn't it have failed?
>>>>
>>>>
>>>>
>>>>
>>>> On Tue, Aug 18, 2015 at 7:28 PM, Cody Koeninger <c...@koeninger.org>
>>>> wrote:
>>>>
>>>>> The superclass method in DStream is defined as returning an
>>>>> Option[RDD[T]]
>>>>>
>>>>> On Tue, Aug 18, 2015 at 8:07 AM, Shushant Arora <
>>>>> shushantaror...@gmail.com> wrote:
>>>>>
>>>>>> I am getting a compilation error while overriding the compute method
>>>>>> of DirectKafkaInputDStream.
>>>>>>
>>>>>>
>>>>>> [ERROR] CustomDirectKafkaInputDstream.java:[51,83]
>>>>>> compute(org.apache.spark.streaming.Time) in CustomDirectKafkaInputDstream
>>>>>> cannot override compute(org.apache.spark.streaming.Time) in
>>>>>> org.apache.spark.streaming.dstream.DStream; attempting to use
>>>>>> incompatible return type
>>>>>>
>>>>>> [ERROR] found   :
>>>>>> scala.Option<org.apache.spark.streaming.kafka.KafkaRDD<byte[],byte[],kafka.serializer.DefaultDecoder,kafka.serializer.DefaultDecoder,byte[][]>>
>>>>>>
>>>>>> [ERROR] required: scala.Option<org.apache.spark.rdd.RDD<byte[][]>>
>>>>>>
>>>>>>
>>>>>> Class:
>>>>>>
>>>>>> public class CustomDirectKafkaInputDstream extends
>>>>>>     DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
>>>>>>         kafka.serializer.DefaultDecoder, byte[][]> {
>>>>>>
>>>>>>   @Override
>>>>>>   public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder, byte[][]>>
>>>>>>       compute(Time validTime) {
>>>>>>     int processed = processedCounter.value();
>>>>>>     int failed = failedProcessingsCounter.value();
>>>>>>     if (processed == failed) {
>>>>>>       System.out.println("backing off since its 100 % failure");
>>>>>>       return Option.empty();
>>>>>>     } else {
>>>>>>       System.out.println("starting the stream ");
>>>>>>       return super.compute(validTime);
>>>>>>     }
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>>
>>>>>> What should be the return type of the compute method? The superclass
>>>>>> returns Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder,
>>>>>> byte[][]>>, but the compiler expects
>>>>>> scala.Option<org.apache.spark.rdd.RDD<byte[][]>> from the derived class.
>>>>>> Is there something wrong with the code?
>>>>>>
>>>>>> On Mon, Aug 17, 2015 at 7:08 PM, Cody Koeninger <c...@koeninger.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Look at the definitions of the java-specific
>>>>>>> KafkaUtils.createDirectStream methods (the ones that take a
>>>>>>> JavaStreamingContext)
>>>>>>>
>>>>>>> On Mon, Aug 17, 2015 at 5:13 AM, Shushant Arora <
>>>>>>> shushantaror...@gmail.com> wrote:
>>>>>>>
>>>>>>>> How do I create a ClassTag in Java? Also, the constructor of
>>>>>>>> DirectKafkaInputDStream takes a Function1, not a Function, but
>>>>>>>> KafkaUtils.createDirectStream allows a Function.
>>>>>>>>
>>>>>>>> Below is my overridden DirectKafkaInputDStream.
>>>>>>>>
>>>>>>>>
>>>>>>>> public class CustomDirectKafkaInputDstream extends
>>>>>>>>     DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
>>>>>>>>         kafka.serializer.DefaultDecoder, byte[][]> {
>>>>>>>>
>>>>>>>>   public CustomDirectKafkaInputDstream(
>>>>>>>>       StreamingContext ssc_,
>>>>>>>>       Map<String, String> kafkaParams,
>>>>>>>>       Map<TopicAndPartition, Object> fromOffsets,
>>>>>>>>       Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> messageHandler,
>>>>>>>>       ClassTag<byte[]> evidence$1, ClassTag<byte[]> evidence$2,
>>>>>>>>       ClassTag<DefaultDecoder> evidence$3,
>>>>>>>>       ClassTag<DefaultDecoder> evidence$4, ClassTag<byte[][]> evidence$5) {
>>>>>>>>     super(ssc_, kafkaParams, fromOffsets, messageHandler, evidence$1,
>>>>>>>>         evidence$2, evidence$3, evidence$4, evidence$5);
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   @Override
>>>>>>>>   public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder, byte[][]>>
>>>>>>>>       compute(Time validTime) {
>>>>>>>>     int processed = processedCounter.value();
>>>>>>>>     int failed = failedProcessingsCounter.value();
>>>>>>>>     if (processed == failed) {
>>>>>>>>       System.out.println("backing off since its 100 % failure");
>>>>>>>>       return Option.empty();
>>>>>>>>     } else {
>>>>>>>>       System.out.println("starting the stream ");
>>>>>>>>       return super.compute(validTime);
>>>>>>>>     }
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> To create this stream I am using:
>>>>>>>>
>>>>>>>> scala.collection.immutable.Map<String, String> scalakafkaParams =
>>>>>>>>     JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala()
>>>>>>>>         .toMap(Predef.<Tuple2<String, String>>conforms());
>>>>>>>> scala.collection.immutable.Map<TopicAndPartition, Long> scalaktopicOffsetMap =
>>>>>>>>     JavaConverters.mapAsScalaMapConverter(topicOffsetMap).asScala()
>>>>>>>>         .toMap(Predef.<Tuple2<TopicAndPartition, Long>>conforms());
>>>>>>>>
>>>>>>>> scala.Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> handler =
>>>>>>>>     new Function<MessageAndMetadata<byte[], byte[]>, byte[][]>() {
>>>>>>>>         ..});
>>>>>>>> JavaDStream<byte[][]> directKafkaStream = new CustomDirectKafkaInputDstream(
>>>>>>>>     jssc, scalakafkaParams, scalaktopicOffsetMap, handler,
>>>>>>>>     byte[].class, byte[].class,
>>>>>>>>     kafka.serializer.DefaultDecoder.class,
>>>>>>>>     kafka.serializer.DefaultDecoder.class, byte[][].class);
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> How do I pass a ClassTag to the constructor of
>>>>>>>> CustomDirectKafkaInputDstream? And how can I use Function instead of
>>>>>>>> Function1?
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Aug 13, 2015 at 12:16 AM, Cody Koeninger <
>>>>>>>> c...@koeninger.org> wrote:
>>>>>>>>
>>>>>>>>> I'm not aware of an existing api per se, but you could create your
>>>>>>>>> own subclass of the DStream that returns None for compute() under
>>>>>>>>> certain conditions.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Aug 12, 2015 at 1:03 PM, Shushant Arora <
>>>>>>>>> shushantaror...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Cody
>>>>>>>>>>
>>>>>>>>>> Can you help here? Does streaming 1.3 have any API for not
>>>>>>>>>> consuming any messages in the next few runs?
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>>
>>>>>>>>>> ---------- Forwarded message ----------
>>>>>>>>>> From: Shushant Arora <shushantaror...@gmail.com>
>>>>>>>>>> Date: Wed, Aug 12, 2015 at 11:23 PM
>>>>>>>>>> Subject: spark streaming 1.3 doubts(force it to not consume
>>>>>>>>>> anything)
>>>>>>>>>> To: user <user@spark.apache.org>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> I can't make my streaming application's batch interval change at
>>>>>>>>>> run time. It is always fixed, and it always creates jobs at the
>>>>>>>>>> specified batch interval and enqueues them if an earlier batch is
>>>>>>>>>> not finished.
>>>>>>>>>>
>>>>>>>>>> My requirement is to process the events and post them to some
>>>>>>>>>> external server, and if the external server is down I want to
>>>>>>>>>> increase the batch time. That is not possible, but can I make it
>>>>>>>>>> not consume any messages in, say, the next 5 successive runs?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
