I'd agree it might make sense to bundle this into an API. We'd have to
think about whether it's a common enough use case to justify the API
complexity.

It might be worth exploring decoupling state and partitions, but I wouldn't
want to start making decisions based on it without a clearer design
picture. I would expect the decoupling to make it very difficult to ensure
idempotency of state updates.

Jose

On Fri, Aug 3, 2018 at 10:55 AM, Arun Mahadevan <ar...@apache.org> wrote:

> coalesce might work.
>
> Say "spark.sql.shuffle.partitions" = 200, and then "
> input.readStream.map.filter.groupByKey(..).coalesce(2)...." would still
> create 200 instances for state but execute just 2 tasks.
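>
> Spelled out a bit more (just a rough sketch; the source, types, and the
> exact placement of coalesce are hypothetical, not a tested recipe):
>
>     // assumes an active SparkSession named `spark`
>     import spark.implicits._
>
>     spark.conf.set("spark.sql.shuffle.partitions", "200") // 200 state instances
>
>     val input = spark.readStream.format("rate").load()    // any streaming source
>     val counts = input
>       .select($"value").as[Long]
>       .map(_ % 10)              // "map"
>       .filter(_ >= 0)           // "filter"
>       .groupByKey(identity)     // shuffles into 200 partitions / 200 state stores
>       .count()                  // stateful aggregation
>       .coalesce(2)              // intent: execute only 2 tasks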
>
> However I think further groupByKey operations downstream would need a
> similar coalesce.
>
> And this is assuming the user sets the right shuffle partitions upfront.
>
> It may be worth bundling this pattern as some builtin api so that it can
> be transparent to the user. I am not sure how you were planning to expose
> the state key groups at the api level and whether it would be transparent.
>
> IMO, decoupling the state and partitions and making it key based would
> still be worth exploring to support dynamic state rebalancing. Maybe the
> default HDFS based implementation can maintain the state partition-wise and
> not support it, but there could be implementations based on a distributed k-v
> store which support this.
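>
> Purely as an illustration of that direction (this is not an existing Spark
> API, just a hypothetical shape a key-addressed store could take):
>
>     // hypothetical: state addressed by key alone, not by (operator, partition)
>     trait KeyedStateStore {
>       def get(key: Array[Byte]): Option[Array[Byte]]
>       def put(key: Array[Byte], value: Array[Byte]): Unit
>       def remove(key: Array[Byte]): Unit
>       def commit(batchId: Long): Unit   // versioned for idempotent replay
>     }
>
> A distributed k-v store backed implementation could then rebalance keys
> across any number of tasks, while the HDFS-backed store keeps its
> partition-wise layout and simply doesn't support rebalancing.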
>
> Thanks,
> Arun
>
>
> On 3 August 2018 at 08:21, Joseph Torres <joseph.tor...@databricks.com>
> wrote:
>
>> A coalesced RDD will definitely maintain any within-partition invariants
>> that the original RDD maintained. It pretty much just runs its input
>> partitions sequentially.
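>>
>> As a quick illustration (a toy batch example, not streaming-specific;
>> assumes an existing SparkContext `sc`):
>>
>>     val rdd = sc.parallelize(1 to 200, 200)   // 200 input partitions
>>     val merged = rdd.coalesce(2)               // no shuffle involved
>>     merged.getNumPartitions                    // == 2 tasks
>>     // each task just iterates its ~100 parent partitions one after another,
>>     // so whatever held within each parent partition still holds during that run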
>>
>> There'd still be some Dataframe API work needed to get the coalesce
>> operation where you want it to be, but this is much simpler than
>> introducing a new concept of state key groups. As far as I can tell,
>> state key groups are just the same thing that we currently call partitions
>> of the aggregate RDD.
>>
>> On Fri, Aug 3, 2018 at 8:01 AM, Jungtaek Lim <kabh...@gmail.com> wrote:
>>
>>> I'm afraid I don't know the details of coalesce(), but from what I could
>>> find about coalesce, it looks like it helps reduce the actual number of
>>> partitions.
>>>
>>> For streaming aggregation, state for all partitions (by default, 200)
>>> must be initialized and committed even if it is unchanged. Otherwise an
>>> error occurs when reading a partition which was excluded from the query
>>> previously. Moreover, it can't find an existing row in state, or it stores a
>>> row in the wrong partition, if the partition id doesn't match the id expected
>>> from the hashing function.
>>>
>>> Could you verify coalesce() meets such requirements?
>>>
>>> On Fri, 3 Aug 2018 at 22:23 Joseph Torres <joseph.tor...@databricks.com>
>>> wrote:
>>>
>>>> Scheduling multiple partitions in the same task is basically what
>>>> coalesce() does. Is there a reason that doesn't work here?
>>>>
>>>> On Fri, Aug 3, 2018 at 5:55 AM, Jungtaek Lim <kabh...@gmail.com> wrote:
>>>>
>>>>> Here's a link for Google docs (anyone can comment):
>>>>> https://docs.google.com/document/d/1DEOW3WQcPUq0YFgazkZx6Ei6EOdj_3pXEsyq4LGpyNs/edit?usp=sharing
>>>>>
>>>>> Please note that I just copied the content into the Google doc, so
>>>>> someone could point out a lack of detail. I would like to start with an
>>>>> explanation of the concept, and once we are in agreement on going forward,
>>>>> I could add more detail to the doc, or even just start working so that
>>>>> details can be shared via POC code or even a WIP patch.
>>>>>
>>>>> Answers inlined for Arun's comments:
>>>>>
>>>>> On Fri, Aug 3, 2018 at 5:39 PM, Arun Mahadevan <ar...@apache.org> wrote:
>>>>>
>>>>>> Can you share this in a google doc to make the discussion easier?
>>>>>>
>>>>>>
>>>>>>
>>>>>> Thanks for coming up with ideas to improve upon the current
>>>>>> restrictions with the SS state store.
>>>>>>
>>>>>>
>>>>>>
>>>>>> If I understood correctly, the plan is to introduce a logical
>>>>>> partitioning scheme for state storage (based on keys) independent of
>>>>>> spark’s partitioning so that the number of spark partitions can be 
>>>>>> varied.
>>>>>>
>>>>>>
>>>>>>
>>>>>> my 2 cents,
>>>>>>
>>>>>>
>>>>>>
>>>>>>    1. The Partitioning is already a kind of a logical entity in
>>>>>>    Spark. Maybe this can be leveraged to over-partition in advance
>>>>>>    (similar to setting the number of state key groups in your proposal)
>>>>>>    but make it easy to run more than one task (partition) per core (I am
>>>>>>    not sure how easy this is currently). Then we can continue to leverage
>>>>>>    the existing state implementation. This has similar limitations like
>>>>>>    what you pointed out (the max number of partitions has to be fixed
>>>>>>    upfront). But once the over provisioning of partitions is made easy it
>>>>>>    could be leveraged even for non-stateful operations.
>>>>>>
>>>>>>
>>>>> If we could allow assigning multiple partitions to a task (say, via
>>>>> parallelism or maximum concurrency), maybe we could achieve it a bit more
>>>>> easily. I'm not very familiar with the core of Spark, so I can't imagine how
>>>>> we could do it. In addition, partitions for downstream operators will be
>>>>> affected unless we shuffle afterwards.
>>>>>
>>>>>
>>>>>>    2. Decouple the state from the partition completely and associate it
>>>>>>    only with the keys. This would be the most flexible option and we can
>>>>>>    scale the partitions up/down as we wish. This needs a scalable
>>>>>>    distributed state store implementation supporting fast look-ups /
>>>>>>    storage by key.
>>>>>>
>>>>>>
>>>>> It could be achieved with a couple of external storage systems like Redis
>>>>> or HBase, but I would avoid a step which requires end users to maintain
>>>>> another system as well. Spark is coupled with a specific version of Hadoop,
>>>>> so we can expect that end users can run and maintain HDFS.
>>>>>
>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Arun
>>>>>>
>>>>>>
>>>>>> On 2 August 2018 at 23:45, Jungtaek Lim <kabh...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Spark devs,
>>>>>>>
>>>>>>> I have a new feature to propose and would like to hear opinions from the
>>>>>>> community. I'm not sure it is a big enough change to be worth a SPIP, so
>>>>>>> I'm posting to the dev mailing list instead.
>>>>>>>
>>>>>>> > Feature
>>>>>>>
>>>>>>> Reconfigurable number of partitions on state operators in Structured
>>>>>>> Streaming
>>>>>>>
>>>>>>> > Rationale
>>>>>>>
>>>>>>> Nowadays, state in Structured Streaming is stored individually per
>>>>>>> partition, with the partition count given by the configuration
>>>>>>> "spark.sql.shuffle.partitions", and the configuration cannot be modified
>>>>>>> after the query has run once. One contributor already submitted a patch [1]
>>>>>>> without knowing why such a restriction came into play.
>>>>>>>
>>>>>>> Such a restriction for state is necessary because state is distributed
>>>>>>> by a hash function applied to the key columns, but as a side effect of the
>>>>>>> restriction, we can't change the number of partitions of stateful operators.
>>>>>>> End users have various workloads and also various SLAs (and SLAs can
>>>>>>> change), so restricting them to a specific number of partitions would not
>>>>>>> satisfy their needs. Moreover, it is not easy for end users to determine
>>>>>>> the right configuration before they run the query, and they only realize
>>>>>>> they can't modify it when they try to.
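>>>>>>>
>>>>>>> To make the restriction concrete (the config key is real; the query and
>>>>>>> the checkpoint path below are just placeholders):
>>>>>>>
>>>>>>>     // must be chosen before the very first run of the query
>>>>>>>     spark.conf.set("spark.sql.shuffle.partitions", "200")
>>>>>>>
>>>>>>>     counts.writeStream                       // `counts` is some stateful query
>>>>>>>       .option("checkpointLocation", "/tmp/word-count-ckpt") // placeholder path
>>>>>>>       .outputMode("update")
>>>>>>>       .format("console")
>>>>>>>       .start()
>>>>>>>     // restarting from the same checkpoint with a different value has no
>>>>>>>     // effect: the state layout is already fixed at 200 partitions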
>>>>>>>
>>>>>>> > Proposal
>>>>>>>
>>>>>>> The feature proposes decoupling data partitions and operator
>>>>>>> partitions by introducing key groups to state, enabling scalability of
>>>>>>> operator partitions while state data partitions remain the same (so no
>>>>>>> issue with state data). This approach is inspired by how Flink supports
>>>>>>> scalability with partitioned state.
>>>>>>>
>>>>>>> The concept itself is simple. Whereas today we apply the following
>>>>>>> partitioning expression to the key columns (simplified):
>>>>>>>
>>>>>>> hash(key columns) % number of state operator partitions
>>>>>>>
>>>>>>> the proposal applies the partitioning expression below, so that data can
>>>>>>> still be distributed by state data partition while each state operator
>>>>>>> partition can handle multiple state data partitions:
>>>>>>>
>>>>>>> (hash(key columns) % number of state key groups) % number of state
>>>>>>> operator partitions
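>>>>>>>
>>>>>>> As a tiny sketch of the arithmetic (the numbers are arbitrary):
>>>>>>>
>>>>>>>     val numKeyGroups          = 64  // new hard limit, fixed at first run
>>>>>>>     val numOperatorPartitions = 8   // reconfigurable across restarts
>>>>>>>
>>>>>>>     def keyGroup(keyHash: Int): Int =
>>>>>>>       ((keyHash % numKeyGroups) + numKeyGroups) % numKeyGroups // non-negative mod
>>>>>>>
>>>>>>>     def operatorPartition(keyHash: Int): Int =
>>>>>>>       keyGroup(keyHash) % numOperatorPartitions
>>>>>>>
>>>>>>> State stays laid out by the 64 key groups, so changing 8 to, say, 16 only
>>>>>>> changes how key groups are mapped onto tasks.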
>>>>>>>
>>>>>>> The state data itself will still not be scalable, so the number of
>>>>>>> state key groups becomes the new hard limit (we should restrict modifying
>>>>>>> it once the query has run). But we can change the number of stateful
>>>>>>> operator partitions afterwards. The number of stateful operator partitions
>>>>>>> should be equal to or smaller than the number of state key groups. (It
>>>>>>> doesn't make sense for partitions to be assigned no state key group and
>>>>>>> sit idle.)
>>>>>>>
>>>>>>> > Possible Risk
>>>>>>>
>>>>>>> Performance might be affected, because one of the following must be
>>>>>>> performed:
>>>>>>>
>>>>>>> 1. each partition calculates the key group id per key, or
>>>>>>> 2. the key group id is calculated and inserted into the row before it
>>>>>>> reaches the state operators (at shuffle), and removed after passing the
>>>>>>> state operators (see the sketch below)
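>>>>>>>
>>>>>>> For option 2, a rough sketch (the column name and group count are
>>>>>>> hypothetical; the stateful operator itself is elided):
>>>>>>>
>>>>>>>     import org.apache.spark.sql.functions.expr
>>>>>>>
>>>>>>>     // `keyed` is some streaming DataFrame with a `key` column
>>>>>>>     val withGroup = keyed.withColumn("_key_group", expr("pmod(hash(key), 64)"))
>>>>>>>     // ... shuffle / stateful operator would route rows by _key_group here ...
>>>>>>>     val cleaned = withGroup.drop("_key_group") // removed once past the operator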
>>>>>>>
>>>>>>> There's another performance concern: committing multiple states in
>>>>>>> a partition when the number of operator partitions < the number of state
>>>>>>> key groups. But we could run those commits concurrently (at least for the
>>>>>>> HDFS state store), and actually it is already an issue today (all tasks may
>>>>>>> not be launched together).
>>>>>>>
>>>>>>> Code complexity would be introduced as expected.
>>>>>>>
>>>>>>> > Limitation
>>>>>>>
>>>>>>> Initially, it will not support dynamic reconfiguration, i.e. changing
>>>>>>> the value while the query is running. It could actually be achieved simply
>>>>>>> by unloading all the state providers in the executors before running the
>>>>>>> next batch, but that would invalidate all state caches and may incur high
>>>>>>> latency to reload the state cache from the previous batch. But I guess we
>>>>>>> could adopt it if we see more merit in reconfiguring the partitions of
>>>>>>> stateful operators than in the cost of reloading state.
>>>>>>>
>>>>>>> > Rejected alternatives
>>>>>>>
>>>>>>> * Offline physical repartitioning of state data (loading all state,
>>>>>>> recalculating the new partition id per key, and resaving)
>>>>>>>
>>>>>>> I thought about it but discarded it, since it would take non-trivial
>>>>>>> time to repartition once the state becomes huge. Also it is not easy to
>>>>>>> implement such a tool to run efficiently (the whole state may not fit in
>>>>>>> memory, it has to be handled concurrently, etc.).
>>>>>>>
>>>>>>> Please share your opinion about this proposal: whether to accept or
>>>>>>> decline it, things to correct in my mail, any suggestions for improvement,
>>>>>>> etc.
>>>>>>>
>>>>>>> Please also let me know if it would be better to move this to a Google
>>>>>>> doc or PDF and file a JIRA issue.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Jungtaek Lim (HeartSaVioR)
>>>>>>>
>>>>>>> 1. https://github.com/apache/spark/pull/21718
>>>>>>>
>>>>>>
>>>>>>
>>>>
>>
>
