If the transformation you're trying to do really is per-partition, it
shouldn't matter whether you're using plain Scala methods on the partition
iterator or Spark methods on the RDD.  The parallel speedup you're getting
is all from doing the work on multiple machines; shuffle, caching, and the
other benefits of Spark aren't a factor here.
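
Something like this, for instance (a rough sketch; parse, interesting, and
save are placeholders for your own code, not anything from Spark):

import org.apache.spark.TaskContext
import org.apache.spark.streaming.kafka.HasOffsetRanges

stream.foreachRDD { rdd =>
  // one OffsetRange per Kafka partition, in the same order as the RDD partitions
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreachPartition { iter =>
    // offset range for this particular partition
    val osr = offsetRanges(TaskContext.get.partitionId)
    // plain Scala iterator methods; no shuffle involved
    val results = iter.map(parse).filter(interesting).toList
    save(osr, results)
  }
}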

If using Scala methods bothers you, do all of your transformation using
Spark methods, collect the results back to the driver, and save them along
with the offsets there:

stream.foreachRDD { rdd =>
  // the direct stream's RDDs implement HasOffsetRanges
  val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // whatever chain of Spark transformations you need, collected to the driver
  val results = rdd.some.chain.of.spark.calls.collect
  save(offsets, results)
}
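
If you go that route, save can be a single local transaction on the driver.
A rough sketch with JDBC (pretend the results are strings; jdbcUrl and the
insertResult / upsertOffset helpers are made up for illustration):

import java.sql.DriverManager
import org.apache.spark.streaming.kafka.OffsetRange

def save(offsets: Array[OffsetRange], results: Array[String]): Unit = {
  val conn = DriverManager.getConnection(jdbcUrl)  // jdbcUrl assumed defined elsewhere
  try {
    conn.setAutoCommit(false)
    // results and the offsets that produced them commit or roll back together
    results.foreach(r => insertResult(conn, r))                                    // hypothetical helper
    offsets.foreach(o => upsertOffset(conn, o.topic, o.partition, o.untilOffset))  // hypothetical helper
    conn.commit()
  } catch {
    case e: Exception => conn.rollback(); throw e
  } finally {
    conn.close()
  }
}

On restart you read the stored offsets back and pass them as the fromOffsets
argument to createDirectStream, which is what closes the exactly-once loop.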

My work-in-progress slides for my talk at the upcoming Spark conference are
here:

http://koeninger.github.io/kafka-exactly-once/

See if slides 20 vs 21 clarify that point a little bit.

The direct stream doesn't use long-running receivers, so the concerns that
blog post is trying to address don't really apply.
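
For reference, setting up the direct stream looks roughly like this (a
sketch; ssc, the broker list, and the topic name are placeholders for your
own):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("mytopic"))

There's no receiver tying up a core; the driver just computes offset ranges
for each batch and the executors read those ranges directly, one Kafka
partition per RDD partition.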

Under normal operation a given partition of an RDD is only going to be
handled by a single executor at a time (as long as you don't turn on
speculative execution... or I suppose it might be possible in some kind of
network partition situation).  Transactionality should save you even if
something weird happens, though.
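
To make that concrete, the transactional check can be as simple as refusing
to commit results unless the stored offset still matches the start of the
range being written (again a sketch; the selectStoredOffset, insertResult,
and updateStoredOffset helpers are made up):

import java.sql.Connection
import org.apache.spark.streaming.kafka.OffsetRange

def saveRange(conn: Connection, osr: OffsetRange, results: Seq[String]): Unit = {
  conn.setAutoCommit(false)
  val stored = selectStoredOffset(conn, osr.topic, osr.partition)
  if (stored == osr.fromOffset) {
    results.foreach(r => insertResult(conn, r))
    updateStoredOffset(conn, osr.topic, osr.partition, osr.untilOffset)
    conn.commit()
  } else {
    // some other attempt already committed past this range; don't write twice
    conn.rollback()
  }
}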

On Thu, May 14, 2015 at 3:44 PM, will-ob <will.obr...@tapjoy.com> wrote:

> Hey Cody (et. al.),
>
> Few more questions related to this. It sounds like our missing data issues
> appear fixed with this approach. Could you shed some light on a few
> questions that came up?
>
> ---------------------
>
> Processing our data inside a single foreachPartition function appears to be
> very different from the pattern seen in the programming guide. Does this
> become problematic with additional, interleaved reduce/filter/map steps?
>
> ```
> // typical?
> rdd
>   .map { ... }
>   .reduce { ... }
>   .filter { ... }
>   .reduce { ... }
>   .foreachRDD { writeToDb }
>
> // with foreachPartition
> rdd.foreachPartition { case (iter) =>
>   iter
>     .map { ... }
>     .reduce { ... }
>     .filter { ... }
>     .reduce { ... }
> }
>
> ```
> ---------------------------------
>
> Could the above be simplified by having
>
> one Kafka partition per DStream, rather than
> one Kafka partition per RDD partition?
>
> That way, we wouldn't need to do our processing inside each partition as
> there would only be one set of kafka metadata to commit.
>
> Presumably, one could `join` DStreams when topic-level aggregates were
> needed.
>
> It seems this was the approach Michael Noll took in his blog post
> (
> http://www.michael-noll.com/blog/2014/10/01/kafka-spark-streaming-integration-example-tutorial/
> ),
> although his primary motivation appears to be maintaining high throughput /
> parallelism rather than tracking Kafka metadata.
>
> ---------------------------------
>
> From the blog post:
>
> "... there is no long-running receiver task that occupies a core per stream
> regardless of what the message volume is."
>
> Is this because data is retrieved by polling rather than maintaining a
> socket? Is it still the case that there is only one receiver process per
> DStream? If so, maybe it is wise to keep DStreams and Kafka partitions 1:1,
> or else discover the machine's NIC limit?
>
> Can you think of a reason not to do this? Cluster utilization, or the like,
> perhaps?
>
> --------------------------------
>
> And seems a silly question, but does `foreachPartition` guarantee that a
> single worker will process the passed function? Or might two workers split
> the work?
>
> E.g. foreachPartition(f)
>
> Worker 1:     f( Iterator[partition 1 records 1 - 50] )
> Worker 2:     f( Iterator[partition 1 records 51 - 100] )
>
> It is unclear from the scaladocs
> (
> https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD
> ).
> But you can imagine, if it is critical that this data be committed in a
> single transaction, that two workers will have issues.
>
>
>
> -- Will O
