The idea here is that the custom partitioner shouldn't actually be used
to repartition the Kafka stream (that would involve a shuffle, which is
what you're trying to avoid).  You're just assigning a partitioner
because you already know how the data is partitioned.
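
For illustration, here is a rough, untested-in-production sketch of what
such a wrapper might look like. The class name AssumePartitionedRDD is
made up for this example and is not part of Spark. It assumes that Spark
partition i of the wrapped RDD holds exactly the keys for which
part.getPartition(key) == i; Spark does not check this, and it is worth
verifying that the direct stream's RDD partition order really lines up
with your Kafka partition numbers.

```scala
import scala.reflect.ClassTag

import org.apache.spark.{Partition, Partitioner, TaskContext}
import org.apache.spark.rdd.RDD

// Hypothetical wrapper: advertises a partitioner the caller already knows
// to be true of the data. It moves no data and performs no shuffle.
class AssumePartitionedRDD[K: ClassTag, V: ClassTag](
    prev: RDD[(K, V)],
    part: Partitioner)
  extends RDD[(K, V)](prev) { // one-to-one (narrow) dependency on prev

  // Cheap sanity check on the driver; it cannot verify key placement.
  require(part.numPartitions == prev.partitions.length,
    "partitioner and wrapped RDD must have the same number of partitions")

  // The only real change: report the known partitioner.
  override val partitioner: Option[Partitioner] = Some(part)

  // Reuse the parent's partitions unchanged.
  override protected def getPartitions: Array[Partition] =
    firstParent[(K, V)].partitions

  // Delegate reading each partition to the wrapped RDD.
  override def compute(split: Partition, context: TaskContext): Iterator[(K, V)] =
    firstParent[(K, V)].iterator(split, context)
}
```

Inside stream.transform you would then write something like
new AssumePartitionedRDD(rdd, cp).join(reference). Note that the join
only avoids the shuffle if the two partitioners compare equal, so either
pass the same partitioner instance to both sides or give your custom
partitioner a proper equals method.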


On Wed, Jan 13, 2016 at 11:22 AM, Dave <dave.davo...@gmail.com> wrote:

> So for case 1 below
> - subclass or modify the direct stream and KafkaRDD.  They're private, so
> you'd need to rebuild just the external Kafka project, not all of Spark.
> When the data is read from Kafka it will be partitioned correctly with the
> custom partitioner passed in to the new direct stream and KafkaRDD
> implementations.
>
> For case 2
> - write a wrapper subclass of rdd that takes a given custom partitioner
> and rdd in the constructor, overrides partitioner, and delegates every
> other method to the wrapped rdd.  This should be possible without
> modification to any existing spark code.  You'd use it something like ....
> Am I correct in saying that the data from Kafka will not be read into
> memory in the cluster (the Kafka server is not located on the Spark cluster
> in my use case) until the following code is executed?
> stream.transform { rdd =>
>   val wrapped = YourWrapper(cp, rdd)
>   wrapped.join(reference)
> }
> In which case it will run through the partitioner of the wrapped RDD when
> it arrives in the cluster for the first time, i.e. no shuffle.
>
> Thanks,
> Dave.
>
>
>
> On 13/01/16 17:00, Cody Koeninger wrote:
>
> In the case here of a KafkaRDD, the data doesn't reside on the cluster, and
> it's not cached by default.  If you're running Kafka on the same nodes as
> Spark, then data locality would be a factor, but that should be handled
> by the existing getPreferredLocations method.
>
> On Wed, Jan 13, 2016 at 10:46 AM, Dave <dave.davo...@gmail.com> wrote:
>
>> Thanks Cody, appreciate the response.
>>
>> With this pattern the partitioners will now match when the join is
>> executed.
>> However, does the wrapper RDD not need to set the partition metadata on
>> the wrapped RDD in order to allow Spark to know where the data for each
>> partition resides in the cluster?
>>
>> Thanks,
>> Dave.
>>
>>
>> On 13/01/16 16:21, Cody Koeninger wrote:
>>
>> If two rdds have an identical partitioner, joining should not involve a
>> shuffle.
>>
>> You should be able to override the partitioner without calling
>> partitionBy.
>>
>> Two ways I can think of to do this:
>> - subclass or modify the direct stream and KafkaRDD.  They're private, so
>> you'd need to rebuild just the external Kafka project, not all of Spark
>>
>> - write a wrapper subclass of rdd that takes a given custom partitioner
>> and rdd in the constructor, overrides partitioner, and delegates every
>> other method to the wrapped rdd.  This should be possible without
>> modification to any existing spark code.  You'd use it something like
>>
>> val cp = YourCustomPartitioner(...)
>> val reference = YourReferenceRDD(cp, ...)
>> val stream = KafkaUtils....
>>
>> stream.transform { rdd =>
>>   val wrapped = YourWrapper(cp, rdd)
>>   wrapped.join(reference)
>> }
>>
>>
>> I haven't had reason to try either of those approaches, so YMMV, but I
>> believe others have.
>>
>>
>>
>>
>> On Wed, Jan 13, 2016 at 3:40 AM, ddav <dave.davo...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I have the following use case:
>>>
>>> 1. Reference data stored in an RDD that is persisted and partitioned
>>> using a simple custom partitioner.
>>> 2. Input stream from Kafka that uses the same partitioner algorithm as
>>> the ref data RDD - this partitioning is done in Kafka.
>>>
>>> I am using Kafka direct streams, so the number of Kafka partitions maps
>>> to the number of partitions in the Spark RDD. From testing and the
>>> documentation I see Spark does not know anything about how the data has
>>> been partitioned in Kafka.
>>>
>>> In my use case I need to join the reference data RDD and the input stream
>>> RDD.  Because I have manually ensured the incoming data from Kafka uses
>>> the same partitioning algorithm, I know the data has been grouped
>>> correctly in the input stream RDD in Spark, but I cannot do a join without
>>> a shuffle step because Spark has no knowledge of how the data has been
>>> partitioned.
>>>
>>> I have two ways to do this.
>>> 1. Explicitly call partitionBy(CustomPartitioner) on the input stream RDD
>>> followed by a join. This results in a shuffle of the input stream RDD,
>>> after which the co-partitioned join takes place.
>>> 2. Call join on the reference data RDD passing in the input stream RDD.
>>> Spark will do a shuffle under the hood in this case and the join will
>>> take place. The join will do its best to run on a node that has local
>>> access to the reference data RDD.
>>>
>>> Is there any difference between the 2 methods above or will both cause
>>> the same sequence of events to take place in Spark?
>>> Is all I have stated above correct?
>>>
>>> Finally, is there any roadmap feature for allowing the user to push a
>>> partitioner into an already created RDD without doing a shuffle?
>>> Spark in this case trusts that the data is set up correctly (as in the use
>>> case above) and simply fills in the necessary metadata on the RDD
>>> partitions, i.e. it checks the first entry in each partition to determine
>>> the partition number of the data.
>>>
>>> Thank you in advance for any help on this issue.
>>> Dave.
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-Streaming-and-partitioning-tp25955.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>>
>>
>>
>
>
