Thanks Nicholas!!
I will start a vote thread on Monday.

Thanks,
Aravind

Aravind K. Patnam


On Fri, May 16, 2025 at 12:19 AM Nicholas Jiang <[email protected]>
wrote:

> Hi Aravind,
>
> Thanks for answer above questions. I have no other comments for this
> proposal. Looking forward to introduce disruption aware slot selection.
>
> Regards,
> Nicholas Jiang
>
> On 2025/05/15 07:12:42 Aravind Patnam wrote:
> > Hi Nicholas,
> >
> > For us, we have a high interruption rate (each worker host goes down
> twice
> > a week at least for maintenance), so our task failure rate was also
> > high, especially given our scale. This was especially the case for Flink
> > where replication is not available, and would ultimately cause the job to
> > fail after cascading task failures. For Spark, it is a lesser concern
> given
> > that we replicate, but still results in a task failure when a connection
> is
> > failed with a worker that is not available anymore (of course, for Spark
> > the replica is available so it would not cause app failure).
> >
> > There would be no extra cost for scenarios in which this feature is
> > disabled. The normal path would remain the same.
> >
> > Regarding configuration changes, this feature would just be
> > enabled/disabled via a new boolean flag (
> > *celeborn.master.slot.assign.interruptionAware*). If this feature is
> > enabled, the percentile threshold that I talked about in my previous
> email
> > would also be a config (
> > *celeborn.master.slot.assign.interruptionAware.threshold*), with a
> default
> > of 50%.
> >
> > Thanks,
> > Aravind
> >
> > On Wed, May 14, 2025 at 7:29 PM Nicholas Jiang <[email protected]
> >
> > wrote:
> >
> > > Hi Aravind,
> > >
> > > Thanks for your detailed explanation.
> > >
> > > The mechanism of disruption aware slot selection makes sense to me. But
> > > I'm curious what the failure rate of tasks before you came up with this
> > > solution for substantial decrease (20x) in task failures. The task
> failure
> > > rate in our production environment is already very low, especially for
> > > Flink batch jobs without replica support. Meanwhile, is there any
> > > configuration changes for disruption aware slot selection? I did not
> see
> > > any configuration changes in this proposal.
> > >
> > > Regards,
> > > Nicholas Jiang
> > >
> > > On 2025/05/13 21:46:46 Aravind Patnam wrote:
> > > > Hi Nicholas,
> > > >
> > > > Thanks for the feedback and questions! Let me answer your questions
> > > below.
> > > >
> > > > > 1. Why is the lack of Flink replication support the motivation for
> > > > disruption aware slot selection? Do you mean that disruption aware
> slot
> > > > selection helps to reduce recompution costs for Flink without
> replication
> > > > support?
> > > > Yes, this would significantly reduce recomputation costs. This is
> > > > especially in an environment like ours internally, where we
> experience
> > > > constant interruptions throughout the day to a large set of workers.
> By
> > > > proactively only using workers that won't get interrupted, we can
> > > minimize
> > > > this recomputation cost and reduce the task failures.
> > > >
> > > > > 2. Can you provide a complete definition of the
> > > /updateInterruptionNotice
> > > > interface? Meanwhile, could you also provide the definition of
> > > > corresponding CLI interface?
> > > > Yes, sure - I have updated the CIP google doc to now include the
> > > interface
> > > > <
> > >
> https://docs.google.com/document/d/16Lj4KadSb6ypaXTg5tJB0QvaXG8vTLtyoj7V4umTZqw/edit?pli=1&tab=t.0#heading=h.96cl46gz9o8l
> > > >
> > > > now. In summary, it would be a PUT api, which would take in a List of
> > > > *UpdateInterruptionNoticeRequests.
> > > > *This object's fields would simply be the worker hostname along with
> the
> > > > next known interruption timestamp in milliseconds. Every time this
> API is
> > > > called, it will reset the workers not included in the request back
> to no
> > > > nextKnownDisruption automatically.
> > > >
> > > > > 3. How is the performance of disruption aware slot selection? Which
> > > > scenario could users use disruption aware slot selection?
> > > > Internally, we have enabled this feature and we see great
> improvements to
> > > > our task failures. You can see the results here
> > > > <
> > >
> https://docs.google.com/document/d/16Lj4KadSb6ypaXTg5tJB0QvaXG8vTLtyoj7V4umTZqw/edit?pli=1&tab=t.0#bookmark=id.asvbrvv0c9q6
> > > >
> > > > --
> > > > we roughly see *20x decrease for Spark task failures related to
> > > > interruptions*. This is because we have a high rate of interruptions
> > > > internally. Other users with similar environments would greatly
> benefit
> > > > from this feature, as they can enable this to prioritize workers that
> > > will
> > > > not get interrupted.
> > > >
> > > > > 4. How could the cluster administrator determine
> > > > workersWithLateInterruptions and workersWithEarlyInterruptions? BTW,
> how
> > > > does the administrator evaluate the threshold percentile based on the
> > > range
> > > > of interruption timestamps?
> > > > This sort of depends on how the interruptions are scheduled and how
> much
> > > > advance notice is present, along with the application
> characteristics in
> > > > the cluster. We plan on distinguishing the workers as follows in the
> > > code:
> > > >
> > > >
> > > >    -
> > > >
> > > >    prioritizedWorkers = workersWithNoInterruptions +
> > > >    workersWithLateInterruptions
> > > >    -
> > > >
> > > >    deprioritizedWorkers = workersWithEarlyInterruptions
> > > >
> > > >
> > > > The calculation is:
> > > >
> > > >    - workersWithLateInterruptions = percentageThreshold *
> > > >    sorted(totalWorkers)
> > > >    - workersWithEarlyInterruptions = totalWorkers -
> > > >    workersWithLateInterruptions
> > > >
> > > >
> > > > This means that the higher the percentage threshold, the higher the
> size
> > > of
> > > > workersWithLateInterruptions, which means the higher the number of
> > > > prioritizedWorkers.
> > > >
> > > > For example, if you have a larger advance notice (such as 1 week or
> so),
> > > it
> > > > makes sense to set this percentage threshold higher (e.g. 90%). If
> you
> > > have
> > > > a smaller advanced notice window, it makes sense to set it something
> > > lower
> > > > (e.g. 50%).
> > > >
> > > > Consider this example where there are 10 workers with interruption
> > > > timestamps t5-t14, and current time t0:
> > > >
> > > > w1
> > > >
> > > > w2
> > > >
> > > > w3
> > > >
> > > > w4
> > > >
> > > > w5
> > > >
> > > > w6
> > > >
> > > > w7
> > > >
> > > > w8
> > > >
> > > > w9
> > > >
> > > > w10
> > > >
> > > > t5
> > > >
> > > > t6
> > > >
> > > > t7
> > > >
> > > > t8
> > > >
> > > > t9
> > > >
> > > > t10
> > > >
> > > > t11
> > > >
> > > > t12
> > > >
> > > > t13
> > > >
> > > > t14
> > > >
> > > > If we set percentageThreshold to 50%, then the following would occur:
> > > >
> > > >    - w1 - w5 would be considered workersWithEarlyInterruptions
> > > >    - w6 - w10 would be considered workersWithLateInterruptions
> > > >
> > > > Given current time is only t0 and is far away from t5, this means we
> are
> > > > probably wasting workers w1 - w5 since they have a long time to be
> > > > disrupted anyways. Hence, in this case it would make more sense to
> set a
> > > > higher percentage threshold such as 90%. Then it would be like this:
> > > >
> > > >    - w1 would be considered workersWithEarlyInterruptions
> > > >    - w2 - w10 would be considered workersWithLateInterruptions
> > > >
> > > > This is better for the cluster given a larger advanced notice.
> > > >
> > > > However, if there is a smaller advanced notice, consider this
> example:
> > > >
> > > > w1
> > > >
> > > > w2
> > > >
> > > > w3
> > > >
> > > > w4
> > > >
> > > > w5
> > > >
> > > > w6
> > > >
> > > > w7
> > > >
> > > > w8
> > > >
> > > > w9
> > > >
> > > > w10
> > > >
> > > > t1
> > > >
> > > > t2
> > > >
> > > > t3
> > > >
> > > > t4
> > > >
> > > > t5
> > > >
> > > > t6
> > > >
> > > > t7
> > > >
> > > > t8
> > > >
> > > > t9
> > > >
> > > > t10
> > > >
> > > > If we set percentageThreshold to 90%, then the following would occur:
> > > >
> > > >    - w1 would be considered workersWithEarlyInterruptions
> > > >    - w2 - w10 would be considered workersWithLateInterruptions
> > > >
> > > > Given current time is only t0 and is close to t1 or t2, medium/long
> > > running
> > > > jobs would probably incur failures when w2-w10 go down. Hence in this
> > > case,
> > > > it might be better to set to a lower percentage, such as 50%. Then it
> > > would
> > > > be like this:
> > > >
> > > >    - w1-5 would be considered workersWithEarlyInterruptions
> > > >    - w6 - w10 would be considered workersWithLateInterruptions
> > > >
> > > > This is better for the cluster given a smaller advanced notice.
> > > >
> > > > Overall, the cluster administrator should take into account the
> > > > interruption advanced notice time, along with the average/median job
> > > > runtime in the cluster before setting this value.
> > > > We can add more documentation on how to set this value once we start
> > > > implementing it.
> > > >
> > > > Hope this answers all your questions!
> > > >
> > > > Thanks,
> > > > Aravind
> > > >
> > > >
> > > > On Tue, May 13, 2025 at 9:40 AM Nicholas Jiang <
> [email protected]
> > > >
> > > > wrote:
> > > >
> > > > > Hi Aravind,
> > > > >
> > > > > Thanks for driving the proposal of interruption aware slot
> selection. I
> > > > > have some comments for this proposal:
> > > > >
> > > > > 1. Why is the lack of Flink replication support the motivation for
> > > > > disruption aware slot selection? Do you mean that disruption aware
> slot
> > > > > selection helps to reduce recompution costs for Flink without
> > > replication
> > > > > support?
> > > > >
> > > > > 2. Can you provide a complete definition of the
> > > /updateInterruptionNotice
> > > > > interface? Meanwhile, could you also provide the definition of
> > > > > corresponding CLI interface?
> > > > >
> > > > > 3. How is the performance of disruption aware slot selection? Which
> > > > > scenario could users use disruption aware slot selection?
> > > > >
> > > > > 4. How could the cluster administrator determine
> > > > > workersWithLateInterruptions and workersWithEarlyInterruptions?
> BTW,
> > > how
> > > > > does the administrator evaluate the threshold percentile based on
> the
> > > range
> > > > > of interruption timestamps?
> > > > >
> > > > > Regards,
> > > > > Nicholas Jiang
> > > > >
> > > > > On 2025/05/02 07:40:15 Aravind Patnam wrote:
> > > > > > Hi Celeborn community,
> > > > > >
> > > > > > I have written up CIP-17: Interruption Aware Slot Selection
> > > > > > <
> > > > >
> > >
> https://docs.google.com/document/d/16Lj4KadSb6ypaXTg5tJB0QvaXG8vTLtyoj7V4umTZqw/edit?usp=sharing
> > > > > >.
> > > > > > Please review and let me know if there are any comments or
> questions.
> > > > > >
> > > > > > This is a feature we have introduced internally, given our heavy
> > > volume
> > > > > of
> > > > > > interruptions. We have seen substantial decrease in task
> failures in
> > > both
> > > > > > Flink and Spark jobs, and think the community would also benefit
> from
> > > > > this
> > > > > > :)
> > > > > >
> > > > > > Looking forward to getting feedback from the community.
> > > > > >
> > > > > > Thanks,
> > > > > > Aravind
> > > > > >
> > > > > >  CIP 17: Interruption Aware Slot Selection
> > > > > > <
> > > > >
> > >
> https://drive.google.com/open?id=16Lj4KadSb6ypaXTg5tJB0QvaXG8vTLtyoj7V4umTZqw
> > > > > >
> > > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > > Aravind K. Patnam
> > > >
> > >
> >
> >
> > --
> > Aravind K. Patnam
> >
>

Reply via email to