Scheduling multiple partitions in the same task is basically what coalesce() does. Is there a reason that doesn't work here?
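
(For context, a minimal, illustrative sketch of coalesce() assigning
multiple upstream partitions to fewer tasks. The dataset, app name, and
numbers here are made up for illustration, not taken from the thread.)

    import org.apache.spark.sql.SparkSession

    object CoalesceSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("coalesce-sketch")
          .master("local[*]")
          .getOrCreate()

        // Start with many partitions, e.g. produced by a shuffle.
        val df = spark.range(0, 1000000).repartition(200)

        // coalesce() merges partitions without a shuffle: each of the
        // 10 resulting tasks processes the data of ~20 upstream
        // partitions.
        val merged = df.coalesce(10)
        println(merged.rdd.getNumPartitions) // 10

        spark.stop()
      }
    }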
On Fri, Aug 3, 2018 at 5:55 AM, Jungtaek Lim <kabh...@gmail.com> wrote:

> Here's a link for the Google doc (anyone can comment):
> https://docs.google.com/document/d/1DEOW3WQcPUq0YFgazkZx6Ei6EOdj_3pXEsyq4LGpyNs/edit?usp=sharing
>
> Please note that I just copied the content into the Google doc, so it
> may lack detail. I would like to start with an explanation of the
> concept, and once we are in agreement on going forward, I could add
> more detail to the doc, or even just start working so that details can
> be shared via POC code or even a WIP patch.
>
> Answers inlined for Arun's comments:
>
> On Fri, Aug 3, 2018 at 5:39 PM, Arun Mahadevan <ar...@apache.org> wrote:
>
>> Can you share this in a Google doc to make the discussion easier?
>>
>> Thanks for coming up with ideas to improve upon the current
>> restrictions of the SS state store.
>>
>> If I understood correctly, the plan is to introduce a logical
>> partitioning scheme for state storage (based on keys), independent of
>> Spark's partitioning, so that the number of Spark partitions can be
>> varied.
>>
>> My 2 cents:
>>
>> 1. Partitioning is already a kind of logical entity in Spark. Maybe
>> this can be leveraged to over-partition in advance (similar to setting
>> the number of state key groups in your proposal) while making it easy
>> to run more than one task (partition) per core (I am not sure how easy
>> this is currently). Then we can continue to leverage the existing
>> state implementation. This has limitations similar to what you pointed
>> out (the max number of partitions has to be fixed upfront). But once
>> over-provisioning of partitions is made easy, it could be leveraged
>> even for non-stateful operations.
>>
> If we could allow assigning multiple partitions to a task (say, via a
> parallelism or maximum-concurrency setting), maybe we could achieve
> this more easily. I'm not that familiar with Spark core, so I can't see
> how we would do it. In addition, partitions for downstream operators
> will be affected unless we shuffle afterwards.
>
>> 2. Decouple the state from partitions completely and associate it only
>> with the keys. This would be the most flexible option, and we can
>> scale the partitions up/down as we wish. This needs a scalable
>> distributed state store implementation supporting fast lookups/storage
>> by key.
>>
> It could be achieved with external storage like Redis or HBase, but I
> would avoid an approach that requires end users to maintain another
> system as well. Spark is coupled with a specific version of Hadoop, so
> we can expect that end users are able to run and maintain HDFS.
>
>> Thanks,
>>
>> Arun
>>
>> On 2 August 2018 at 23:45, Jungtaek Lim <kabh...@gmail.com> wrote:
>>
>>> Hi Spark devs,
>>>
>>> I have a new feature to propose and would like to hear the
>>> community's opinions on it. I'm not sure it is a big enough change to
>>> warrant an SPIP, so I'm posting to the dev mailing list instead.
>>>
>>> > Feature
>>>
>>> Reconfigurable number of partitions on state operators in Structured
>>> Streaming
>>>
>>> > Rationale
>>>
>>> Today, state in Structured Streaming is stored per partition, with
>>> the number of partitions given by the configuration
>>> "spark.sql.shuffle.partitions", and this configuration cannot be
>>> modified after the query has run once. (A sketch of this
>>> configuration appears at the end of this thread.) One contributor
>>> already submitted a patch [1] without knowing why such a restriction
>>> exists.
>>>
>>> Such a restriction on state is necessary because state is distributed
>>> by a hash function applied to the key columns, but as a side effect
>>> of the restriction, we can't change the number of partitions of
>>> stateful operators. End users have various workloads and various SLAs
>>> (and SLAs can change), so restricting them to a specific number of
>>> partitions would not satisfy their needs. Moreover, it is not easy
>>> for end users to determine the right configuration before they run
>>> the query, and they only realize they can't modify it when they try
>>> to.
>>>
>>> > Proposal
>>>
>>> The feature proposes decoupling data partitions from operator
>>> partitions by introducing key groups to state, enabling scaling of
>>> operator partitions while state data partitions remain the same (so
>>> no issue with state data). This approach is inspired by how Flink
>>> supports scalability with partitioned state.
>>>
>>> The concept itself is simple. Instead of applying the current
>>> partitioning expression to the key columns (simplified):
>>>
>>> hash(key columns) % number of state operator partitions
>>>
>>> it will apply the partitioning expression below, so that data is
>>> still distributed across state data partitions but each state
>>> operator partition can handle multiple state data partitions (see the
>>> sketch at the end of this thread):
>>>
>>> (hash(key columns) % number of state key groups) % number of state
>>> operator partitions
>>>
>>> The state data itself will still not be scalable, so the number of
>>> state key groups becomes a new hard limit (we should forbid modifying
>>> it once the query has run). But we can change the number of stateful
>>> operator partitions afterwards. The number of stateful operator
>>> partitions should be equal to or smaller than the number of state key
>>> groups. (It doesn't make sense for a partition to be assigned no
>>> state key group and sit idle.)
>>>
>>> > Possible Risks
>>>
>>> Performance might be affected, because one of the following must be
>>> done:
>>>
>>> 1. each partition calculates the key group id per key, or
>>> 2. the key group id is calculated and inserted into the row before it
>>> reaches the state operators (shuffle), and removed after passing the
>>> state operators
>>>
>>> There's another performance concern: committing multiple states in a
>>> partition when the number of operator partitions < the number of
>>> state key groups. But we could run the commits concurrently (at least
>>> for the HDFS state store), and it is actually already an issue today
>>> (all tasks may not be launched together).
>>>
>>> As expected, some code complexity would be introduced.
>>>
>>> > Limitation
>>>
>>> Initially, it will not support dynamic reconfiguration, i.e. changing
>>> the value while the query is running. It could actually be achieved
>>> simply by unloading all the state providers in executors before
>>> running the next batch, but that would invalidate all state caches
>>> and may incur high latency to reload the state cached for the
>>> previous batch. We could still adopt it if the merit of reconfiguring
>>> stateful operator partitions outweighs the cost of reloading state.
>>>
>>> > Rejected alternatives
>>>
>>> * Offline physical repartitioning of state data (load all state,
>>> recalculate the new partition id per key, and re-save)
>>>
>>> I thought about this but discarded it, since repartitioning would
>>> take non-trivial time once state grows huge. It is also not easy to
>>> implement such a tool so that it runs efficiently (the whole state
>>> may not fit in memory, partitions have to be handled concurrently,
>>> etc.).
>>>
>>> Please share your opinion about this proposal: whether to accept or
>>> decline it, things to correct in my mail, any suggestions for
>>> improvement, etc.
>>>
>>> Please also let me know if it would be better to move this to a
>>> Google doc or a PDF and file a JIRA issue.
>>>
>>> Thanks,
>>> Jungtaek Lim (HeartSaVioR)
>>>
>>> 1. https://github.com/apache/spark/pull/21718
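
(For reference, a minimal sketch of the configuration the Rationale
section refers to. One hedged assumption here: in recent Spark versions
the value recorded in the streaming checkpoint takes precedence on
restart, which is exactly the restriction being discussed.)

    import org.apache.spark.sql.SparkSession

    // The number of state partitions for every stateful operator is
    // taken from "spark.sql.shuffle.partitions" on the first run of a
    // streaming query; the checkpoint then pins it, so changing the
    // setting later has no effect on an existing query.
    val spark = SparkSession.builder()
      .appName("shuffle-partitions-sketch")
      .master("local[*]")
      .config("spark.sql.shuffle.partitions", "200") // fixed at first run
      .getOrCreate()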
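(And a minimal, self-contained sketch of the proposed two-level mapping.
All names and values, e.g. numStateKeyGroups, are illustrative; this is
not an actual implementation.)

    object KeyGroupSketch {
      // Fixed at the first run of the query; a hard limit thereafter.
      val numStateKeyGroups = 128

      // Current scheme: key -> operator partition directly, so the
      // partition count can never change.
      def currentPartition(keyHash: Int, numOperatorPartitions: Int): Int =
        Math.floorMod(keyHash, numOperatorPartitions)

      // Proposed scheme: key -> key group -> operator partition.
      // numOperatorPartitions can change across runs as long as it
      // stays <= numStateKeyGroups.
      def keyGroup(keyHash: Int): Int =
        Math.floorMod(keyHash, numStateKeyGroups)

      def proposedPartition(keyHash: Int, numOperatorPartitions: Int): Int =
        Math.floorMod(keyGroup(keyHash), numOperatorPartitions)

      def main(args: Array[String]): Unit = {
        val keyHash = "user-42".hashCode
        // State files stay keyed by the stable key group id ...
        println(s"key group: ${keyGroup(keyHash)}")
        // ... while the operator partition can be recomputed per run.
        println(s"10 partitions -> ${proposedPartition(keyHash, 10)}")
        println(s"20 partitions -> ${proposedPartition(keyHash, 20)}")
      }
    }

Note that (hash % G) % P sends every key group to exactly one operator
partition, so state keyed by key group never needs to be split when the
partition count changes; a run with a different P only regroups whole
key groups.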