Re: [Proposal] New feature: reconfigurable number of partitions on stateful operators in Structured Streaming

2018-08-05 Thread Jungtaek Lim
Answering one of the missed questions:

>  I am not sure how you were planning to expose the state key groups at the
>  API level and whether it would be transparent.

I was thinking about introducing a new configuration: it may look like adding
an unnecessary configuration, but I thought it would eventually help
elasticity ("adaptive execution" might be the relevant notion in the Spark
world). Putting it as a configuration would make it easier for Spark to
modify the value dynamically: if we let the user call a method in the DSL,
Spark should treat it as intentional and try its best to respect the user
input.
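
For illustration, the knob might look like the sketch below. The configuration
name is purely hypothetical (no such setting exists in Spark); it only shows
why a config, unlike a DSL call, would be easy for Spark itself to adjust:

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder.appName("hypothetical-conf-sketch").getOrCreate()

  // Hypothetical name, used only to illustrate the idea of a config-based knob.
  // Spark could later override such a value for adaptive execution, whereas an
  // explicit DSL call would have to be respected as-is.
  spark.conf.set("spark.sql.streaming.statefulOperator.numPartitions", "50")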

When I thought about the above, I missed the fact that the parallelism of the
state operator is also applied to the parallelism of the data writers unless
we introduce another stage. So it may not be ideal to rearrange such a thing
at runtime, and it would be better to pay more attention to the parallelism
of the data writers, but it is still worth keeping as food for thought.

Re: [Proposal] New feature: reconfigurable number of partitions on stateful operators in Structured Streaming

2018-08-05 Thread Jungtaek Lim
"coalesce" looks like working: I misunderstood it as an efficient version
of "repartition" which does shuffle, so expected it would trigger shuffle.
My proposal would be covered as using "coalesce": thanks Joseph for
correction. Let me abandon the proposal.

What we may still be missing for now is documentation of the fact that the
number of partitions for state cannot be changed, so Spark restricts
modifying "spark.sql.shuffle.partitions" once the query has been run (this
only applies to streaming queries, right?). If end users want a larger or
smaller number of state partitions, the value should be set before running
the query for the first time. Would it be better to add this to the
"Structured Streaming" doc?
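
As a minimal sketch of what such a doc note could show (the rate source, the
bucketing, and the checkpoint path are placeholders for the example):

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder.appName("state-partitions-sketch").getOrCreate()
  import spark.implicits._

  // Must be set before the query first runs against its checkpoint; afterwards
  // the checkpointed state pins the number of state partitions.
  spark.conf.set("spark.sql.shuffle.partitions", "50")

  val counts = spark.readStream.format("rate").load()
    .groupBy(($"value" % 10).as("bucket"))
    .count()

  counts.writeStream
    .option("checkpointLocation", "/tmp/ckpt-sketch") // placeholder path
    .outputMode("update")
    .format("console")
    .start()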

I agree that decoupling state and partitions would not be simple. I'd try out
(offline) repartitioning first if the number of partitions really matters for
scalability / elasticity.

Thanks,
Jungtaek Lim (HeartSaVioR)

Re: [Proposal] New feature: reconfigurable number of partitions on stateful operators in Structured Streaming

2018-08-03 Thread Joseph Torres
I'd agree it might make sense to bundle this into an API. We'd have to
think about whether it's a common enough use case to justify the API
complexity.

It might be worth exploring decoupling state and partitions, but I wouldn't
want to start making decisions based on it without a clearer design
picture. I would expect the decoupling to make it very difficult to ensure
idempotency of state updates.

Jose

Re: [Proposal] New feature: reconfigurable number of partitions on stateful operators in Structured Streaming

2018-08-03 Thread Arun Mahadevan
coalesce might work.

Say "spark.sql.shuffle.partitions" = 200, and then "
input.readStream.map.filter.groupByKey(..).coalesce(2)" would still
create 200 instances for state but execute just 2 tasks.

However, I think further groupByKey operations downstream would need a
similar coalesce.

And this is assuming the user sets the right shuffle partitions upfront.
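
A runnable sketch of this pattern (the rate source and the simple count are
placeholders; whether the coalesce ends up exactly at the state operator in
the physical plan is the open question discussed elsewhere in this thread):

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder.appName("coalesce-pattern-sketch").getOrCreate()
  import spark.implicits._

  spark.conf.set("spark.sql.shuffle.partitions", "200") // 200 state partitions

  // The aggregation still creates 200 state instances, but coalesce(2)
  // lets them be executed by just 2 tasks.
  val aggregated = spark.readStream.format("rate").load()
    .filter($"value" % 2 === 0)
    .groupBy(($"value" % 100).as("key"))
    .count()
    .coalesce(2)

  aggregated.writeStream
    .outputMode("update")
    .format("console")
    .option("checkpointLocation", "/tmp/coalesce-sketch") // placeholder path
    .start()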

It may be worth bundling this pattern as some builtin API so that it can be
transparent to the user. I am not sure how you were planning to expose the
state key groups at the API level and whether it would be transparent.

IMO, decoupling the state from the partitions and making it key based would
still be worth exploring to support dynamic state rebalancing. Maybe the
default HDFS based implementation can maintain the state partition-wise and
not support it, but there could be implementations based on a distributed k-v
store which support this.

Thanks,
Arun


Re: [Proposal] New feature: reconfigurable number of partitions on stateful operators in Structured Streaming

2018-08-03 Thread Joseph Torres
A coalesced RDD will definitely maintain any within-partition invariants
that the original RDD maintained. It pretty much just runs its input
partitions sequentially.

There'd still be some Dataframe API work needed to get the coalesce
operation where you want it to be, but this is much simpler than
introducing a new concept of state key groups. As far as I can tell, state
key groups are just the same thing that we currently call partitions of the
aggregate RDD.
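
A small sketch of that behavior on plain RDDs (the exact grouping may vary
with locality, hence the "e.g."):

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder
    .master("local[*]").appName("coalesce-rdd-sketch").getOrCreate()
  val rdd = spark.sparkContext.parallelize(1 to 8, numSlices = 4)

  // coalesce(2) merges input partitions without a shuffle: each of the 2
  // tasks runs its assigned input partitions one after another, so
  // within-partition contents stay intact.
  val merged = rdd.coalesce(2)
  println(merged.glom().collect().map(_.mkString(",")).mkString(" | "))
  // e.g. prints "1,2,3,4 | 5,6,7,8"
  spark.stop()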

Re: [Proposal] New feature: reconfigurable number of partitions on stateful operators in Structured Streaming

2018-08-03 Thread Jungtaek Lim
I’m afraid I don’t know the details of coalesce(), but from looking up some
resources on coalesce, it looks like it helps reduce the actual number of
partitions.

For streaming aggregation, the state for all partitions (by default, 200)
must be initialized and committed even if it is unchanged. Otherwise an error
occurs when reading a partition which was excluded from the query previously.
Moreover, it can’t find an existing row in the state, or it stores the row in
the wrong partition, if the partition id doesn’t match the id expected from
the hashing function.

Could you verify coalesce() meets such requirements?
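
To make that invariant concrete, a toy sketch (using a simplified stand-in
for Spark's internal hash expression):

  // A key's state partition id is a pure function of the key and the
  // partition count, so changing the count breaks lookups against state
  // written under the old count.
  def statePartition(key: String, numPartitions: Int): Int =
    Math.floorMod(key.hashCode, numPartitions) // simplified stand-in hash

  val before = statePartition("user-42", 200) // id written under 200 partitions
  val after  = statePartition("user-42", 100) // usually different: lookups miss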

Re: [Proposal] New feature: reconfigurable number of partitions on stateful operators in Structured Streaming

2018-08-03 Thread Joseph Torres
Scheduling multiple partitions in the same task is basically what
coalesce() does. Is there a reason that doesn't work here?

Re: [Proposal] New feature: reconfigurable number of partitions on stateful operators in Structured Streaming

2018-08-03 Thread Jungtaek Lim
Here's a link for Google docs (anyone can comment):
https://docs.google.com/document/d/1DEOW3WQcPUq0YFgazkZx6Ei6EOdj_3pXEsyq4LGpyNs/edit?usp=sharing

Please note that I just copied the content into the Google doc, so someone
may point out a lack of detail. I would like to start with an explanation of
the concept, and once we are in agreement on going forward, I could add more
detail to the doc, or even just start working so that details can be shared
via POC code or even a WIP patch.

Answers inlined for Arun's comments:

On Fri, Aug 3, 2018 at 5:39 PM, Arun Mahadevan wrote:

> Can you share this in a google doc to make the discussions easier?
>
>
>
> Thanks for coming up with ideas to improve upon the current restrictions
> with the SS state store.
>
>
>
> If I understood correctly, the plan is to introduce a logical partitioning
> scheme for state storage (based on keys) independent of Spark’s
> partitioning so that the number of Spark partitions can be varied.
>
>
>
> my 2 cents,
>
>
>
>    1. Partitioning is already a kind of logical entity in Spark. Maybe
>    this can be leveraged to over-partition in advance (similar to setting
>    the number of state key groups in your proposal) but make it easy to
>    run more than one task (partition) per core (I am not sure how easy
>    this is currently). Then we can continue to leverage the existing state
>    implementation. This has similar limitations to what you pointed out
>    (the max number of partitions has to be fixed upfront). But once the
>    over-provisioning of partitions is made easy, it could be leveraged
>    even for non-stateful operations.
>
>
If we could allow assigning multiple partitions to a task (say, via a
parallelism or maximum-concurrency setting), maybe we could achieve it a bit
more easily. I'm not that familiar with the core of Spark, so I can't imagine
how we could do it. In addition, the partitions of downstream operators will
be affected unless we shuffle afterwards.


>    2. Decouple the state from the partition completely and associate it
>    only with the keys. This would be the most flexible option and we can
>    scale the partitions up/down as we wish. This needs a scalable
>    distributed state store implementation supporting fast lookups /
>    storage by key.
>
>
It could be achieved with external storage like Redis or HBase, but I would
avoid anything that requires end users to maintain another system as well.
Spark is coupled with a specific version of Hadoop, so we can expect that end
users can run and maintain HDFS.


Re: [Proposal] New feature: reconfigurable number of partitions on stateful operators in Structured Streaming

2018-08-03 Thread Arun Mahadevan
Can you share this in a google doc to make the discussions easier?



Thanks for coming up with ideas to improve upon the current restrictions
with the SS state store.



If I understood correctly, the plan is to introduce a logical partitioning
scheme for state storage (based on keys) independent of Spark’s partitioning
so that the number of Spark partitions can be varied.



my 2 cents,



   1. Partitioning is already a kind of logical entity in Spark. Maybe this
   can be leveraged to over-partition in advance (similar to setting the
   number of state key groups in your proposal) but make it easy to run more
   than one task (partition) per core (I am not sure how easy this is
   currently). Then we can continue to leverage the existing state
   implementation. This has similar limitations to what you pointed out (the
   max number of partitions has to be fixed upfront). But once the
   over-provisioning of partitions is made easy, it could be leveraged even
   for non-stateful operations.



   2. Decouple the state from the partition completely and associate it only
   with the keys. This would be the most flexible option and we can scale the
   partitions up/down as we wish. This needs a scalable distributed state
   store implementation supporting fast lookups / storage by key (a sketch
   follows below).
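
To make option 2 concrete, a hypothetical key-addressed store might look like
the sketch below (purely illustrative; this is not Spark's StateStore API):

  // Hypothetical interface: state is addressed by key alone, so any number
  // of tasks can serve any key range, enabling rebalancing without
  // repartitioning the state itself.
  trait KeyedStateStore {
    def get(key: Array[Byte]): Option[Array[Byte]]
    def put(key: Array[Byte], value: Array[Byte]): Unit
    def remove(key: Array[Byte]): Unit
    def commit(): Unit // make this batch's updates durable, safe to retry
  }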



Thanks,

Arun


[Proposal] New feature: reconfigurable number of partitions on stateful operators in Structured Streaming

2018-08-03 Thread Jungtaek Lim
Hi Spark devs,

I have a new feature to propose and would like to hear the community's
opinions. I am not sure whether it is a big enough change to warrant a SPIP,
so I am posting to the dev mailing list instead.

> Feature

Reconfigurable number of partitions on state operators in Structured
Streaming

> Rationale

Nowadays, state in Structured Streaming is stored per partition according to
the configuration "spark.sql.shuffle.partitions", and the configuration
cannot be modified after the query has been run once. One contributor already
submitted a patch [1] without knowing why such a restriction came into play.

Such a restriction on state is necessary because state is distributed by a
hash function applied to the key columns, but as a side effect of the
restriction, we can't change the partitioning of stateful operators. End
users have various workloads and various SLAs (and an SLA can change), so
restricting them to a specific number of partitions would not satisfy their
needs. Moreover, it is not easy for end users to determine the right
configuration before they run the query, and they only realize they can't
modify it when they try to.

> Proposal

The feature proposes decoupling data partitions and operator partitions by
introducing key groups to state, enabling scalability of operator partitions
while state data partitions remain the same (so there is no issue with state
data). This approach is inspired by how Flink supports scalability with
partitioned state.

The concept itself is simple. Whereas today we apply a partitioning
expression like the following to the key columns (simplified):

hash(key columns) % number of state operator partitions

the proposal applies the partitioning expression below, so that state is
distributed across state data partitions while each state operator partition
can handle multiple state data partitions:

(hash(key columns) % number of state key groups) % number of state operator
partitions

The state data itself will still not be scalable, so the number of state key
groups becomes a new hard limit (we should restrict modifying it once the
query has been run). But we can change the number of stateful operator
partitions afterwards. The number of stateful operator partitions should be
equal to or smaller than the number of state key groups. (It doesn't make
sense for a partition to be assigned no state key group and sit idle.)
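
A small sketch of the proposed two-level mapping (the helper names are
illustrative, not Spark APIs):

  // The number of key groups is fixed at the first run; the number of
  // operator partitions may change between runs, as long as it stays <= the
  // number of key groups.
  def keyGroup(keyHash: Int, numKeyGroups: Int): Int =
    Math.floorMod(keyHash, numKeyGroups)

  def operatorPartition(group: Int, numOperatorPartitions: Int): Int =
    group % numOperatorPartitions // each operator partition serves >= 1 groups

  // Example: 8 key groups; scaling operator partitions from 4 down to 2 only
  // changes which task serves each group, never the group a key belongs to.
  val groups = 0 until 8
  println(groups.map(g => operatorPartition(g, 4)).mkString(",")) // 0,1,2,3,0,1,2,3
  println(groups.map(g => operatorPartition(g, 2)).mkString(",")) // 0,1,0,1,0,1,0,1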

> Possible Risk

Performance might be affected, because one of the following must be
performed:

1. each partition calculates the key group id per key, or
2. the key group id is calculated and inserted into the row before it passes
the state operators (at shuffle time), and removed after it passes the state
operators

There's another performance concern, committing multiple states in a
partition when the number of operator partitions < the number of state key
groups, but we could run the commits concurrently (at least for the HDFS
state store), and it is actually already an issue today (all tasks may not be
launched together).

Some code complexity would be introduced, as expected.

> Limitation

Initially, it will not support dynamic reconfiguration such as changing the
value while the query is running. Actually, that could be achieved simply by
unloading all the state providers in the executors before running the next
batch, but it would invalidate all state caches and may incur high latency to
reload the state cache from the previous batch. But I guess we could adopt it
if we see more merit in reconfiguring the partitions of stateful operators
than in avoiding the state reload.

> Rejected alternatives

* Offline physical repartitioning of state data (loading all state,
recalculating the new partition id per key, and resaving)

I thought about it but discarded it, since repartitioning would take
non-trivial time once the state grows huge. It is also not easy to make such
a tool run efficiently (the whole state may not fit in memory, it would have
to be handled concurrently, etc.).

Please share your opinion about this proposal: whether to accept or decline
it, things to correct in my mail, any suggestions for improvement, etc.

Please also let me know if it would be better to move this to a Google doc or
PDF along with filing a JIRA issue.

Thanks,
Jungtaek Lim (HeartSaVioR)

1. https://github.com/apache/spark/pull/21718