In the case of a KafkaRDD, the data doesn't reside on the cluster; it's not
cached by default. If you're running Kafka on the same nodes as Spark, then
data locality would play a factor, but that should be handled by the existing
getPreferredLocations method.
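A minimal, untested sketch of the wrapper approach discussed below (the class
name follows the YourWrapper placeholder in my earlier snippet, so YMMV): it
reports the caller's partitioner and delegates everything else, including
preferred locations, to the wrapped RDD.

  import scala.reflect.ClassTag

  import org.apache.spark.{Partition, Partitioner, TaskContext}
  import org.apache.spark.rdd.RDD

  // Reports a caller-supplied partitioner but otherwise delegates to the
  // wrapped RDD. Spark cannot verify that the data is actually laid out the
  // way `part` claims; that contract is entirely on the caller.
  class YourWrapper[K: ClassTag, V: ClassTag](
      part: Partitioner,
      prev: RDD[(K, V)])
    extends RDD[(K, V)](prev) {

    // This is the whole point: advertise the partitioner so that a join
    // against an RDD with the same partitioner needs no shuffle.
    override val partitioner: Option[Partitioner] = Some(part)

    // One partition per wrapped partition, unchanged.
    override protected def getPartitions: Array[Partition] = prev.partitions

    // Pass computation straight through to the wrapped RDD.
    override def compute(split: Partition, context: TaskContext): Iterator[(K, V)] =
      prev.iterator(split, context)

    // Preserve data locality, e.g. the location hints a KafkaRDD exposes.
    override protected def getPreferredLocations(split: Partition): Seq[String] =
      prev.preferredLocations(split)
  }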
On Wed, Jan 13, 2016 at 10:46 AM, Dave <dave.davo...@gmail.com> wrote:

> Thanks Cody, appreciate the response.
>
> With this pattern the partitioners will now match when the join is
> executed. However, does the wrapper RDD not need to set the partition
> metadata on the wrapped RDD in order to allow Spark to know where the
> data for each partition resides in the cluster?
>
> Thanks,
> Dave.
>
> On 13/01/16 16:21, Cody Koeninger wrote:
>
> If two RDDs have an identical partitioner, joining should not involve a
> shuffle.
>
> You should be able to override the partitioner without calling
> partitionBy.
>
> Two ways I can think of to do this:
>
> - Subclass or modify the direct stream and KafkaRDD. They're private, so
> you'd need to rebuild just the external Kafka project, not all of Spark.
>
> - Write a wrapper subclass of RDD that takes a given custom partitioner
> and RDD in the constructor, overrides partitioner, and delegates every
> other method to the wrapped RDD. This should be possible without
> modification to any existing Spark code. You'd use it something like:
>
> val cp = YourCustomPartitioner(...)
> val reference = YourReferenceRDD(cp, ...)
> val stream = KafkaUtils....
>
> stream.transform { rdd =>
>   val wrapped = YourWrapper(cp, rdd)
>   wrapped.join(reference)
> }
>
> I haven't had reason to do either one of those approaches, so YMMV, but
> I believe others have.
>
> On Wed, Jan 13, 2016 at 3:40 AM, ddav <dave.davo...@gmail.com> wrote:
>
>> Hi,
>>
>> I have the following use case:
>>
>> 1. Reference data stored in an RDD that is persisted and partitioned
>> using a simple custom partitioner.
>> 2. An input stream from Kafka that uses the same partitioning algorithm
>> as the reference data RDD - this partitioning is done in Kafka.
>>
>> I am using Kafka direct streams, so the number of Kafka partitions maps
>> to the number of partitions in the Spark RDD. From testing and the
>> documentation, I see Spark does not know anything about how the data
>> has been partitioned in Kafka.
>>
>> In my use case I need to join the reference data RDD and the input
>> stream RDD. Because I have manually ensured the incoming data from
>> Kafka uses the same partitioning algorithm, I know the data has been
>> grouped correctly in the input stream RDD in Spark, but I cannot do a
>> join without a shuffle step, because Spark has no knowledge of how the
>> data has been partitioned.
>>
>> I have two ways to do this:
>> 1. Explicitly call partitionBy(customPartitioner) on the input stream
>> RDD, followed by a join. This results in a shuffle of the input stream
>> RDD, after which the co-partitioned join takes place.
>> 2. Call join on the reference data RDD, passing in the input stream
>> RDD. Spark will do a shuffle under the hood in this case and the join
>> will take place. The join will do its best to run on a node that has
>> local access to the reference data RDD.
>>
>> Is there any difference between the two methods above, or will both
>> cause the same sequence of events to take place in Spark?
>> Is all I have stated above correct?
>>
>> Finally, is there any roadmap feature looking at allowing the user to
>> push a partitioner into an already-created RDD without doing a shuffle?
>> Spark in this case would trust that the data is set up correctly (as in
>> the use case above) and simply fill in the necessary metadata on the
>> RDD partitions, i.e. check the first entry in each partition to
>> determine the partition number of the data.
>>
>> Thank you in advance for any help on this issue.
>> Dave.
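The YourCustomPartitioner in the snippet above is left abstract. For
illustration only, here is one possible shape; it assumes the Kafka producer
assigned partitions by key.hashCode modulo the partition count, and both the
class name and that assumption are mine, so verify them against the
partitioner your producer actually uses:

  import org.apache.spark.Partitioner

  // Must reproduce *exactly* the algorithm the Kafka producer used,
  // otherwise the co-partitioned join will quietly pair up the wrong data.
  class KeyHashPartitioner(partitions: Int) extends Partitioner {
    require(partitions > 0, "partitions must be positive")

    override def numPartitions: Int = partitions

    override def getPartition(key: Any): Int = {
      val mod = key.hashCode % numPartitions
      if (mod < 0) mod + numPartitions else mod  // hashCode may be negative
    }
  }

Usage would follow the earlier snippet: val cp = new
KeyHashPartitioner(numberOfKafkaPartitions), then wrap each batch RDD and
join as shown above.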