How to create classtag in java ?Also Constructor of DirectKafkaInputDStream
takes Function1 not Function but kafkautils.createDirectStream allows
function.

I have below as overriden DirectKafkaInputDStream.


public class CustomDirectKafkaInputDstream extends
DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
kafka.serializer.DefaultDecoder, byte[][]>{

public CustomDirectKafkaInputDstream(
StreamingContext ssc_,
Map<String, String> kafkaParams,
Map<TopicAndPartition, Object> fromOffsets,
Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> messageHandler,
ClassTag<byte[]> evidence$1, ClassTag<byte[]> evidence$2,
ClassTag<DefaultDecoder> evidence$3,
ClassTag<DefaultDecoder> evidence$4, ClassTag<byte[][]> evidence$5) {
super(ssc_, kafkaParams, fromOffsets, messageHandler, evidence$1,
evidence$2,
evidence$3, evidence$4, evidence$5);
}
@Override
public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder,
byte[][]>> compute(
Time validTime) {
int processe=processedCounter.value();
int failed = failedProcessingsCounter.value();
if((processed==failed)){
System.out.println("backing off since its 100 % failure");
return Option.empty();
}else{
System.out.println("starting the stream ");

return super.compute(validTime);
}
}



To create this stream
I am using
scala.collection.immutable.Map<String, String> scalakafkaParams =
JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala().toMap(Predef.<Tuple2<String,
String>>conforms());
scala.collection.immutable.Map<TopicAndPartition, Long>
scalaktopicOffsetMap=
JavaConverters.mapAsScalaMapConverter(topicOffsetMap).asScala().toMap(Predef.<Tuple2<TopicAndPartition,
Long>>conforms());

scala.Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> handler = new
Function<MessageAndMetadata<byte[], byte[]>, byte[][]>() {
        ..});
JavaDStream<byte[][]> directKafkaStream = new
CustomDirectKafkaInputDstream(jssc,scalakafkaParams ,scalaktopicOffsetMap,
handler,byte[].class,byte[].class, kafka.serializer.DefaultDecoder.class,
kafka.serializer.DefaultDecoder.class,byte[][].class);



How to pass classTag to constructor in CustomDirectKafkaInputDstream ? And
how to use Function instead of Function1 ?



On Thu, Aug 13, 2015 at 12:16 AM, Cody Koeninger <c...@koeninger.org> wrote:

> I'm not aware of an existing api per se, but you could create your own
> subclass of the DStream that returns None for compute() under certain
> conditions.
>
>
>
> On Wed, Aug 12, 2015 at 1:03 PM, Shushant Arora <shushantaror...@gmail.com
> > wrote:
>
>> Hi Cody
>>
>> Can you help here if streaming 1.3 has any api for not consuming any
>> message in next few runs?
>>
>> Thanks
>>
>> ---------- Forwarded message ----------
>> From: Shushant Arora <shushantaror...@gmail.com>
>> Date: Wed, Aug 12, 2015 at 11:23 PM
>> Subject: spark streaming 1.3 doubts(force it to not consume anything)
>> To: user <user@spark.apache.org>
>>
>>
>> I Can't make my stream application batch interval to change at run time .
>> Its always fixed and it always creates jobs at specified batch inetval and
>> enqueue them if earleir batch is not finished.
>>
>> My requirement is to process the events and post them to some external
>> server and if external server is down I want to increase the batch time -
>> that is not possible but can I make it not to consume any messages in say
>> next 5 successive runs ?
>>
>>
>>
>>
>

Reply via email to