I don't think you can use offsetRanges that way: when you transform a
DirectKafkaInputDStream into a WindowedDStream, the underlying KafkaRDD is
internally turned into a normal RDD, but offsetRanges is an attribute
specific to KafkaRDD, so casting a normal RDD to HasOffsetRanges raises
exactly this exception.

You can only do something like this:

directKafkaInputDStream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  ...
}

That is, apply foreachRDD directly on the DirectKafkaInputDStream, before any window operation.
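
For the Java code quoted below, that would look roughly like the following on the messages stream itself (only a sketch, untested; the KafkaCluster offset-commit part stays the same as in the original foreachRDD):

messages.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
        @Override
        public Void call(JavaPairRDD<String, String> rdd) throws Exception {
                // Here rdd is still backed by the KafkaRDD, so the cast is safe.
                OffsetRange[] offsetsList = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
                // ... commit offsetsList via KafkaCluster as in the original code ...
                return null;
        }
});

The mapToPair / reduceByKeyAndWindow pipeline can then be applied to messages as before; only the offset bookkeeping has to happen before the stream is windowed.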







2015-06-12 16:10 GMT+08:00 ZIGEN <dbviewer.zi...@gmail.com>:

> Hi, I'm using Spark Streaming (1.3.1).
> I want exactly-once messaging from Kafka, together with the Window
> operations of DStream.
>
> When I use a Window operation (e.g. DStream#reduceByKeyAndWindow) with the
> Kafka direct API, a java.lang.ClassCastException occurs as follows.
>
> --- stacktrace ---
> java.lang.ClassCastException: org.apache.spark.rdd.MapPartitionsRDD cannot be cast to org.apache.spark.streaming.kafka.HasOffsetRanges
>         at org.apache.spark.examples.streaming.JavaDirectKafkaWordCountWithReduceByKeyAndWindow$3.call(JavaDirectKafkaWordCountWithReduceByKeyAndWindow.java:146)
>         at org.apache.spark.examples.streaming.JavaDirectKafkaWordCountWithReduceByKeyAndWindow$3.call(JavaDirectKafkaWordCountWithReduceByKeyAndWindow.java:1)
>         at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:311)
>         at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:311)
>         at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
>         at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
>         at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42)
>         at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>         at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>         at scala.util.Try$.apply(Try.scala:161)
>         at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
>         at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:176)
>         at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
>         at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
>         at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>         at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:175)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>         at java.lang.Thread.run(Thread.java:722)
>
>
> --- my source ---
>
> JavaStreamingContext jssc = new JavaStreamingContext(_ctx, batchInterval);
> jssc.checkpoint("checkpoint");
>
> JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
>         jssc, String.class, String.class, StringDecoder.class,
>         StringDecoder.class, kafkaParams, topicsSet);
>
> JavaPairDStream<String, List<String>> pairDS = messages.mapToPair(...);
>
> JavaPairDStream<String, List<String>> windowDs = pairDS.reduceByKeyAndWindow(
>         new Function2<List<String>, List<String>, List<String>>() {
>                 @Override
>                 public List<String> call(List<String> list1, List<String> list2) throws Exception {
>                         ...
>                 }
>         }, windowDuration, slideDuration);
>
> windowDs.foreachRDD(new Function<JavaPairRDD<String, List<String>>, Void>() {
>
>         @Override
>         public Void call(JavaPairRDD<String, List<String>> rdd) throws Exception {
>
>                 OffsetRange[] offsetsList = ((HasOffsetRanges) rdd.rdd()).offsetRanges(); // ClassCastException occurred
>
>                 KafkaCluster kc = new KafkaCluster(toScalaMap(kafkaParams));
>                 for (OffsetRange offsets : offsetsList) {
>
>                         TopicAndPartition topicAndPartition = new TopicAndPartition(offsets.topic(), offsets.partition());
>
>                         HashMap<TopicAndPartition, Object> map = new HashMap<TopicAndPartition, Object>();
>                         map.put(topicAndPartition, offsets.untilOffset());
>                         kc.setConsumerOffsets("group1", toScalaMap(map));
>                 }
>
>                 return null;
>         }
> });
>
> Thanks!
>
>
>
>
>
>
