Scheduling multiple partitions in the same task is basically what
coalesce() does. Is there a reason that doesn't work here?
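
For illustration, a minimal sketch of coalesce() folding many partitions
into fewer tasks without a shuffle (the sizes here are arbitrary):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("coalesce-sketch").getOrCreate()

    // Start over-partitioned: 200 partitions.
    val df = spark.range(0, 1000000).repartition(200)

    // coalesce() schedules multiple existing partitions into the same
    // task, dropping to 50 tasks without performing a shuffle.
    val coalesced = df.coalesce(50)

    println(coalesced.rdd.getNumPartitions)  // 50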

On Fri, Aug 3, 2018 at 5:55 AM, Jungtaek Lim <kabh...@gmail.com> wrote:

> Here's a link for Google docs (anyone can comment):
> https://docs.google.com/document/d/1DEOW3WQcPUq0YFgazkZx6Ei6EOdj_3pXEsyq4LGpyNs/edit?usp=sharing
>
> Please note that I just copied the content into the Google doc, so some
> details may be lacking. I would like to start with an explanation of the
> concept, and once we are in agreement on moving forward, I could add more
> detail to the doc, or even just start working so that the details can be
> shared via POC code or a WIP patch.
>
> Answers inlined for Arun's comments:
>
> On Fri, Aug 3, 2018 at 5:39 PM, Arun Mahadevan <ar...@apache.org> wrote:
>
>> Can you share this in a Google doc to make the discussion easier?
>>
>>
>>
>> Thanks for coming up with ideas to improve upon the current restrictions
>> with the SS state store.
>>
>>
>>
>> If I understood correctly, the plan is to introduce a logical
>> partitioning scheme for state storage (based on keys), independent of
>> Spark's partitioning, so that the number of Spark partitions can be varied.
>>
>>
>>
>> my 2 cents,
>>
>>
>>
>>    1. The Partitioning is already a kind of a logical entity in Spark.
>>    Maybe this can be leveraged to over-partition in advance (similar to
>>    setting the number of state key groups in your proposal), but make it
>>    easy to run more than one task (partition) per core (I am not sure how
>>    easy this is currently). Then we can continue to leverage the existing
>>    state implementation. This has limitations similar to what you pointed
>>    out (the max number of partitions has to be fixed upfront). But once
>>    over-provisioning of partitions is made easy, it could be leveraged
>>    even for non-stateful operations.
>>
>>
> If we could allow assigning multiple partitions to a task (say, via a
> parallelism or maximum-concurrency setting), maybe we could achieve it a
> bit more easily. I'm not that familiar with the core of Spark, so I can't
> imagine how we could do it. In addition, the partitioning of downstream
> operators will be affected unless we shuffle afterwards.
>
>
>>    2. Decouple the state from the partition completely and associate it
>>    only with the keys. This would be the most flexible option and we can
>>    scale the partitions up/down as we wish. This needs a scalable
>>    distributed state store implementation supporting fast lookups/storage
>>    by key.
>>
>>
> It could be achieved with external storages like Redis or HBase, but I
> would avoid an approach which requires end users to maintain another
> system as well. Spark is coupled with a specific version of Hadoop, so we
> can expect that end users are able to run and maintain HDFS.
>
>
>> Thanks,
>>
>> Arun
>>
>>
>> On 2 August 2018 at 23:45, Jungtaek Lim <kabh...@gmail.com> wrote:
>>
>>> Hi Spark devs,
>>>
>>> I have a new feature to propose and would like to hear opinions from the
>>> community. I'm not sure it is a big enough change to be worth an SPIP, so
>>> I'm posting to the dev mailing list instead.
>>>
>>> > Feature
>>>
>>> Reconfigurable number of partitions on state operators in Structured
>>> Streaming
>>>
>>> > Rationale
>>>
>>> Nowadays, state in Structured Streaming is stored per partition, with the
>>> number of partitions given by the configuration
>>> "spark.sql.shuffle.partitions", and the configuration cannot be modified
>>> after the query has run once. One contributor already submitted a patch
>>> [1] without knowing why such a restriction came into play.
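>>>
>>> For reference, a minimal sketch of how the configuration is fixed at the
>>> first run of a query (the input, aggregation, sink, and checkpoint path
>>> here are hypothetical):
>>>
>>>     // The number of state partitions is pinned by this setting when the
>>>     // query first runs against a checkpoint; it cannot be changed later.
>>>     spark.conf.set("spark.sql.shuffle.partitions", "200")
>>>
>>>     val query = inputDf.groupBy("key").count()
>>>       .writeStream
>>>       .outputMode("update")
>>>       .format("console")
>>>       .option("checkpointLocation", "/tmp/checkpoint")  // hypothetical
>>>       .start()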
>>>
>>> Such a restriction on state is necessary because state is distributed by
>>> a hash function applied to the key columns, but as a side effect of the
>>> restriction, we can't change the number of partitions of stateful
>>> operators. End users have various workloads and various SLAs (and an SLA
>>> can change), so restricting them to a fixed partition count will not
>>> satisfy their needs. Moreover, it is not easy for end users to pick the
>>> right configuration before they run a query, and they only realize it
>>> can't be modified when they try to modify it.
>>>
>>> > Proposal
>>>
>>> The feature proposes decoupling state data partitions from operator
>>> partitions by introducing key groups for state, enabling the number of
>>> operator partitions to scale while state data partitions remain the same
>>> (so there is no issue with state data). This approach is inspired by how
>>> Flink supports rescaling with partitioned state.
>>>
>>> The concept itself is simple. Today we apply the following partitioning
>>> expression to the key columns (simplified):
>>>
>>> hash(key columns) % number of state operator partitions
>>>
>>> With the proposal, the partitioning expression below is applied instead,
>>> so that rows remain distributed by state data partition while each state
>>> operator partition can handle multiple state data partitions:
>>>
>>> (hash(key columns) % number of state key groups) % number of state
>>> operator partitions
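>>>
>>> As a minimal illustrative sketch in Scala (the names and counts are
>>> hypothetical, not the actual implementation):
>>>
>>>     val numKeyGroups = 1024          // fixed at first run; the new hard limit
>>>     val numOperatorPartitions = 64   // reconfigurable between runs
>>>
>>>     // First level: key -> key group (stable across reconfigurations).
>>>     def keyGroupId(keyHash: Int): Int =
>>>       Math.floorMod(keyHash, numKeyGroups)
>>>
>>>     // Second level: key group -> operator partition, so each operator
>>>     // partition owns numKeyGroups / numOperatorPartitions key groups.
>>>     def operatorPartitionId(keyHash: Int): Int =
>>>       Math.floorMod(keyGroupId(keyHash), numOperatorPartitions)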
>>>
>>> The state data itself will still not be scalable, so the number of state
>>> key groups becomes the new hard limit (we should restrict modifying it
>>> once the query has run). But we can change the number of stateful
>>> operator partitions afterwards. The number of stateful operator
>>> partitions should be equal to or smaller than the number of state key
>>> groups. (It doesn't make sense for a partition to be assigned no state
>>> key group and sit idle.)
>>>
>>> > Possible Risk
>>>
>>> Performance might be affected, because one of the following must be
>>> performed:
>>>
>>> 1. each partition calculates the key group id per key, or
>>> 2. the key group id is calculated and inserted into the row before it
>>> reaches the state operators (at shuffle), and removed after it passes the
>>> state operators (a rough sketch of this option follows below)
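>>>
>>> A rough sketch of option 2 using existing SQL functions (the input and
>>> key column names are hypothetical; the real change would live inside the
>>> planner rather than in user code):
>>>
>>>     import org.apache.spark.sql.functions.{col, hash, lit, pmod}
>>>
>>>     val numKeyGroups = 1024  // hypothetical
>>>
>>>     // Derive the key group id from the key columns and carry it through
>>>     // the shuffle; the state operator would drop it again afterwards.
>>>     val withKeyGroup = inputDf.withColumn(
>>>       "_key_group_id", pmod(hash(col("key")), lit(numKeyGroups)))
>>>
>>>     val shuffled = withKeyGroup.repartition(col("_key_group_id"))
>>>     val restored = shuffled.drop("_key_group_id")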
>>>
>>> There's another performance concern: committing multiple states in a
>>> partition when the number of operator partitions < the number of state
>>> key groups. But we could run the commits concurrently (at least for the
>>> HDFS state store), and it is actually already an issue today (all tasks
>>> may not be launched together).
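>>>
>>> A sketch of what concurrent commits could look like (StateStore here
>>> stands in for Spark's internal state store handle; committing the stores
>>> independently in parallel is an assumption of this sketch):
>>>
>>>     import scala.concurrent.{Await, Future}
>>>     import scala.concurrent.duration.Duration
>>>     import scala.concurrent.ExecutionContext.Implicits.global
>>>
>>>     // Commit all key-group state stores owned by this operator
>>>     // partition in parallel rather than one by one.
>>>     def commitAll(stores: Seq[StateStore]): Unit = {
>>>       val commits = stores.map(store => Future { store.commit() })
>>>       Await.result(Future.sequence(commits), Duration.Inf)
>>>     }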
>>>
>>> As expected, some additional code complexity would be introduced.
>>>
>>> > Limitation
>>>
>>> Initially, it will not support dynamic reconfiguration, i.e. changing the
>>> value while the query is running. It could actually be achieved simply by
>>> unloading all the state providers in the executors before running the
>>> next batch, but that would invalidate all state caches and may incur high
>>> latency to reload the state cache from the previous batch. We could still
>>> adopt it if we see more merit in reconfiguring the partitions of stateful
>>> operators than cost in reloading state.
>>>
>>> > Rejected alternatives
>>>
>>> * Offline physical repartitioning of state data (loading all state,
>>> recalculating the new partition id per key, and resaving)
>>>
>>> I thought about it but discarded it, since repartitioning will take
>>> non-trivial time once the state grows huge. It is also not easy to
>>> implement such a tool so that it runs efficiently (the whole state may
>>> not fit in memory, the data has to be handled concurrently, etc.).
>>>
>>> Please share your opinion on this proposal: whether to accept or decline
>>> it, things to correct in my mail, any suggestions for improvement, etc.
>>>
>>> Please also let me know if it would be better to move this to a Google
>>> doc or PDF and file a JIRA issue.
>>>
>>> Thanks,
>>> Jungtaek Lim (HeartSaVioR)
>>>
>>> 1. https://github.com/apache/spark/pull/21718
>>>
>>
>>
