The idea here is that the custom partitioner shouldn't actually get used to repartition the Kafka stream (that would involve a shuffle, which is exactly what you're trying to avoid). You're just assigning a partitioner because you know how the data is already partitioned.
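For what it's worth, a minimal sketch of the wrapper approach described below might look like the following. This is untested; the class name is hypothetical, and only the standard RDD override points (partitioner, getPartitions, compute) are assumed:

```scala
import org.apache.spark.{Partition, Partitioner, TaskContext}
import org.apache.spark.rdd.RDD

// Hypothetical sketch: wraps an existing key/value RDD and asserts that it
// is already partitioned according to `cp`, without moving any data.
// A join against another RDD reporting an equal partitioner should then
// be treated as co-partitioned, i.e. no shuffle of this side.
class AssumePartitionedRDD[K, V](cp: Partitioner, prev: RDD[(K, V)])
  extends RDD[(K, V)](prev) {

  // Sanity check: the claimed partitioner must match the actual layout.
  require(prev.partitions.length == cp.numPartitions,
    "wrapped RDD must have exactly cp.numPartitions partitions")

  // The whole point: report the partitioner without calling partitionBy.
  override val partitioner: Option[Partitioner] = Some(cp)

  // Delegate everything else to the wrapped RDD.
  override def getPartitions: Array[Partition] = prev.partitions

  override def compute(split: Partition, context: TaskContext): Iterator[(K, V)] =
    prev.iterator(split, context)
}
```

Note this only tells Spark where each key *should* live; if the upstream Kafka partitioning doesn't actually match `cp`, the join will silently produce wrong results rather than fail.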
On Wed, Jan 13, 2016 at 11:22 AM, Dave <dave.davo...@gmail.com> wrote:

> So for case 1 below:
>
>   - subclass or modify the direct stream and KafkaRDD. They're private, so
>     you'd need to rebuild just the external kafka project, not all of Spark
>
> When the data is read from Kafka it will be partitioned correctly with the
> custom partitioner passed in to the new direct stream and KafkaRDD
> implementations.
>
> For case 2:
>
>   - write a wrapper subclass of RDD that takes a given custom partitioner
>     and RDD in the constructor, overrides partitioner, and delegates every
>     other method to the wrapped RDD. This should be possible without
>     modification to any existing Spark code. You'd use it something like ...
>
> Am I correct in saying that the data from Kafka will not be read into
> memory in the cluster (the Kafka server is not located on the Spark
> cluster in my use case) until the following code is executed?
>
>     stream.transform { rdd =>
>       val wrapped = YourWrapper(cp, rdd)
>       wrapped.join(reference)
>     }
>
> In which case it will run through the partitioner of the wrapped RDD when
> it arrives in the cluster for the first time, i.e. no shuffle.
>
> Thanks,
> Dave.
>
> On 13/01/16 17:00, Cody Koeninger wrote:
>
> In the case here of a KafkaRDD, the data doesn't reside on the cluster;
> it's not cached by default. If you're running Kafka on the same nodes as
> Spark, then data locality would play a factor, but that should be handled
> by the existing getPreferredLocations method.
>
> On Wed, Jan 13, 2016 at 10:46 AM, Dave <dave.davo...@gmail.com> wrote:
>
>> Thanks Cody, appreciate the response.
>>
>> With this pattern the partitioners will now match when the join is
>> executed.
>> However, does the wrapper RDD not need to set the partition metadata on
>> the wrapped RDD in order to allow Spark to know where the data for each
>> partition resides in the cluster?
>>
>> Thanks,
>> Dave.
>> On 13/01/16 16:21, Cody Koeninger wrote:
>>
>> If two RDDs have an identical partitioner, joining should not involve a
>> shuffle.
>>
>> You should be able to override the partitioner without calling
>> partitionBy.
>>
>> Two ways I can think of to do this:
>>
>> - subclass or modify the direct stream and KafkaRDD. They're private, so
>>   you'd need to rebuild just the external kafka project, not all of Spark
>>
>> - write a wrapper subclass of RDD that takes a given custom partitioner
>>   and RDD in the constructor, overrides partitioner, and delegates every
>>   other method to the wrapped RDD. This should be possible without
>>   modification to any existing Spark code. You'd use it something like
>>
>>     val cp = YourCustomPartitioner(...)
>>     val reference = YourReferenceRDD(cp, ...)
>>     val stream = KafkaUtils....
>>
>>     stream.transform { rdd =>
>>       val wrapped = YourWrapper(cp, rdd)
>>       wrapped.join(reference)
>>     }
>>
>> I haven't had reason to do either one of those approaches, so YMMV, but
>> I believe others have.
>>
>> On Wed, Jan 13, 2016 at 3:40 AM, ddav <dave.davo...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I have the following use case:
>>>
>>> 1. Reference data stored in an RDD that is persisted and partitioned
>>> using a simple custom partitioner.
>>> 2. Input stream from Kafka that uses the same partitioner algorithm as
>>> the ref data RDD - this partitioning is done in Kafka.
>>>
>>> I am using Kafka direct streams, so the number of Kafka partitions maps
>>> to the number of partitions in the Spark RDD. From testing and the
>>> documentation I see that Spark does not know anything about how the
>>> data has been partitioned in Kafka.
>>>
>>> In my use case I need to join the reference data RDD and the input
>>> stream RDD.
>>> Because I have manually ensured the incoming data from Kafka uses the
>>> same partitioning algorithm, I know the data has been grouped correctly
>>> in the input stream RDD in Spark, but I cannot do a join without a
>>> shuffle step because Spark has no knowledge of how the data has been
>>> partitioned.
>>>
>>> I have two ways to do this:
>>>
>>> 1. Explicitly call partitionBy(customPartitioner) on the input stream
>>> RDD, followed by a join. This results in a shuffle of the input stream
>>> RDD, after which the co-partitioned join takes place.
>>> 2. Call join on the reference data RDD, passing in the input stream
>>> RDD. Spark will do a shuffle under the hood in this case and the join
>>> will take place. The join will do its best to run on a node that has
>>> local access to the reference data RDD.
>>>
>>> Is there any difference between the two methods above, or will both
>>> cause the same sequence of events to take place in Spark?
>>> Is all I have stated above correct?
>>>
>>> Finally, is there any roadmap feature for allowing the user to push a
>>> partitioner into an already created RDD without doing a shuffle? Spark
>>> in this case would trust that the data is set up correctly (as in the
>>> use case above) and simply fill in the necessary metadata on the RDD
>>> partitions, e.g. check the first entry in each partition to determine
>>> the partition number of the data.
>>>
>>> Thank you in advance for any help on this issue.
>>> Dave.
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-Streaming-and-partitioning-tp25955.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
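For completeness, the kind of simple custom partitioner discussed throughout this thread (one whose key-to-partition mapping the Kafka producer and the reference RDD both agree on) might be sketched as follows. The modulo scheme and class name are assumptions for illustration only:

```scala
import org.apache.spark.Partitioner

// Hypothetical example: both the Kafka producer's partitioning logic and
// the reference RDD must implement this same key-to-partition mapping for
// the "already co-partitioned" assumption to hold.
class SimpleKeyPartitioner(val numParts: Int) extends Partitioner {
  override def numPartitions: Int = numParts

  override def getPartition(key: Any): Int = {
    // Non-negative modulo, mirroring a producer that computes
    // hash(key) % numPartitions on the Kafka side.
    val h = key.hashCode % numParts
    if (h < 0) h + numParts else h
  }

  // Joins skip the shuffle only when the two RDDs' partitioners compare
  // equal, so equality must be structural, not reference-based.
  override def equals(other: Any): Boolean = other match {
    case p: SimpleKeyPartitioner => p.numParts == numParts
    case _ => false
  }

  override def hashCode: Int = numParts
}
```

The equals override matters: Spark decides co-partitioning by comparing the two Partitioner objects, so two separately constructed instances must compare equal when they describe the same layout.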