So for case 1 below
- subclass or modify the direct stream and KafkaRDD. They're private, so you'd need to rebuild just the external kafka project, not all of Spark. When the data is read from Kafka it will be partitioned correctly with the custom partitioner passed in to the new direct stream and KafkaRDD implementations.
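
For what it's worth, the custom partitioner being passed in can be as small as the sketch below. The class name and keying scheme are illustrative only; it assumes keys map to partitions by a non-negative hash modulo, mirroring whatever the Kafka producer side does.

import org.apache.spark.Partitioner

// Illustrative custom partitioner: a key goes to partition hash(key) mod numParts,
// which must mirror the producer-side partitioning of the Kafka topic.
class SimpleKeyPartitioner(numParts: Int) extends Partitioner {
  override def numPartitions: Int = numParts

  override def getPartition(key: Any): Int = {
    val h = key.hashCode % numParts
    if (h < 0) h + numParts else h   // keep the result in [0, numParts)
  }

  // Instances with the same partition count compare equal, so Spark can treat
  // two RDDs built with separate instances as co-partitioned.
  override def equals(other: Any): Boolean = other match {
    case p: SimpleKeyPartitioner => p.numPartitions == numPartitions
    case _ => false
  }

  override def hashCode: Int = numPartitions
}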

For case 2
- write a wrapper subclass of RDD that takes a given custom partitioner and RDD in the constructor, overrides partitioner, and delegates every other method to the wrapped RDD. This should be possible without modification to any existing Spark code. You'd use it something like .... Am I correct in saying that the data from Kafka will not be read into memory in the cluster (the Kafka server is not located on the Spark cluster in my use case) until the following code is executed:
stream.transform { rdd =>
  val wrapped = YourWrapper(cp, rdd)
  wrapped.join(reference)
}
In which case it will run through the partitioner of the wrapped RDD when it arrives in the cluster for the first time, i.e. no shuffle.
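
For reference, here is a minimal sketch of what that wrapper could look like. The class name just matches the placeholder used in the snippets in this thread; everything else is illustrative. It also delegates preferred locations, which I think covers the partition metadata question further down.

import scala.reflect.ClassTag

import org.apache.spark.{Partition, Partitioner, TaskContext}
import org.apache.spark.rdd.RDD

// Sketch only: asserts a partitioner on an existing RDD without moving any data.
// Spark trusts the partitioner reported here, so this is only safe when the data
// really is laid out that way (as in the Kafka use case described below).
class YourWrapper[T: ClassTag](cp: Partitioner, prev: RDD[T]) extends RDD[T](prev) {

  // Advertise the partitioner so a join against an identically partitioned
  // reference RDD does not need a shuffle.
  override val partitioner: Option[Partitioner] = Some(cp)

  // Reuse the wrapped RDD's partitions - nothing is repartitioned.
  override protected def getPartitions: Array[Partition] = prev.partitions

  // Delegate the actual read/computation to the wrapped RDD.
  override def compute(split: Partition, context: TaskContext): Iterator[T] =
    prev.iterator(split, context)

  // Keep whatever locality the wrapped RDD reports (e.g. a KafkaRDD's preferred
  // locations when brokers are co-located with Spark executors).
  override protected def getPreferredLocations(split: Partition): Seq[String] =
    prev.preferredLocations(split)
}

object YourWrapper {
  // Allows the wrapper to be built as YourWrapper(cp, rdd), as in the snippet above.
  def apply[T: ClassTag](cp: Partitioner, prev: RDD[T]): YourWrapper[T] =
    new YourWrapper(cp, prev)
}

As far as I can tell, because the wrapper only adds a one-to-one dependency, the Kafka data is still read lazily when the job for each batch runs, and with matching partitioners on both sides the join is planned with narrow dependencies, i.e. no shuffle.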

Thanks,
Dave.



On 13/01/16 17:00, Cody Koeninger wrote:
In the case here of a KafkaRDD, the data doesn't reside on the cluster; it's not cached by default. If you're running Kafka on the same nodes as Spark, then data locality would play a factor, but that should be handled by the existing getPreferredLocations method.

On Wed, Jan 13, 2016 at 10:46 AM, Dave <dave.davo...@gmail.com> wrote:

    Thanks Cody, appreciate the response.

    With this pattern the partitioners will now match when the join is
    executed.
    However, does the wrapper RDD not need to set the partition metadata
    on the wrapped RDD in order to allow Spark to know where the data
    for each partition resides in the cluster?

    Thanks,
    Dave.


    On 13/01/16 16:21, Cody Koeninger wrote:
    If two rdds have an identical partitioner, joining should not
    involve a shuffle.

    You should be able to override the partitioner without calling
    partitionBy.
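
    (A quick illustration of the first point, assuming a live SparkContext sc:
    HashPartitioner instances with the same partition count compare equal, so
    both sides report the same partitioner and the join needs no shuffle stage.)

    import org.apache.spark.HashPartitioner

    val p = new HashPartitioner(4)
    val a = sc.parallelize(Seq(1 -> "a", 2 -> "b")).partitionBy(p)
    val b = sc.parallelize(Seq(1 -> "x", 2 -> "y")).partitionBy(p)
    assert(a.partitioner == b.partitioner)  // Some(p) on both sides
    a.join(b)  // co-partitioned: planned with narrow dependencies, no shuffle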

    Two ways I can think of to do this:
    - subclass or modify the direct stream and KafkaRDD.  They're
    private, so you'd need to rebuild just the external kafka
    project, not all of Spark

    - write a wrapper subclass of RDD that takes a given custom
    partitioner and RDD in the constructor, overrides partitioner,
    and delegates every other method to the wrapped RDD.  This should
    be possible without modification to any existing Spark code.
    You'd use it something like:

    val cp = YourCustomPartitioner(...)
    val reference = YourReferenceRDD(cp, ...)
    val stream = KafkaUtils....

    stream.transform { rdd =>
      val wrapped = YourWrapper(cp, rdd)
      wrapped.join(reference)
    }


    I haven't had reason to do either one of those approaches, so
    YMMV, but I believe others have.




    On Wed, Jan 13, 2016 at 3:40 AM, ddav <dave.davo...@gmail.com> wrote:

        Hi,

        I have the following use case:

        1. Reference data stored in an RDD that is persisted and
        partitioned using a
        simple custom partitioner.
        2. Input stream from Kafka that uses the same partitioning
        algorithm as the ref data RDD - this partitioning is done in
        Kafka.

        I am using Kafka direct streams so the number of Kafka
        partitions maps to the number of partitions in the Spark RDD.
        From testing and the documentation I see Spark does not know
        anything about how the data has been partitioned in Kafka.

        In my use case I need to join the reference data RDD and the
        input stream RDD. Because I have manually ensured the incoming
        data from Kafka uses the same partitioning algorithm, I know the
        data has been grouped correctly in the input stream RDD in Spark,
        but I cannot do a join without a shuffle step, because Spark has
        no knowledge of how the data has been partitioned.

        I have two ways to do this (both sketched in code below).
        1. Explicitly call partitionBy(customPartitioner) on the input
        stream RDD followed by a join. This results in a shuffle of the
        input stream RDD and then the co-partitioned join takes place.
        2. Call join on the reference data RDD passing in the input
        stream RDD. Spark will do a shuffle under the hood in this case
        and the join will take place. The join will do its best to run
        on a node that has local access to the reference data RDD.
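
        A rough sketch of the two variants, assuming the batch RDD from
        the stream is streamRDD, the reference RDD is refRDD, and
        customPartitioner is the shared partitioner (all names illustrative):

        // Method 1: repartition the stream side explicitly, then join.
        // partitionBy shuffles streamRDD; the join itself is then narrow.
        val joined1 = streamRDD.partitionBy(customPartitioner).join(refRDD)

        // Method 2: join directly. Spark still shuffles the stream side (it
        // reports no partitioner) into refRDD's layout; refRDD is not re-shuffled.
        val joined2 = refRDD.join(streamRDD)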

        Is there any difference between the 2 methods above or will
        both cause the
        same sequence of events to take place in Spark?
        Is all I have stated above correct?

        Finally, is there any roadmap feature for looking at allowing
        the user to push a partitioner into an already created RDD and
        not do a shuffle? Spark in this case trusts that the data is set
        up correctly (as in the use case above) and simply fills in the
        necessary metadata on the RDD partitions, i.e. checks the first
        entry in each partition to determine the partition number of the
        data.

        Thank you in advance for any help on this issue.
        Dave.








