Thanks Cody, appreciate the response.

With this pattern the partitioners will match when the join is executed. However, doesn't the wrapper RDD need to expose the partition metadata of the wrapped RDD so that Spark knows where the data for each partition resides in the cluster?

Thanks,
Dave.

On 13/01/16 16:21, Cody Koeninger wrote:
If two RDDs have an identical partitioner, joining them should not involve a shuffle.

You should be able to override the partitioner without calling partitionBy.

Two ways I can think of to do this:
- Subclass or modify the direct stream and KafkaRDD. They're private, so you'd need to rebuild just the external Kafka project, not all of Spark.

- Write a wrapper subclass of RDD that takes a given custom partitioner and an RDD in the constructor, overrides partitioner, and delegates every other method to the wrapped RDD. This should be possible without modifying any existing Spark code. You'd use it something like this (a sketch of the wrapper itself follows the example):

val cp = YourCustomPartitioner(...)
val reference = YourReferenceRDD(cp, ...)
val stream = KafkaUtils....

stream.transform { rdd =>
  val wrapped = YourWrapper(cp, rdd)
  wrapped.join(reference)
}
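
A rough, untested sketch of what such a wrapper might look like (the names are illustrative; the key points are overriding partitioner and delegating partitions, compute and preferred locations to the wrapped RDD):

import org.apache.spark.{Partition, Partitioner, TaskContext}
import org.apache.spark.rdd.RDD

// Reports a custom partitioner for data that is already laid out accordingly
// and delegates everything else to the wrapped RDD, so no shuffle is needed.
class YourWrapper[K, V](cp: Partitioner, prev: RDD[(K, V)])
  extends RDD[(K, V)](prev) {

  override val partitioner: Option[Partitioner] = Some(cp)

  // Read each partition straight from the wrapped RDD.
  override def compute(split: Partition, context: TaskContext): Iterator[(K, V)] =
    prev.iterator(split, context)

  override protected def getPartitions: Array[Partition] = prev.partitions

  // Keep the wrapped RDD's locality information visible to the scheduler.
  override protected def getPreferredLocations(split: Partition): Seq[String] =
    prev.preferredLocations(split)
}

// Companion apply so the usage above (YourWrapper(cp, rdd)) works without `new`.
object YourWrapper {
  def apply[K, V](cp: Partitioner, prev: RDD[(K, V)]): YourWrapper[K, V] =
    new YourWrapper(cp, prev)
}

Delegating getPreferredLocations is what keeps the wrapped RDD's partition locality information visible to Spark's scheduler.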


I haven't had a reason to try either of those approaches myself, so YMMV, but I believe others have.




On Wed, Jan 13, 2016 at 3:40 AM, ddav <dave.davo...@gmail.com> wrote:

    Hi,

    I have the following use case:

    1. Reference data stored in an RDD that is persisted and partitioned
    using a simple custom partitioner.
    2. An input stream from Kafka that uses the same partitioning
    algorithm as the reference data RDD - this partitioning is done in
    Kafka.

    I am using the Kafka direct stream, so the number of Kafka partitions
    maps to the number of partitions in the Spark RDD. From testing and
    the documentation I see that Spark knows nothing about how the data
    has been partitioned in Kafka.

    In my use case I need to join the reference data RDD and the input
    stream RDD. Because I have manually ensured that the incoming data
    from Kafka uses the same partitioning algorithm, I know the data is
    already grouped correctly in the input stream RDD in Spark, but I
    cannot do the join without a shuffle step, since Spark has no
    knowledge of how the data has been partitioned.

    I have two ways to do this (a rough sketch of both follows this list):
    1. Explicitly call partitionBy(customPartitioner) on the input stream
    RDD, followed by a join. This results in a shuffle of the input
    stream RDD, after which the co-partitioned join takes place.
    2. Call join on the reference data RDD, passing in the input stream
    RDD. Spark will do a shuffle under the hood in this case and the join
    will take place. The join will do its best to run on a node that has
    local access to the reference data RDD.
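
    For concreteness, the two options roughly look like this (stream,
    refRdd and customPartitioner are placeholder names for the direct
    stream, the reference data RDD and the custom partitioner):

    // Option 1: explicitly repartition the stream RDD, then join.
    stream.transform { rdd =>
      rdd.partitionBy(customPartitioner).join(refRdd)
    }

    // Option 2: join from the reference data side; Spark still shuffles
    // the stream RDD under the hood.
    stream.transform { rdd =>
      refRdd.join(rdd)
    }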

    Is there any difference between the 2 methods above or will both
    cause the
    same sequence of events to take place in Spark?
    Is all I have stated above correct?

    Finally, is there any roadmap feature that would allow the user to
    push a partitioner into an already created RDD without doing a
    shuffle? Spark would in this case trust that the data is set up
    correctly (as in the use case above) and simply fill in the necessary
    metadata on the RDD partitions, e.g. check the first entry in each
    partition to determine the partition number of the data.
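
    To illustrate the "check the first entry in each partition" idea, a
    rough sketch (this is not an existing Spark feature; inputRdd and cp
    are placeholder names for the keyed stream RDD and the custom
    partitioner):

    // Sanity check: confirm that the first key in each partition maps to
    // that partition index under the custom partitioner before trusting
    // the existing layout.
    val mismatches = inputRdd.mapPartitionsWithIndex { (idx, iter) =>
      iter.take(1).collect { case (k, _) if cp.getPartition(k) != idx => (idx, k) }
    }.collect()
    assert(mismatches.isEmpty, s"partition layout mismatch: ${mismatches.mkString(", ")}")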

    Thank you in advance for any help on this issue.
    Dave.



    --
    View this message in context:
    http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-Streaming-and-partitioning-tp25955.html
    Sent from the Apache Spark User List mailing list archive at
    Nabble.com.



