Re: MapWithState partitioning

2016-10-31 Thread Andrii Biletskyi
Thanks,

As I understand it, for the Kafka case the way to do this is to define my
kafka.Partitioner that is used when data is produced to Kafka, and to reuse
the same partitioning logic as a spark.Partitioner in the mapWithState
StateSpec.

I think I'll stick with that.
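[For the archive, a minimal plain-Scala sketch of that idea. The class and
method names are made up, and there is one important caveat: Kafka's default
producer partitioner hashes the *serialized* key bytes with murmur2, so a
faithful spark.Partitioner must replicate that exact logic rather than rely
on JVM hashCode as this stand-in does.]

```scala
// Sketch of a partitioner meant to reproduce the producer-side partition
// assignment. In Spark this class would extend org.apache.spark.Partitioner
// and be handed to the state spec, e.g.:
//   StateSpec.function(updateFunc _).partitioner(new KafkaKeyPartitioner(n))
class KafkaKeyPartitioner(val numPartitions: Int) {
  require(numPartitions > 0, "need at least one partition")

  // CAUTION: stand-in hashing for illustration. Kafka's DefaultPartitioner
  // uses murmur2 over the serialized key bytes, not Object.hashCode.
  def getPartition(key: Any): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw // keep the result non-negative
  }
}
```

[As long as the partitioner passed to the StateSpec computes the same
partition for a key as the producer did, the state RDD stays aligned with the
Kafka layout and the per-batch shuffle of the input can be avoided.]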

Thanks,
Andrii

2016-10-31 16:55 GMT+02:00 Cody Koeninger :

> You may know that those streams share the same keys, but Spark doesn't
> unless you tell it.
>
> mapWithState takes a StateSpec, which should allow you to specify a
> partitioner.
>
> On Mon, Oct 31, 2016 at 9:40 AM, Andrii Biletskyi 
> wrote:
> > Thanks for response,
> >
> > So as I understand it, there is no way to "tell" mapWithState to leave the
> > partitioning schema unchanged, the way any other transformation normally
> > would. Then I would like to clarify whether there is a simple way to do a
> > transformation to a key-value stream and somehow specify the Partitioner,
> > so that the result effectively has the same partitioning schema as the
> > original stream.
> > I.e.:
> >
> > stream.mapPartitions({ crs =>
> >   crs.map { cr =>
> > cr.key() -> cr.value()
> >   }
> > }) <--- specify somehow Partitioner here for the resulting rdd.
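[Interjecting for the archive with a hedged sketch of one option, not from
the original thread: it assumes `stream` is the mapped key-value DStream and
that the topic's partition count is known, and it pays one explicit shuffle
up front so the partitioner is then carried by the resulting RDDs.]

```scala
import org.apache.spark.HashPartitioner

// Assumed, user-supplied value: the Kafka topic's partition count.
val numKafkaPartitions = 8

// Pin a partitioner on the key-value stream once via partitionBy.
val part = new HashPartitioner(numKafkaPartitions)
val keyed = stream.transform { rdd =>
  rdd.map(cr => cr.key() -> cr.value()).partitionBy(part)
}
// A StateSpec configured with the same `part` should then find each batch
// already co-partitioned and avoid a second shuffle inside mapWithState.
```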
> >
> >
> > The reason I ask is that it looks strange to me that Spark has to shuffle
> > my input stream and the "state" stream on every batch during the
> > mapWithState operation, when I know for sure that those two streams will
> > always share the same keys and will never need access to each other's
> > partitions.
> >
> > Thanks,
> > Andrii
> >
> >
> > 2016-10-31 15:45 GMT+02:00 Cody Koeninger :
> >>
> >> If you call a transformation on an rdd using the same partitioner as
> >> that rdd, no shuffle will occur.  KafkaRDD doesn't have a partitioner;
> >> there's no consistent partitioning scheme that works for all Kafka uses.
> >> You can wrap each KafkaRDD with an rdd that has a custom partitioner
> >> that you write to match your Kafka partitioning scheme, and avoid a
> >> shuffle.
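[Again interjecting for the archive: roughly what such a wrapper could look
like, sketched against the public RDD API. This is not from the thread, the
class name is made up, and it carries exactly the danger described next: the
advertised partitioner must truly match how the data is laid out.]

```scala
import scala.reflect.ClassTag

import org.apache.spark.{Partition, Partitioner, TaskContext}
import org.apache.spark.rdd.RDD

// Sketch: exposes the same data and partitions as `prev`, but advertises a
// partitioner so co-partitioned downstream operations can skip the shuffle.
// Nothing is re-hashed or verified: if `part` doesn't match the real layout,
// downstream results will be silently wrong.
class AssumePartitionedRDD[K: ClassTag, V: ClassTag](
    prev: RDD[(K, V)],
    part: Partitioner)
  extends RDD[(K, V)](prev) {

  override val partitioner: Option[Partitioner] = Some(part)

  override protected def getPartitions: Array[Partition] = prev.partitions

  override def compute(split: Partition, context: TaskContext): Iterator[(K, V)] =
    prev.iterator(split, context)
}
```

[Each batch could then be wrapped inside `transform` on the already
key-value-mapped stream, e.g.
`keyed.transform(rdd => new AssumePartitionedRDD(rdd, myKafkaPartitioner))`,
where `myKafkaPartitioner` is a hypothetical partitioner replicating the
producer's assignment.]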
> >>
> >> The danger there is if you have any misbehaving producers, or translate
> >> the partitioning wrongly, you'll get bad results. It's safer just to
> >> shuffle.
> >>
> >>
> >> On Oct 31, 2016 04:31, "Andrii Biletskyi"
> >>  wrote:
> >>
> >> Hi all,
> >>
> >> I'm using the Spark Streaming mapWithState operation to do a stateful
> >> operation on my Kafka stream (though I think similar arguments would
> >> apply to any source).
> >>
> >> I'm trying to understand how to control mapWithState's partitioning
> >> schema.
> >>
> >> My transformations are simple:
> >>
> >> 1) create KafkaDStream
> >> 2) mapPartitions to get a key-value stream where `key` corresponds to
> >> the Kafka message key
> >> 3) apply the mapWithState operation on the key-value stream; the state
> >> stream shares keys with the original stream, and the resulting stream
> >> doesn't change keys either
> >>
> >> The problem is that, as I understand it, the mapWithState stream has a
> >> different partitioning schema, and thus I see shuffles in the Spark Web
> >> UI.
> >>
> >> From the mapWithState implementation I see that mapWithState uses the
> >> given Partitioner if one is specified, and otherwise partitions the data
> >> with a HashPartitioner(). The thing is that the original KafkaDStream
> >> has a specific partitioning schema: Kafka partitions correspond to Spark
> >> RDD partitions.
> >>
> >> Question: is there a way for the mapWithState stream to inherit the
> >> partitioning schema from the original stream (i.e. correspond to Kafka
> >> partitions)?
> >>
> >> Thanks,
> >> Andrii
> >>
> >>
> >
>

