The solution you found is also in the docs: http://spark.apache.org/docs/latest/streaming-kafka-integration.html
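For reference, the Scala snippet on that page looks roughly like this (a minimal sketch: directKafkaStream stands for the stream returned by KafkaUtils.createDirectStream, and any decoding/map step is a placeholder):

    import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

    // Hold a reference to the current offset ranges on the driver so they can
    // be used downstream. The transform body runs on the driver each batch,
    // so the assignment happens before foreachRDD reads the variable.
    var offsetRanges = Array[OffsetRange]()

    directKafkaStream.transform { rdd =>
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }.foreachRDD { rdd =>
      for (o <- offsetRanges) {
        println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
      }
    }

The important part is that the cast to HasOffsetRanges happens in the first transformation on the stream, before the offset information is lost.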
Java uses an atomic reference because Java doesn't allow you to close over
non-final references. I'm not clear on your other question.

On Tue, Aug 18, 2015 at 3:43 AM, Petr Novak <oss.mli...@gmail.com> wrote:

> The solution for how to share offsetRanges after the DirectKafkaInputStream
> is transformed is in:
>
> https://github.com/apache/spark/blob/master/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
> https://github.com/apache/spark/blob/master/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
>
> One thing I would like to understand is why the Scala version uses a normal
> variable while the Java version uses an AtomicReference.
>
> Another thing I don't get is closure serialization: why doesn't the logger
> in the code below throw a NullPointerException, even though its instance
> isn't copied the way offsetRanges is? When val offsets = offsetRanges is
> removed from foreachRDD, mapPartitionsWithIndex throws on offsets(idx). I
> have something like this code:
>
> object StreamOps {
>
>   val logger = LoggerFactory.getLogger("StreamOps")
>   var offsetRanges = Array[OffsetRange]()
>
>   def transform[F](stream: InputDStream[Array[Byte]]): DStream[F] = {
>     stream transform { rdd =>
>       offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>
>       rdd flatMap { message =>
>         Try(... decode Array[Byte] to F ...) match {
>           case Success(fact) => Some(fact)
>           case _ => None
>         }
>       }
>     }
>   }
>
>   // Error handling removed for brevity
>   def save[F](stream: DStream[F]): Unit = {
>     stream foreachRDD { rdd =>
>       // It has to be here, otherwise NullPointerException
>       val offsets = offsetRanges
>
>       rdd mapPartitionsWithIndex { (idx, facts) =>
>         // Use offsets here
>         val writer = new MyWriter[F](offsets(idx), ...)
>
>         facts foreach { fact =>
>           writer.write(fact)
>         }
>
>         writer.close()
>
>         // Why does logger work and not throw a NullPointerException?
>         logger.info(...)
>
>         Iterator.empty
>       } foreach {
>         (_: Nothing) =>
>       }
>     }
>   }
> }
>
> Many thanks for any advice, I'm sure it's a noob question.
> Petr
>
> On Mon, Aug 17, 2015 at 1:12 PM, Petr Novak <oss.mli...@gmail.com> wrote:
>
>> Or can I generally create a new RDD from a transformation and enrich its
>> partitions with some metadata, so that I could copy the OffsetRanges into
>> my new RDD in the DStream?
>>
>> On Mon, Aug 17, 2015 at 1:08 PM, Petr Novak <oss.mli...@gmail.com> wrote:
>>
>>> Hi all,
>>> I need to transform a KafkaRDD into a new stream of deserialized case
>>> classes. I want to use the new stream both to save it to files and to
>>> perform additional transformations on it.
>>>
>>> To save it I want to use the offsets in the filenames, hence I need the
>>> OffsetRanges in the transformed RDD. But KafkaRDD is private, so I don't
>>> know how to do it.
>>>
>>> Alternatively I could deserialize directly in messageHandler before
>>> KafkaRDD, but that seems to be a 1:1 transformation, while I need to drop
>>> bad messages (KafkaRDD => RDD would be a flatMap).
>>>
>>> Is there a way to do it using messageHandler, or is there another
>>> approach?
>>>
>>> Many thanks for any help.
>>> Petr
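As for the local copy in save: my understanding (I may be missing something) is that a Scala object is initialized from scratch on each executor JVM, since singleton objects aren't serialized with the closure. So logger is re-created on the executor by the object's initializer and works fine, while the executor's offsetRanges field is just the freshly initialized default, because the assignment in transform only ever ran on the driver. Copying into a local val makes the driver-side value part of the serialized closure. A minimal sketch of that pattern (CaptureSketch and the RDD element type are made up for illustration):

    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.kafka.OffsetRange
    import org.slf4j.LoggerFactory

    object CaptureSketch {
      val logger = LoggerFactory.getLogger("CaptureSketch")
      var offsetRanges = Array[OffsetRange]()  // assigned on the driver, in transform

      def save(rdd: RDD[String]): Unit = {
        // Local val: its value is serialized into the closure, so executors
        // see the driver-side array. Referencing the object field directly
        // would resolve against the executor's own fresh copy of the object.
        val offsets = offsetRanges

        rdd.mapPartitionsWithIndex { (idx, facts) =>
          // KafkaRDD partition indexes line up with the offset ranges array
          val range = offsets(idx)
          // logger resolves to CaptureSketch.logger, re-created per executor JVM
          logger.info(s"partition $idx: ${range.fromOffset}..${range.untilOffset}")
          facts
        }.foreach { _ => }
      }
    }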