In the case of a KafkaRDD the data doesn't reside on the cluster; it's not
cached by default.  If you're running Kafka on the same nodes as Spark, then
data locality would play a factor, but that should be handled by the existing
getPreferredLocations method.
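
If you do end up wrapping the RDD as discussed further down in this thread,
locality can be preserved by delegating to the wrapped RDD. A minimal sketch,
assuming the wrapper holds the wrapped RDD as "prev":

// Hypothetical: pass locality hints through to the wrapped KafkaRDD so
// Spark can still schedule tasks near the brokers when Kafka and Spark
// share nodes.
override protected def getPreferredLocations(split: Partition): Seq[String] =
  prev.preferredLocations(split)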

On Wed, Jan 13, 2016 at 10:46 AM, Dave <dave.davo...@gmail.com> wrote:

> Thanks Cody, appreciate the response.
>
> With this pattern the partitioners will now match when the join is
> executed.
> However, doesn't the wrapper RDD need to set the partition metadata on the
> wrapped RDD so that Spark knows where the data for each partition resides
> in the cluster?
>
> Thanks,
> Dave.
>
>
> On 13/01/16 16:21, Cody Koeninger wrote:
>
> If two RDDs have an identical partitioner, joining them should not involve
> a shuffle.
>
> You should be able to override the partitioner without calling partitionBy.
>
> Two ways I can think of to do this:
> - Subclass or modify the direct stream and KafkaRDD.  They're private, so
> you'd need to rebuild just the external Kafka project, not all of Spark.
>
> - Write a wrapper subclass of RDD that takes a given custom partitioner
> and an RDD in the constructor, overrides partitioner, and delegates every
> other method to the wrapped RDD.  This should be possible without
> modifying any existing Spark code.  You'd use it something like
>
> val cp = YourCustomPartitioner(...)
> val reference = YourReferenceRDD(cp, ...)
> val stream = KafkaUtils....
>
> stream.transform { rdd =>
>   val wrapped = YourWrapper(cp, rdd)
>   wrapped.join(reference)
> }
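>
> For reference, a minimal sketch of what such a wrapper might look like
> (the names are placeholders, and this assumes a pair RDD of type (K, V);
> see the locality note at the top of this thread for getPreferredLocations):
>
> import org.apache.spark.{Partition, Partitioner, TaskContext}
> import org.apache.spark.rdd.RDD
>
> class YourWrapper[K, V](cp: Partitioner, prev: RDD[(K, V)])
>     extends RDD[(K, V)](prev) {
>
>   // Claim the custom partitioner without moving any data.
>   override val partitioner: Option[Partitioner] = Some(cp)
>
>   // Delegate partition layout and computation to the wrapped RDD.
>   override protected def getPartitions: Array[Partition] = prev.partitions
>
>   override def compute(split: Partition, context: TaskContext): Iterator[(K, V)] =
>     prev.iterator(split, context)
> }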
>
>
> I haven't had reason to do either of those approaches myself, so YMMV, but
> I believe others have.
>
> On Wed, Jan 13, 2016 at 3:40 AM, ddav <dave.davo...@gmail.com> wrote:
>
>> Hi,
>>
>> I have the following use case:
>>
>> 1. Reference data stored in an RDD that is persisted and partitioned
>> using a simple custom partitioner (a sketch of such a partitioner follows
>> this list).
>> 2. An input stream from Kafka that uses the same partitioning algorithm as
>> the reference data RDD - this partitioning is done in Kafka.
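>>
>> For illustration, a minimal sketch of such a partitioner (the exact key
>> type and modulo scheme here are assumptions):
>>
>> import org.apache.spark.Partitioner
>>
>> class SimpleKeyPartitioner(numParts: Int) extends Partitioner {
>>   override def numPartitions: Int = numParts
>>   // Map a key to a partition, keeping the result non-negative.
>>   override def getPartition(key: Any): Int =
>>     ((key.hashCode % numParts) + numParts) % numParts
>> }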
>>
>> I am using the Kafka direct stream, so the number of Kafka partitions
>> maps to the number of partitions in the Spark RDD. From testing and the
>> documentation I see that Spark knows nothing about how the data has been
>> partitioned in Kafka.
>>
>> In my use case I need to join the reference data RDD and the input stream
>> RDD.  Because I have manually ensured that the incoming data from Kafka
>> uses the same partitioning algorithm, I know the data has been grouped
>> correctly in the input stream RDD in Spark, but I cannot do a join without
>> a shuffle step, because Spark has no knowledge of how the data has been
>> partitioned.
>>
>> I have two ways to do this (both are sketched after this list):
>> 1. Explicitly call partitionBy(customPartitioner) on the input stream RDD,
>> followed by a join. This results in a shuffle of the input stream RDD, and
>> then the co-partitioned join takes place.
>> 2. Call join on the reference data RDD, passing in the input stream RDD.
>> Spark will do a shuffle under the hood in this case and the join will take
>> place. The join will do its best to run on a node that has local access to
>> the reference data RDD.
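>>
>> A hypothetical sketch of both methods (stream, referenceRdd and
>> customPartitioner stand in for the objects described above):
>>
>> // Method 1: shuffle the stream RDD onto the custom partitioner first,
>> // then the join is co-partitioned.
>> stream.transform { rdd =>
>>   rdd.partitionBy(customPartitioner).join(referenceRdd)
>> }
>>
>> // Method 2: join directly; Spark shuffles under the hood to align the
>> // two RDDs.
>> stream.transform { rdd =>
>>   referenceRdd.join(rdd)
>> }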
>>
>> Is there any difference between the two methods above, or will both cause
>> the same sequence of events to take place in Spark?
>> Is all I have stated above correct?
>>
>> Finally, is there any roadmap feature for allowing the user to push a
>> partitioner into an already created RDD without doing a shuffle? Spark in
>> this case would trust that the data is set up correctly (as in the use
>> case above) and simply fill in the necessary metadata on the RDD
>> partitions, e.g. checking the first entry in each partition to determine
>> the partition number of the data.
>>
>> Thank you in advance for any help on this issue.
>> Dave.
>>
>
