Hi Cody,

Thanks, rdd.rdd() did the trick. I now get the offsets via:
OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

But how can I commit the offsets back to Kafka?
Casting the JavaInputDStream to CanCommitOffsets throws a ClassCastException (CCE):

CanCommitOffsets cco = (CanCommitOffsets) directKafkaStream; // Throws CCE
cco.commitAsync(offsets);

java.lang.ClassCastException:
org.apache.spark.streaming.api.java.JavaInputDStream cannot be cast to
org.apache.spark.streaming.kafka010.CanCommitOffsets
at SparkTest.lambda$0(SparkTest.java:103)
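
By analogy with rdd.rdd(), I'm guessing the JavaInputDStream likewise
just wraps the underlying Scala DStream, and that I need to unwrap one
layer before the cast. Something like this untested sketch is what I
have in mind (inputDStream() is my guess at the accessor):

// Untested: unwrap the Java wrapper to reach the underlying Scala
// InputDStream, which for the direct Kafka stream should implement
// CanCommitOffsets.
CanCommitOffsets cco = (CanCommitOffsets) directKafkaStream.inputDStream();
cco.commitAsync(offsets);

Is that the intended approach?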

Best regards,
Max


2016-10-10 20:18 GMT+02:00 Cody Koeninger <c...@koeninger.org>:

> This should give you hints on the necessary cast:
>
> http://spark.apache.org/docs/latest/streaming-kafka-0-8-integration.html#tab_java_2
>
> The main ugly thing there is that the java rdd is wrapping the scala
> rdd, so you need to unwrap one layer via rdd.rdd()
>
> If anyone wants to work on a PR to update the java examples in the
> docs for the 0-10 version, I'm happy to help.
>
> On Mon, Oct 10, 2016 at 10:34 AM, static-max <flasha...@googlemail.com> wrote:
> > Hi,
> >
> > By following this article I managed to consume messages from Kafka 0.10
> > in Spark 2.0:
> > http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
> >
> > However, the Java examples are missing and I would like to commit the
> > offsets myself after processing the RDD. Does anybody have a working
> > example for me? "offsetRanges" does not seem to be available after
> > casting the RDD to the "HasOffsetRanges" trait.
> >
> > Thanks a lot!
> >
> > Scala example:
> >
> > val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
> > stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets)
>
