I'd agree it might make sense to bundle this into an API. We'd have to think about whether it's a common enough use case to justify the API complexity.
It might be worth exploring decoupling state and partitions, but I
wouldn't want to start making decisions based on it without a clearer
design picture. I would expect the decoupling to make it very difficult
to ensure idempotency of state updates.

Jose

On Fri, Aug 3, 2018 at 10:55 AM, Arun Mahadevan <ar...@apache.org> wrote:

> coalesce might work.
>
> Say "spark.sql.shuffle.partitions" = 200, and then
> "input.readStream.map.filter.groupByKey(..).coalesce(2)...." would
> still create 200 instances for state but execute just 2 tasks.
>
> However, I think further groupByKey operations downstream would need a
> similar coalesce.
>
> And this assumes the user sets the right shuffle partitions upfront.
>
> It may be worth bundling this pattern into some built-in API so that it
> can be transparent to the user. I am not sure how you were planning to
> expose the state key groups at the API level and whether it would be
> transparent.
>
> IMO, decoupling the state from partitions and making it key-based would
> still be worth exploring to support dynamic state rebalancing. Maybe
> the default HDFS-based implementation can maintain the state
> partition-wise and not support it, but there could be implementations
> based on a distributed k-v store which do support this.
>
> Thanks,
> Arun
>
> On 3 August 2018 at 08:21, Joseph Torres <joseph.tor...@databricks.com>
> wrote:
>
>> A coalesced RDD will definitely maintain any within-partition
>> invariants that the original RDD maintained. It pretty much just runs
>> its input partitions sequentially.
>>
>> There'd still be some DataFrame API work needed to get the coalesce
>> operation where you want it to be, but this is much simpler than
>> introducing a new concept of state key groups. As far as I can tell,
>> state key groups are just the same thing that we currently call
>> partitions of the aggregate RDD.
>>
>> On Fri, Aug 3, 2018 at 8:01 AM, Jungtaek Lim <kabh...@gmail.com> wrote:
>>
>>> I'm afraid I don't know the details of coalesce(), but from the
>>> resources I could find, it looks like it helps reduce the actual
>>> number of partitions.
>>>
>>> For streaming aggregation, state for all partitions (200 by default)
>>> must be initialized and committed even if it is unchanged; otherwise
>>> an error occurs when reading a partition which was previously
>>> excluded from the query. Moreover, the query can't find an existing
>>> row in state, or stores a row in the wrong partition, if the
>>> partition id doesn't match the id expected from the hash function.
>>>
>>> Could you verify that coalesce() meets these requirements?
>>>
>>> On Fri, 3 Aug 2018 at 22:23 Joseph Torres
>>> <joseph.tor...@databricks.com> wrote:
>>>
>>>> Scheduling multiple partitions in the same task is basically what
>>>> coalesce() does. Is there a reason that doesn't work here?
>>>>
>>>> On Fri, Aug 3, 2018 at 5:55 AM, Jungtaek Lim <kabh...@gmail.com>
>>>> wrote:
>>>>
>>>>> Here's a link to a Google doc (anyone can comment):
>>>>> https://docs.google.com/document/d/1DEOW3WQcPUq0YFgazkZx6Ei6EOdj_3pXEsyq4LGpyNs/edit?usp=sharing
>>>>>
>>>>> Please note that I just copied the content into the Google doc, so
>>>>> it may still lack details. I would like to start with an
>>>>> explanation of the concept, and once we agree on moving forward, I
>>>>> can add more detail to the doc, or even just start working so that
>>>>> details can be shared via POC code or a WIP patch.
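>>>>>
>>>>> For concreteness, the coalesce() pattern discussed above would look
>>>>> roughly like the sketch below (untested; the rate source, key
>>>>> expression, and console sink are placeholders, and whether this
>>>>> actually meets the state requirements above is exactly the open
>>>>> question):
>>>>>
>>>>>   import org.apache.spark.sql.SparkSession
>>>>>
>>>>>   val spark =
>>>>>     SparkSession.builder().appName("coalesce-sketch").getOrCreate()
>>>>>
>>>>>   // State stays laid out across 200 shuffle partitions...
>>>>>   spark.conf.set("spark.sql.shuffle.partitions", "200")
>>>>>
>>>>>   val counts = spark.readStream
>>>>>     .format("rate").option("rowsPerSecond", "100").load()
>>>>>     .selectExpr("value % 10 AS key")
>>>>>     .groupBy("key").count() // stateful agg: 200 state instances
>>>>>     .coalesce(2)            // ...while only 2 tasks execute them
>>>>>
>>>>>   counts.writeStream
>>>>>     .format("console")
>>>>>     .outputMode("complete")
>>>>>     .start()
>>>>>     .awaitTermination()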
>>>>>
>>>>> Answers inlined for Arun's comments:
>>>>>
>>>>> On Fri, Aug 3, 2018 at 5:39 PM, Arun Mahadevan <ar...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Can you share this in a Google doc to make the discussion easier?
>>>>>>
>>>>>> Thanks for coming up with ideas to improve upon the current
>>>>>> restrictions of the SS state store.
>>>>>>
>>>>>> If I understood correctly, the plan is to introduce a logical
>>>>>> partitioning scheme for state storage (based on keys), independent
>>>>>> of Spark's partitioning, so that the number of Spark partitions
>>>>>> can be varied.
>>>>>>
>>>>>> My 2 cents:
>>>>>>
>>>>>> 1. Partitioning is already a kind of logical entity in Spark.
>>>>>> Maybe this can be leveraged to over-partition in advance (similar
>>>>>> to setting the number of state key groups in your proposal) while
>>>>>> making it easy to run more than one task (partition) per core (I
>>>>>> am not sure how easy this is currently). Then we can continue to
>>>>>> leverage the existing state implementation. This has limitations
>>>>>> similar to what you pointed out (the max number of partitions has
>>>>>> to be fixed upfront), but once over-provisioning of partitions is
>>>>>> made easy, it could be leveraged even for non-stateful operations.
>>>>>
>>>>> If we could allow assigning multiple partitions to a task (say, via
>>>>> a parallelism or maximum-concurrency setting), maybe we could
>>>>> achieve it a bit more easily. I'm not very familiar with the core
>>>>> of Spark, so I can't picture how we would do it. In addition,
>>>>> partitions for downstream operators will be affected unless we
>>>>> shuffle afterwards.
>>>>>
>>>>>> 2. Decouple the state from partitions completely and associate it
>>>>>> only with the keys. This would be the most flexible option, and we
>>>>>> could scale the partitions up/down as we wish. It needs a scalable
>>>>>> distributed state store implementation supporting fast lookup and
>>>>>> storage by key.
>>>>>
>>>>> It could be achieved with external storage like Redis or HBase, but
>>>>> I would avoid anything that requires end users to maintain yet
>>>>> another system. Spark is coupled to a specific version of Hadoop,
>>>>> so we can expect that end users are able to run and maintain HDFS.
>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Arun
>>>>>>
>>>>>> On 2 August 2018 at 23:45, Jungtaek Lim <kabh...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Spark devs,
>>>>>>>
>>>>>>> I have a new feature to propose and would like to hear the
>>>>>>> community's opinions on it. I'm not sure it is a big enough
>>>>>>> change to warrant an SPIP, so I'm posting to the dev mailing list
>>>>>>> instead.
>>>>>>>
>>>>>>> > Feature
>>>>>>>
>>>>>>> Reconfigurable number of partitions on state operators in
>>>>>>> Structured Streaming.
>>>>>>>
>>>>>>> > Rationale
>>>>>>>
>>>>>>> Currently, state in Structured Streaming is stored per partition,
>>>>>>> with the number of partitions taken from the configuration
>>>>>>> "spark.sql.shuffle.partitions", and the configuration cannot be
>>>>>>> modified once the query has run. One contributor already
>>>>>>> submitted a patch [1] without knowing why such a restriction came
>>>>>>> into play.
>>>>>>>
>>>>>>> The restriction on state is necessary because state is
>>>>>>> distributed by a hash function applied to the key columns, but as
>>>>>>> a side effect of the restriction, we can't change the number of
>>>>>>> partitions of stateful operators. End users have various
>>>>>>> workloads and various SLAs (and SLAs can change), so restricting
>>>>>>> them to a fixed number of partitions will not satisfy their
>>>>>>> needs. Moreover, it is not easy for end users to choose the right
>>>>>>> configuration before they run a query, and they only realize it
>>>>>>> can't be modified when they try to modify it.
>>>>>>>
>>>>>>> > Proposal
>>>>>>>
>>>>>>> The feature proposes to decouple data partitions from operator
>>>>>>> partitions by introducing key groups to state, enabling
>>>>>>> scalability of operator partitions while state data partitions
>>>>>>> remain the same (so there is no issue with state data). This
>>>>>>> approach is inspired by how Flink supports scalability with
>>>>>>> partitioned state.
>>>>>>>
>>>>>>> The concept itself is simple. Whereas we currently apply the
>>>>>>> following partitioning expression to the key columns (simplified):
>>>>>>>
>>>>>>> hash(key columns) % number of state operator partitions
>>>>>>>
>>>>>>> the feature will apply the partitioning expression below, so that
>>>>>>> state is distributed across state data partitions while each
>>>>>>> state operator partition can handle multiple state data
>>>>>>> partitions:
>>>>>>>
>>>>>>> (hash(key columns) % number of state key groups) % number of
>>>>>>> state operator partitions
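>>>>>>>
>>>>>>> As a toy model of this two-level scheme (all names and values
>>>>>>> here are invented for illustration):
>>>>>>>
>>>>>>>   // New hard limit, fixed once the query has run.
>>>>>>>   val numKeyGroups = 512
>>>>>>>   // Reconfigurable between runs; must be <= numKeyGroups.
>>>>>>>   val numOperatorPartitions = 8
>>>>>>>
>>>>>>>   // Key -> stable key group, independent of the partition count.
>>>>>>>   def keyGroup(keyHash: Int): Int =
>>>>>>>     Math.floorMod(keyHash, numKeyGroups)
>>>>>>>
>>>>>>>   // Key group -> operator partition; the only mapping that
>>>>>>>   // changes when operator partitions are reconfigured.
>>>>>>>   def operatorPartition(keyHash: Int): Int =
>>>>>>>     Math.floorMod(keyGroup(keyHash), numOperatorPartitions)
>>>>>>>
>>>>>>> Since keyGroup(h) never changes, state stored per key group can
>>>>>>> be re-mapped onto any equal or smaller number of operator
>>>>>>> partitions.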
>>>>>>>
>>>>>>> The state data itself will still not be scalable, so the number
>>>>>>> of state key groups becomes the new hard limit (we should
>>>>>>> restrict modifying it once the query has run). But we can change
>>>>>>> the number of stateful operator partitions afterwards. The number
>>>>>>> of stateful operator partitions should be equal to or smaller
>>>>>>> than the number of state key groups. (It doesn't make sense for a
>>>>>>> partition to be assigned no state key group and sit idle.)
>>>>>>>
>>>>>>> > Possible Risk
>>>>>>>
>>>>>>> Performance might be affected, because one of the following must
>>>>>>> be performed:
>>>>>>>
>>>>>>> 1. each partition calculates the key group id per key, or
>>>>>>> 2. the key group id is calculated and inserted into the row
>>>>>>> before it passes state operators (at shuffle), and removed after
>>>>>>> it passes state operators.
>>>>>>>
>>>>>>> There's another performance concern: committing multiple states
>>>>>>> in a partition when the number of operator partitions < the
>>>>>>> number of state key groups. But we could run the commits
>>>>>>> concurrently (at least for the HDFS state store), and it is
>>>>>>> actually already an issue today (all tasks may not be launched
>>>>>>> together).
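>>>>>>>
>>>>>>> A rough sketch of such a concurrent commit, where commitKeyGroup
>>>>>>> is a hypothetical stand-in for whatever the HDFS-backed state
>>>>>>> store commit would be:
>>>>>>>
>>>>>>>   import scala.concurrent.{Await, Future}
>>>>>>>   import scala.concurrent.ExecutionContext.Implicits.global
>>>>>>>   import scala.concurrent.duration.Duration
>>>>>>>
>>>>>>>   // An operator partition owns every key group g with
>>>>>>>   // g % numPartitions == partitionId; commit them in parallel.
>>>>>>>   def commitOwnedKeyGroups(partitionId: Int, numPartitions: Int,
>>>>>>>                            numKeyGroups: Int)
>>>>>>>                           (commitKeyGroup: Int => Unit): Unit = {
>>>>>>>     val owned = (0 until numKeyGroups)
>>>>>>>       .filter(_ % numPartitions == partitionId)
>>>>>>>     val commits = owned.map(g => Future(commitKeyGroup(g)))
>>>>>>>     Await.result(Future.sequence(commits), Duration.Inf)
>>>>>>>   }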
>>>>>>>
>>>>>>> Code complexity would be introduced, as expected.
>>>>>>>
>>>>>>> > Limitation
>>>>>>>
>>>>>>> At first, the feature will not support dynamic reconfiguration,
>>>>>>> i.e. changing the value while the query is running. It could
>>>>>>> actually be achieved simply by unloading all the state providers
>>>>>>> in executors before running the next batch, but that would
>>>>>>> invalidate all state caches and may incur high latency to reload
>>>>>>> the state cache from the previous batch. I guess we could adopt
>>>>>>> it if reconfiguring the partitions of stateful operators turns
>>>>>>> out to be worth more than the cost of reloading state.
>>>>>>>
>>>>>>> > Rejected alternatives
>>>>>>>
>>>>>>> * Offline physical repartitioning of state data (loading all
>>>>>>> state, recalculating the new partition id per key, and saving it
>>>>>>> back)
>>>>>>>
>>>>>>> I thought about this but discarded it, since repartitioning would
>>>>>>> take non-trivial time once the state becomes huge. It is also not
>>>>>>> easy to implement such a tool so that it runs efficiently (the
>>>>>>> whole state may not fit in memory, partitions have to be handled
>>>>>>> concurrently, etc.).
>>>>>>>
>>>>>>> Please share your opinion on this proposal: whether to accept or
>>>>>>> decline it, things to correct in my mail, any suggestions for
>>>>>>> improvement, etc.
>>>>>>>
>>>>>>> Please also let me know if it would be better to move this to a
>>>>>>> Google doc or a PDF along with filing a JIRA issue.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Jungtaek Lim (HeartSaVioR)
>>>>>>>
>>>>>>> 1. https://github.com/apache/spark/pull/21718