You may know that those streams share the same keys, but Spark doesn't unless you tell it.
mapWithState takes a StateSpec, which should allow you to specify a partitioner.

On Mon, Oct 31, 2016 at 9:40 AM, Andrii Biletskyi <andrb...@gmail.com> wrote:
> Thanks for the response.
>
> So, as I understand, there is no way to "tell" mapWithState to leave the
> partitioning schema alone, as any other transformation normally would.
> Then I would like to clarify whether there is a simple way to apply a
> transformation to a key-value stream and specify a Partitioner that
> effectively results in the same partitioning schema as the original stream.
> I.e.:
>
> stream.mapPartitions({ crs =>
>   crs.map { cr =>
>     cr.key() -> cr.value()
>   }
> }) <--- specify a Partitioner here for the resulting RDD.
>
> The reason I ask is that it simply looks strange to me that Spark has to
> shuffle my input stream and the "state" stream on each mapWithState
> operation, when I know for sure that those two streams will always share
> the same keys and will not need access to other partitions.
>
> Thanks,
> Andrii
>
> 2016-10-31 15:45 GMT+02:00 Cody Koeninger <c...@koeninger.org>:
>>
>> If you call a transformation on an RDD using the same partitioner as that
>> RDD, no shuffle will occur. KafkaRDD doesn't have a partitioner; there is
>> no consistent partitioning scheme that works for all Kafka uses. You can
>> wrap each KafkaRDD with an RDD that has a custom partitioner you write to
>> match your Kafka partitioning scheme, and avoid a shuffle.
>>
>> The danger there is that if you have any misbehaving producers, or
>> translate the partitioning wrongly, you'll get bad results. It's safer
>> just to shuffle.
>>
>>
>> On Oct 31, 2016 04:31, "Andrii Biletskyi"
>> <andrii.bilets...@yahoo.com.invalid> wrote:
>>
>> Hi all,
>>
>> I'm using the Spark Streaming mapWithState operation to do a stateful
>> operation on my Kafka stream (though I think similar arguments would
>> apply for any source).
>>
>> I'm trying to understand how to control mapWithState's partitioning
>> schema.
>>
>> My transformations are simple:
>>
>> 1) Create a KafkaDStream.
>> 2) mapPartitions to get a key-value stream where `key` corresponds to
>> the Kafka message key.
>> 3) Apply the mapWithState operation on the key-value stream; the state
>> stream shares keys with the original stream, and the resulting stream
>> doesn't change keys either.
>>
>> The problem is that, as I understand, the mapWithState stream has a
>> different partitioning schema, and thus I see shuffles in the Spark Web UI.
>>
>> From the mapWithState implementation I see that mapWithState uses a
>> Partitioner if specified; otherwise it partitions data with
>> HashPartitioner(<default-parallelism-conf>). The thing is that the
>> original KafkaDStream has a specific partitioning schema: Kafka
>> partitions correspond to Spark RDD partitions.
>>
>> Question: is there a way for the mapWithState stream to inherit the
>> partitioning schema from the original stream (i.e. correspond to Kafka
>> partitions)?
>>
>> Thanks,
>> Andrii
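Cody's point that mapWithState takes a StateSpec with a partitioner can be sketched like this. This is a minimal sketch, not code from the thread: the mapping function `trackCount`, the stream `keyValueStream`, and the partition count are all assumptions; the actual API calls are `StateSpec.function` and `StateSpec.partitioner` (available since Spark 1.6).

```scala
import org.apache.spark.HashPartitioner
import org.apache.spark.streaming.{State, StateSpec}

// Hypothetical mapping function: keeps a running sum per key.
def trackCount(key: String, value: Option[Long], state: State[Long]): (String, Long) = {
  val newCount = state.getOption.getOrElse(0L) + value.getOrElse(0L)
  state.update(newCount)
  (key, newCount)
}

// The partition count here (8) is an assumption -- in the scenario from
// the thread you would set it to match the number of Kafka partitions.
val spec = StateSpec
  .function(trackCount _)
  .partitioner(new HashPartitioner(8))

// keyValueStream is assumed to be the DStream[(String, Long)] produced
// by the mapPartitions step in the original question.
val stateful = keyValueStream.mapWithState(spec)
```

Note that this only controls which partitioner mapWithState uses for its state; whether a shuffle of the input still happens depends on the input RDDs reporting the same partitioner, which is what the custom-partitioner suggestion addresses.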
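Cody's suggestion to "write a custom partitioner that matches your Kafka partitioning scheme" might look roughly like the following sketch. The partitioning formula here (non-negative `hashCode` modulo partition count) is an assumption for illustration; a real producer may use a different scheme (e.g. Kafka's default partitioner hashes the serialized key bytes with murmur2), and the custom partitioner must reproduce that scheme exactly, or you hit the bad-results danger described in the thread.

```scala
import org.apache.spark.Partitioner

// Hypothetical partitioner assumed to mirror the producer's key-to-partition
// mapping. If the producer uses a different scheme, records and state will
// silently land in mismatched partitions.
class KafkaAlignedPartitioner(override val numPartitions: Int) extends Partitioner {

  // Non-negative modulo, so negative hashCodes still map into [0, numPartitions).
  override def getPartition(key: Any): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  // equals/hashCode matter: Spark compares partitioners to decide whether
  // two RDDs are co-partitioned and a shuffle can be skipped.
  override def equals(other: Any): Boolean = other match {
    case p: KafkaAlignedPartitioner => p.numPartitions == numPartitions
    case _                          => false
  }
  override def hashCode: Int = numPartitions
}
```

An instance of such a class could then be passed to `StateSpec.partitioner(...)`, or attached to a wrapper RDD around each KafkaRDD, as Cody describes.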