The solution you found is also in the docs:

http://spark.apache.org/docs/latest/streaming-kafka-integration.html

Java uses an atomic reference because Java doesn't allow you to close over
non-final references.

I'm not clear on your other question.

On Tue, Aug 18, 2015 at 3:43 AM, Petr Novak <oss.mli...@gmail.com> wrote:

> The solution how to share offsetRanges after DirectKafkaInputStream is
> transformed is in:
>
> https://github.com/apache/spark/blob/master/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
>
> https://github.com/apache/spark/blob/master/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
>
> One thing I would like to understand is why Scala version is using normal
> variable while Java version uses AtomicReference.
>
> Another thing which I don't get is about closure serialization. The
> question why logger in the below code doesn't throw NPE even its instance
> isn't copied like in the case of offsetRanges, when val offsets =
> offsetRanges is removed form foreachRDD then mapPratitionsWithIndex throws
> on offsets(idx). I have something like this code:
>
> object StreamOps {
>
>   val logger = LoggerFactory.getLogger("StreamOps")
>   var offsetRanges = Array[OffsetRange]()
>
> def transform[T](stream: InputDStream[Array[Byte]]): DStream[T] = {
>   stream transform { rdd =>
>     offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>
>     rdd flatmap { message =>
>       Try(... decode Array[Byte] to F...) match {
>         case Success(fact) => Some(fact)
>         case _ => None
>     }
>   }
> }
>
> // Error handling removed for brevity
> def save[F](stream: DStream[F]): Unit {
>   stream foreachRDD { rdd =>
>     // It has to be here otherwise NullPointerException
>     val offsets = offsetRanges
>
>     rdd mapartitionWithIndex { (idx, facts) =>
>       // Use offsets here
>       val writer = new MyWriter[F](offsets(idx), ...)
>
>       facts foreach { fact =>
>         writer.write(fact)
>       }
>
>       writer.close()
>
>       // Why logger works and doesn't throw NullPointerException?
>       logger.info(...)
>
>       Iterator.empty
>     } foreach {
>       (_: Nothing) =>
>     }
>   }
> }
>
> Many thanks for any advice, I'm sure its a noob question.
> Petr
>
> On Mon, Aug 17, 2015 at 1:12 PM, Petr Novak <oss.mli...@gmail.com> wrote:
>
>> Or can I generally create new RDD from transformation and enrich its
>> partitions with some metadata so that I would copy OffsetRanges in my new
>> RDD in DStream?
>>
>> On Mon, Aug 17, 2015 at 1:08 PM, Petr Novak <oss.mli...@gmail.com> wrote:
>>
>>> Hi all,
>>> I need to transform KafkaRDD into a new stream of deserialized case
>>> classes. I want to use the new stream to save it to file and to perform
>>> additional transformations on it.
>>>
>>> To save it I want to use offsets in filenames, hence I need OffsetRanges
>>> in transformed RDD. But KafkaRDD is private, hence I don't know how to do
>>> it.
>>>
>>> Alternatively I could deserialize directly in messageHandler before
>>> KafkaRDD but it seems it is 1:1 transformation while I need to drop bad
>>> messages (KafkaRDD => RDD it would be flatMap).
>>>
>>> Is there a way how to do it using messageHandler, is there another
>>> approach?
>>>
>>> Many thanks for any help.
>>> Petr
>>>
>>
>>
>

Reply via email to