The solution for how to share offsetRanges after a DirectKafkaInputDStream
is transformed is in:
https://github.com/apache/spark/blob/master/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
https://github.com/apache/spark/blob/master/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
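
The pattern there, as I read it, is roughly this (my paraphrase of the
Scala suite, not the exact code):

var offsetRanges = Array[OffsetRange]()

stream.transform { rdd =>
  // transform runs on the driver, so assigning the field here is fine
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}.foreachRDD { rdd =>
  // foreachRDD also runs on the driver; offsetRanges now describes this batch
  offsetRanges.foreach { o =>
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  }
}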

One thing I would like to understand is why the Scala version uses a plain
variable while the Java version uses an AtomicReference.
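
For comparison, the Java suite's approach would look roughly like this in
Scala (my paraphrase, not the actual test code):

import java.util.concurrent.atomic.AtomicReference

val offsetRanges = new AtomicReference(Array[OffsetRange]())

val transformed = stream.transform { rdd =>
  offsetRanges.set(rdd.asInstanceOf[HasOffsetRanges].offsetRanges)
  rdd
}

Is it only because a Java anonymous class can capture only (effectively)
final locals, so a mutable holder is needed, or is there a thread-visibility
concern that the plain Scala var ignores?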

Another thing I don't get is closure serialization. Why doesn't the logger
in the code below throw an NPE, even though its instance isn't copied into
the closure the way offsetRanges is? When val offsets = offsetRanges is
removed from foreachRDD and offsetRanges(idx) is used directly,
mapPartitionsWithIndex throws. I have something like this code:

import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}
import org.slf4j.LoggerFactory

import scala.util.{Success, Try}

object StreamOps {

  val logger = LoggerFactory.getLogger("StreamOps")
  var offsetRanges = Array[OffsetRange]()

  def transform[F](stream: InputDStream[Array[Byte]]): DStream[F] = {
    stream transform { rdd =>
      // Runs on the driver for each batch, before tasks are shipped
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      rdd flatMap { message =>
        Try(... decode Array[Byte] to F ...) match {
          case Success(fact) => Some(fact)
          case _ => None
        }
      }
    }
  }

  // Error handling removed for brevity
  def save[F](stream: DStream[F]): Unit = {
    stream foreachRDD { rdd =>
      // It has to be here, otherwise NullPointerException
      val offsets = offsetRanges

      rdd mapPartitionsWithIndex { (idx, facts) =>
        // Use offsets here
        val writer = new MyWriter[F](offsets(idx), ...)

        facts foreach { fact =>
          writer.write(fact)
        }

        writer.close()

        // Why does logger work here and not throw a NullPointerException?
        logger.info(...)

        Iterator.empty
      } foreach {
        (_: Nothing) =>
      }
    }
  }
}
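
To make the closure question concrete, this is the distinction I think I am
asking about, as a minimal sketch (the names here are mine):

import org.apache.spark.rdd.RDD

object CaptureSketch {
  var data = Array(1, 2, 3) // reassigned on the driver only

  def run(rdd: RDD[Int]): Unit = {
    // Variant A: snapshot into a local val; the array reference is
    // serialized into the task closure and shipped to the executors.
    val snapshot = data
    rdd.foreach(_ => println(snapshot.length))

    // Variant B: reference the object field directly; if I understand it
    // correctly, on an executor this resolves against that JVM's own copy
    // of the CaptureSketch singleton, so driver-side reassignments are
    // never visible there.
    rdd.foreach(_ => println(data.length))
  }
}

Is that also why the logger works: each executor initializes its own copy of
StreamOps and creates its own logger, so it is never null there, while the
driver's reassignment of offsetRanges never reaches the executors?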

Many thanks for any advice, I'm sure it's a noob question.
Petr

On Mon, Aug 17, 2015 at 1:12 PM, Petr Novak <oss.mli...@gmail.com> wrote:

> Or can I generally create a new RDD from a transformation and enrich its
> partitions with some metadata, so that I could copy the OffsetRanges into
> my new RDD in the DStream?
>
> On Mon, Aug 17, 2015 at 1:08 PM, Petr Novak <oss.mli...@gmail.com> wrote:
>
>> Hi all,
>> I need to transform KafkaRDD into a new stream of deserialized case
>> classes. I want to use the new stream to save it to file and to perform
>> additional transformations on it.
>>
>> To save it I want to use offsets in filenames, hence I need OffsetRanges
>> in transformed RDD. But KafkaRDD is private, hence I don't know how to do
>> it.
>>
>> Alternatively, I could deserialize directly in messageHandler before the
>> KafkaRDD is created, but that seems to be a 1:1 transformation, while I
>> need to drop bad messages (KafkaRDD => RDD would have to be a flatMap).
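>>
>> E.g. I imagine returning an Option from the handler and flattening
>> afterwards, roughly like this (a sketch; decode, Fact, ssc, kafkaParams
>> and fromOffsets are my own names, assumed to be in scope):
>>
>> import kafka.message.MessageAndMetadata
>> import kafka.serializer.DefaultDecoder
>> import org.apache.spark.streaming.kafka.KafkaUtils
>> import scala.util.Try
>>
>> val stream = KafkaUtils.createDirectStream[Array[Byte], Array[Byte],
>>   DefaultDecoder, DefaultDecoder, Option[Fact]](
>>   ssc, kafkaParams, fromOffsets,
>>   (mmd: MessageAndMetadata[Array[Byte], Array[Byte]]) =>
>>     Try(decode(mmd.message())).toOption)
>>
>> val facts = stream.flatMap(_.toSeq)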
>>
>> Is there a way to do it using messageHandler, or is there another
>> approach?
>>
>> Many thanks for any help.
>> Petr
>>
>
>
