The solution for how to share offsetRanges after a DirectKafkaInputDStream is transformed is in:
https://github.com/apache/spark/blob/master/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
https://github.com/apache/spark/blob/master/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
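For context, the gist of the AtomicReference pattern from the Java suite can be modeled without Spark at all. The sketch below is a simplified, hypothetical stand-in (this OffsetRange class is not the real Kafka one, and transform is an ordinary method rather than a DStream operation): the transform records the batch's ranges as a side effect into an AtomicReference, which safely publishes them to whatever thread later reads them, and decodes the payload while dropping undecodable messages, which is the flatMap-style behavior discussed below.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Simplified stand-in for Kafka's OffsetRange (hypothetical, not the real API).
class OffsetRange {
    final int partition;
    final long fromOffset;
    final long untilOffset;
    OffsetRange(int partition, long fromOffset, long untilOffset) {
        this.partition = partition;
        this.fromOffset = fromOffset;
        this.untilOffset = untilOffset;
    }
}

public class CapturePattern {
    // As in JavaDirectKafkaStreamSuite: an AtomicReference publishes the
    // ranges recorded inside the transform to the thread that later reads
    // them in the output action.
    static final AtomicReference<OffsetRange[]> offsetRanges =
        new AtomicReference<>(new OffsetRange[0]);

    // Record batch metadata as a side effect, then decode, dropping
    // messages that fail to parse.
    static List<Integer> transform(List<byte[]> batch, OffsetRange[] ranges) {
        offsetRanges.set(ranges);
        List<Integer> out = new ArrayList<>();
        for (byte[] bytes : batch) {
            try {
                out.add(Integer.parseInt(new String(bytes, "UTF-8").trim()));
            } catch (Exception ignored) {
                // drop bad message
            }
        }
        return out;
    }

    public static void main(String[] args) {
        OffsetRange[] ranges = { new OffsetRange(0, 0L, 2L) };
        List<byte[]> batch =
            List.of("1".getBytes(), "oops".getBytes(), "2".getBytes());
        List<Integer> decoded = transform(batch, ranges);
        // Copy into a local before handing the value to a serialized closure;
        // a closure over the static field would see the other JVM's empty copy.
        OffsetRange[] offsets = offsetRanges.get();
        System.out.println(decoded);                // [1, 2]
        System.out.println(offsets[0].untilOffset); // 2
    }
}
```

This is only a sketch of the capture-and-publish idea, not the real Spark wiring; in the actual suites the ranges come from rdd.asInstanceOf[HasOffsetRanges].offsetRanges inside transform.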
One thing I would like to understand is why the Scala version uses a plain variable while the Java version uses an AtomicReference. Another thing I don't get is closure serialization: why doesn't logger in the code below throw an NPE even though its instance isn't copied the way offsetRanges is? When val offsets = offsetRanges is removed from foreachRDD, then mapPartitionsWithIndex throws on offsets(idx).

I have something like this code:

object StreamOps {
  val logger = LoggerFactory.getLogger("StreamOps")
  var offsetRanges = Array[OffsetRange]()

  def transform[F](stream: InputDStream[Array[Byte]]): DStream[F] = {
    stream transform { rdd =>
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd flatMap { message =>
        Try(... decode Array[Byte] to F ...) match {
          case Success(fact) => Some(fact)
          case _ => None
        }
      }
    }
  }

  // Error handling removed for brevity
  def save[F](stream: DStream[F]): Unit = {
    stream foreachRDD { rdd =>
      // It has to be here, otherwise NullPointerException
      val offsets = offsetRanges

      rdd mapPartitionsWithIndex { (idx, facts) =>
        // Use offsets here
        val writer = new MyWriter[F](offsets(idx), ...)
        facts foreach { fact => writer.write(fact) }
        writer.close()

        // Why does logger work and not throw NullPointerException?
        logger.info(...)

        Iterator.empty
      } foreach { (_: Nothing) => }
    }
  }
}

Many thanks for any advice, I'm sure it's a noob question.
Petr

On Mon, Aug 17, 2015 at 1:12 PM, Petr Novak <oss.mli...@gmail.com> wrote:

> Or can I generally create a new RDD from a transformation and enrich its
> partitions with some metadata, so that I could copy OffsetRanges into my
> new RDD in the DStream?
>
> On Mon, Aug 17, 2015 at 1:08 PM, Petr Novak <oss.mli...@gmail.com> wrote:
>
>> Hi all,
>> I need to transform a KafkaRDD into a new stream of deserialized case
>> classes. I want to use the new stream to save it to files and to perform
>> additional transformations on it.
>>
>> To save it I want to use offsets in filenames, hence I need OffsetRanges
>> in the transformed RDD. But KafkaRDD is private, so I don't know how to
>> do it.
>>
>> Alternatively, I could deserialize directly in messageHandler before the
>> KafkaRDD, but that seems to be a 1:1 transformation, while I need to drop
>> bad messages (KafkaRDD => RDD would be a flatMap).
>>
>> Is there a way to do it using messageHandler, or is there another
>> approach?
>>
>> Many thanks for any help.
>> Petr
>>
>
>