When you change the parallelism, keys are re-distributed across operator instances.

However, this re-distribution is limited to the configured maxParallelism (set via the ExecutionConfig), which defaults to 128 if no operator's parallelism exceeded that value on the first submission.
This *cannot be changed* after the job has been run without discarding state.

See https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/production_ready.html#set-an-explicit-max-parallelism
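
For illustration, a minimal sketch of pinning the max parallelism explicitly before the first submission (the concrete values and the pipeline below are just placeholders):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class MaxParallelismExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Fix the max parallelism (i.e. the number of key groups) explicitly,
            // so the job can later be rescaled up to this bound without discarding state.
            env.setMaxParallelism(720);

            // The actual parallelism may still change between restarts,
            // as long as it stays <= the max parallelism above.
            env.setParallelism(4);

            env.fromElements("a", "b", "c")
               .keyBy(value -> value)
               .map(value -> value.toUpperCase())
               // The max parallelism can also be set per operator if needed.
               .setMaxParallelism(720)
               .print();

            env.execute("explicit-max-parallelism");
        }
    }

A value with many divisors (such as 720) keeps the key groups evenly split across subtasks for most parallelism settings.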

On 2/18/2021 8:29 AM, yidan zhao wrote:
Actually, we only need to ensure that all records belonging to the same key are forwarded to the same operator instance i; we do not need to guarantee that i is the same as in previous savepoints. When the job is restarted, the rule "records with the same key stay together" still holds, and additional slots are certainly useful, since each slot (operator instance) becomes responsible for fewer keys and therefore fewer records.
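
To make that concrete, here is a simplified sketch of the two-step assignment Flink uses (the real logic lives in org.apache.flink.runtime.state.KeyGroupRangeAssignment and applies a murmur hash rather than the plain hashCode used here):

    // Simplified stand-in for Flink's KeyGroupRangeAssignment.
    public class KeyGroupSketch {

        // Step 1: a key is mapped to one of maxParallelism key groups.
        // This step depends only on the key and maxParallelism, so it never
        // changes when the job is rescaled.
        static int keyGroupFor(Object key, int maxParallelism) {
            return Math.abs(key.hashCode() % maxParallelism);
        }

        // Step 2: the key group is mapped to a subtask index for the current
        // parallelism. Only this step changes on rescaling, so records with
        // the same key always end up together, just possibly on a different
        // subtask than before.
        static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
            return keyGroup * parallelism / maxParallelism;
        }

        public static void main(String[] args) {
            int maxParallelism = 128;
            int keyGroup = keyGroupFor("user-42", maxParallelism);

            // Rescaling from 4 to 8 subtasks may move this key group to another
            // subtask, but all records for "user-42" still share one subtask.
            System.out.println("parallelism 4 -> subtask " + subtaskFor(keyGroup, maxParallelism, 4));
            System.out.println("parallelism 8 -> subtask " + subtaskFor(keyGroup, maxParallelism, 8));
        }
    }
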

Tripathi, Vikash <vikash.tripa...@cerner.com> wrote on Thursday, February 18, 2021 at 12:09 AM:

    Hi there,

    I wanted to know how re-partitioning of keys per operator instance
    happens when the operator instances are scaled up or down and the
    job is restarted from a previous savepoint that had a different
    number of parallel instances of the same operator.

    My main concern is whether the re-distribution maps the same keys
    to the same operator instances as before. If it does, there would
    be no added advantage in adding new task slots for the same
    operator, because they would remain under-used or not used at all
    once all possible key values have been seen. If, on the other hand,
    keys are distributed evenly (based on the hash function) across the
    new parallel slots as well, won't this cause issues in producing
    consistent results from the operator state provided by the previous
    savepoint of the application?

    Does the hash function, as in the attached snippet, guarantee that
    the same keys which landed earlier on an operator instance will
    land back on the same operator instance once the job is restarted
    with a new parallelism configuration?

    Thanks,

    Vikash


