How do I create a ClassTag in Java? Also, the constructor of DirectKafkaInputDStream takes a Function1, not a Function, but KafkaUtils.createDirectStream accepts a Function.

I have the following subclass that overrides DirectKafkaInputDStream:

    public class CustomDirectKafkaInputDstream
            extends DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
                    kafka.serializer.DefaultDecoder, byte[][]> {

        public CustomDirectKafkaInputDstream(
                StreamingContext ssc_,
                Map<String, String> kafkaParams,
                Map<TopicAndPartition, Object> fromOffsets,
                Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> messageHandler,
                ClassTag<byte[]> evidence$1,
                ClassTag<byte[]> evidence$2,
                ClassTag<DefaultDecoder> evidence$3,
                ClassTag<DefaultDecoder> evidence$4,
                ClassTag<byte[][]> evidence$5) {
            super(ssc_, kafkaParams, fromOffsets, messageHandler,
                    evidence$1, evidence$2, evidence$3, evidence$4, evidence$5);
        }

        @Override
        public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder, byte[][]>> compute(
                Time validTime) {
            int processed = processedCounter.value();
            int failed = failedProcessingsCounter.value();
            if (processed == failed) {
                // Every record failed: produce no RDD for this batch.
                System.out.println("backing off since its 100 % failure");
                return Option.empty();
            } else {
                System.out.println("starting the stream ");
                return super.compute(validTime);
            }
        }
    }

To create this stream I am using:

    scala.collection.immutable.Map<String, String> scalakafkaParams =
            JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala()
                    .toMap(Predef.<Tuple2<String, String>>conforms());

    scala.collection.immutable.Map<TopicAndPartition, Long> scalaktopicOffsetMap =
            JavaConverters.mapAsScalaMapConverter(topicOffsetMap).asScala()
                    .toMap(Predef.<Tuple2<TopicAndPartition, Long>>conforms());

    scala.Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> handler =
            new Function<MessageAndMetadata<byte[], byte[]>, byte[][]>() { .. };

    JavaDStream<byte[][]> directKafkaStream =
            new CustomDirectKafkaInputDstream(jssc, scalakafkaParams, scalaktopicOffsetMap, handler,
                    byte[].class, byte[].class,
                    kafka.serializer.DefaultDecoder.class, kafka.serializer.DefaultDecoder.class,
                    byte[][].class);

How do I pass a ClassTag to the constructor of CustomDirectKafkaInputDstream?
And how can I use Function instead of Function1?

On Thu, Aug 13, 2015 at 12:16 AM, Cody Koeninger <c...@koeninger.org> wrote:

> I'm not aware of an existing API per se, but you could create your own
> subclass of the DStream that returns None for compute() under certain
> conditions.
>
> On Wed, Aug 12, 2015 at 1:03 PM, Shushant Arora <shushantaror...@gmail.com>
> wrote:
>
>> Hi Cody
>>
>> Can you help here if streaming 1.3 has any API for not consuming any
>> message in the next few runs?
>>
>> Thanks
>>
>> ---------- Forwarded message ----------
>> From: Shushant Arora <shushantaror...@gmail.com>
>> Date: Wed, Aug 12, 2015 at 11:23 PM
>> Subject: spark streaming 1.3 doubts(force it to not consume anything)
>> To: user <user@spark.apache.org>
>>
>> I can't make my streaming application's batch interval change at run time.
>> It's always fixed, and it always creates jobs at the specified batch interval
>> and enqueues them if an earlier batch has not finished.
>>
>> My requirement is to process the events and post them to some external
>> server, and if the external server is down I want to increase the batch time.
>> That is not possible, but can I make it not consume any messages in, say,
>> the next 5 successive runs?
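
The idea quoted above (return nothing from compute() under certain conditions, for the next few runs) can be sketched in plain Java without any Spark dependency. This is only an illustration under stated assumptions: BackoffGate, recordBatch, and batchesToSkip are hypothetical names, not part of Spark, and java.util.Optional stands in for scala.Option. The sketch also assumes that a batch with zero processed records should not trigger a back-off, which differs from a bare processed == failed check.

```java
import java.util.Optional;
import java.util.function.Supplier;

/**
 * Hypothetical helper (not a Spark class): decides whether the next batches
 * should be skipped after a batch in which every record failed.
 */
class BackoffGate {
    private final int batchesToSkip;   // how many successive runs to skip after a total failure
    private int remainingSkips = 0;    // runs still to be skipped

    BackoffGate(int batchesToSkip) {
        if (batchesToSkip < 0) {
            throw new IllegalArgumentException("batchesToSkip must be >= 0");
        }
        this.batchesToSkip = batchesToSkip;
    }

    /** Record the outcome of a finished batch; a 100% failure arms the back-off. */
    synchronized void recordBatch(int processed, int failed) {
        // Assumption: an empty batch (processed == 0) is not treated as a failure.
        if (processed > 0 && processed == failed) {
            remainingSkips = batchesToSkip;
        }
    }

    /**
     * Returns an empty Optional while backing off; otherwise asks the delegate,
     * which plays the role of super.compute(validTime) in a DStream subclass.
     */
    synchronized <T> Optional<T> compute(Supplier<Optional<T>> delegate) {
        if (remainingSkips > 0) {
            remainingSkips--;
            return Optional.empty();
        }
        return delegate.get();
    }
}
```

In a DStream subclass the same check would sit inside compute(), returning Option.empty() while the gate is closed and delegating to super.compute(validTime) otherwise. Whether offsets for skipped batches are consumed later depends on how the parent class tracks them, which this sketch does not model.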