Thanks Gabor and Alan for the discussion and good points, and sorry
for the late reply.

On "how does a connector pick the right token": the selection key is
the task's own JobID. The consuming operator already has access to it:
sinks via Sink v2 InitContext.getJobInfo(), and sources via FLIP-583
[1] (which exposes JobInfo on SourceReaderContext /
SplitEnumeratorContext). I submitted FLIP-583 partly as a precondition
for this one (though not only for that purpose).

So a connector can tell which job it belongs to on both the source and
sink side. End to end, there are three pieces:
1. The task knows its JobID. This is already covered (InitContext for
sinks, FLIP-583 for sources).
2. A TM-side cache keyed by JobID holds the per-job token, and the task
looks the token up there.
3. The connector applies that token at its authentication point.
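
To make the three pieces concrete, here is a minimal sketch. It is an
illustration only, not the FLIP's API: PerJobTokenCache and
PerJobAuthenticator are hypothetical names, and a plain String stands in
for Flink's JobID so that the snippet has no dependencies.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical TM-side cache (piece 2): one token per job, keyed by JobID.
// A String stands in for Flink's JobID to keep the sketch dependency-free.
final class PerJobTokenCache {
    private final Map<String, byte[]> tokensByJob = new ConcurrentHashMap<>();

    // Stores (or replaces) the token for a job, e.g. after a re-obtain.
    void update(String jobId, byte[] token) {
        tokensByJob.put(jobId, token.clone());
    }

    // Drops the token once the job no longer runs on this TM.
    void remove(String jobId) {
        tokensByJob.remove(jobId);
    }

    // Returns a copy of the token for the given job, if one is cached.
    Optional<byte[]> lookup(String jobId) {
        byte[] token = tokensByJob.get(jobId);
        return token == null ? Optional.empty() : Optional.of(token.clone());
    }
}

// Hypothetical connector-side helper covering pieces 1 and 3.
final class PerJobAuthenticator {
    // Piece 1: the JobID the task learned from its context
    // (InitContext for sinks, the FLIP-583 contexts for sources).
    private final String jobId;
    private final PerJobTokenCache cache;

    PerJobAuthenticator(String jobId, PerJobTokenCache cache) {
        this.jobId = jobId;
        this.cache = cache;
    }

    // Piece 3: resolve the token at authentication time, not at construction
    // time, so that a token refreshed in the cache is picked up.
    byte[] currentToken() {
        return cache.lookup(jobId).orElseThrow(
                () -> new IllegalStateException(
                        "No delegation token cached for job " + jobId));
    }
}
```

Two tasks of different jobs on the same TM would each build their own
PerJobAuthenticator with their own JobID and therefore never see each
other's token.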

Regarding the reference connector: I'd suggest connector-kafka, if you
agree. Kafka builds a client per task with its own SASL/OAUTHBEARER
callback, which can apply the per-job token by JobID. It's also the first
connector I plan to onboard once the discussion is finalized; I've tested
it internally and it works well. Could you please clarify whether you'd
like to see more details or examples (with connectors, or end-to-end) in
the FLIP document?
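
As a rough illustration of the Kafka case, the sketch below shows a
per-task callback handler that captures the JobID once and resolves the
token on every authentication callback. It uses only the JDK's
javax.security.auth.callback types; BearerTokenCallback and
PerJobBearerCallbackHandler are hypothetical stand-ins, and the lookup
function is assumed to be backed by a TM-side cache keyed by JobID. A real
connector would instead implement Kafka's AuthenticateCallbackHandler and
handle OAuthBearerTokenCallback.

```java
import java.util.function.Function;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.UnsupportedCallbackException;

// Hypothetical stand-in for the callback a SASL/OAUTHBEARER client issues
// when it needs a bearer token.
final class BearerTokenCallback implements Callback {
    private String token;

    void token(String value) {
        this.token = value;
    }

    String token() {
        return token;
    }
}

// Hypothetical per-task handler: it is created with the task's own JobID and
// a lookup function (assumed to read from a TM-side cache keyed by JobID).
final class PerJobBearerCallbackHandler implements CallbackHandler {
    private final String jobId;
    private final Function<String, String> tokenLookup;

    PerJobBearerCallbackHandler(String jobId, Function<String, String> tokenLookup) {
        this.jobId = jobId;
        this.tokenLookup = tokenLookup;
    }

    @Override
    public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
        for (Callback callback : callbacks) {
            if (!(callback instanceof BearerTokenCallback)) {
                throw new UnsupportedCallbackException(callback);
            }
            // Look the token up on every callback so a refreshed token is used.
            String token = tokenLookup.apply(jobId);
            if (token == null) {
                throw new IllegalStateException("No token cached for job " + jobId);
            }
            ((BearerTokenCallback) callback).token(token);
        }
    }
}
```

Because the lookup happens inside handle() rather than in the constructor,
a token refreshed in the cache is used the next time the client
re-authenticates.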

I'll reply to the rest of the comments soon.

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-583%3A+Expose+JobInfo+on+Source+contexts

Kind regards,
Aleksandr

On Wed, 24 Jun 2026 at 13:02, Gabor Somogyi <[email protected]> wrote:
>
> Hi Alan,
>
> First of all thanks for the detailed explanation, it really helped to be in
> sync!
> In general, without going into too many specifics, it would be good to solve
> this with the least invasive and most stable solution, no matter what this
> means.
> I think the starting idea is good. It would be awesome to see how this
> could work end-to-end and to fill the gaps, if there are any.
>
> I've just refreshed my memory and created a small design
> sketch (I can share it if you're interested, but at this point it may not be
> relevant).
> It's important to discuss the API, but during the design a more
> fundamental question came to mind.
>
> Let's say there are JobID->credentials pairs on the receiver side. How can
> any connector in a task decide which one to choose when writing a specific
> file? We can take the Hadoop S3 connector as an example to talk about
> concrete solutions.
>
> The reason why I'm asking is because at the end of the day we must do the
> migration and it
> would be good to have a reference connector.
>
> BR,
> G
>
>
> On Wed, Jun 24, 2026 at 12:04 AM Alan Sheinberg via dev <
> [email protected]> wrote:
>
> > Hi Gabor,
> >
> > Thanks for the thoughtful comments. I just wanted to chime in on some of
> > the thinking Aleksandr and I have had.
> >
> > > Up until now DelegationTokenProvider instances were singletons and loaded
> > > by the service loader. Now we plan to add stop function, does that mean
> > > we plan to change the lifecycle?
> >
> >
> > No, the lifecycle is unchanged.  It was imagined that this would be a
> > useful hook for potentially cleaning things up, if necessary.  Sometimes
> > thread pools or other resources might need to be shut down neatly.
> >
> > > Having a generic way to ask the delegation token manager to re-obtain is a
> > > long standing needed feature but didn't have time. Having a dedicated API
> > > for this would be maybe better instead of relying on registerJob return
> > > value.
> >
> >
> > I agree that a general API for managing re-obtain makes sense.  Generally
> > the DelegationTokenProvider would likely request a re-obtain in response to
> > some event.  Currently, obtainDelegationTokens() is the main hook that
> > fetches tokens and determines when it will be called again.  Another
> > possibility could be a background thread that requests it, or the new
> > registerJob/unregisterJob methods being proposed.
> >
> > A quick sketch of a possible generic interface:
> >
> > public interface DelegationTokenManagerCallback {
> >     void reobtainDelegationTokens();
> > }
> >
> > We could then overload the init method of DelegationTokenProvider and
> > have init(Configuration config, DelegationTokenManagerCallback callback)
> > so that the DelegationTokenProvider could keep a reference to the callback
> > and initiate a re-obtain at will (causing a new refresh on the
> > DefaultDelegationTokenManager's ioExecutor).  The callback logic would
> > need to be smart about deduping calls so that only one is scheduled at a
> > time, in a thread-safe way.
> >
> > This method could then be utilized by the body of any registerJob, allowing
> > the method to have a void return value.
> >
> > That approach is simple and could be extended in the future if you have
> > some broader ideas on other parts of the api.  Would you rather implement
> > this approach and avoid adding a special case to registerJob?
> >
> > > Not sure how it's planned but new immediate re-obtain scheduling would
> > > be good to be upper bounded. Some retry logic can be aggressive about
> > > re-registration. Or having a cooldown is also fine.
> >
> >
> > That makes sense to have some cool down to avoid doing it too often,
> > however, a job might not run if it cannot initiate a re-obtain soon after
> > being registered.  A configurable cooldown with a decent default might be a
> > good choice.
> >
> > > Last but not least up until now there was a single thread which played on
> > > critical path on immutable structures. Now we plan to change that which is
> > > fine but then I would like to see an exact plan what kind of threads are
> > > doing what and how do we protect against race/starvation/deadlock. Having
> > > an exact look is fine on the PR but this is the gist of it from my
> > > perspective.
> > >
> >
> > In the current codebase a single thread creates
> > the DefaultDelegationTokenManager and builds the immutable structures. Then
> > DefaultDelegationTokenManager.start is called from the ResourceManager main
> > thread and each token re-obtain is called on a thread
> > from DefaultDelegationTokenManager.ioExecutor.  Therefore, fields within a
> > DelegationTokenProvider must either be immutable or properly synchronized.
> >
> > The calls to registerJob/unregisterJob in this FLIP will come from the
> > ResourceManager main thread, calling through to
> > DefaultDelegationTokenManager and then the providers.  They are assumed to
> > be non blocking and just handle book-keeping for the next re-obtain call.
> > Since this pattern inherently requires updating internal fields, the
> > DelegationTokenProvider must properly synchronize the methods/fields used
> > for this book-keeping.  Calls to registerJob/unregisterJob aren't prevented
> > from blocking and starving others, similar to obtainDelegationTokens.  The
> > contract can be made very clear in the javadoc.  Preventing races,
> > starvation, or deadlock within the provider will therefore depend on proper
> > implementation by the user.
> >
> > A larger reworking of DefaultDelegationTokenManager could try to do
> > everything on a single thread
> > (registerJob/unregisterJob/obtainDelegationTokens) to simplify this model,
> > but would require using a special background thread rather than the
> > ioExecutor.  I haven't considered this in detail, but would be open to it
> > if it were strongly preferred.
> >
> > > What I mean here specifically is that even if we schedule the renewal the
> > > existing way at least the providers list manipulation and the originally
> > > scheduled renewal can race. Maybe others since I can just imagine the
> > > change.
> >
> >
> > I don't think we intend on changing the list of providers -- these are
> > still immutable.   Whenever a new re-obtain is requested, it should cancel
> > the originally scheduled renewal using the future as in
> > DefaultDelegationTokenManager.stopTokensUpdate, ensuring just one update
> > scheduled at a time.
> >
> > I hope I have answered a lot of your questions.  I'm happy to elaborate or
> > even show a draft PR if that might be easier to trace.
> >
> > Thanks,
> > Alan
> >
> > On Fri, Jun 19, 2026 at 7:50 AM Gabor Somogyi <[email protected]>
> > wrote:
> >
> > > Hi Aleksandr,
> > >
> > > Thanks for your efforts!
> > >
> > > I've missed this thread lately but have some thoughts/questions.
> > >
> > > Up until now one cluster per one set of user credentials was the model. I
> > > think the multi-user
> > > model better serves the needs so +1. We should mention this on the main
> > > doc page later.
> > >
> > > Up until now DelegationTokenProvider instances were singletons and loaded
> > > by the
> > > service loader. Now we plan to add stop function, does that mean we plan
> > > to change
> > > the lifecycle?
> > >
> > > Having a generic way to ask the delegation token manager to re-obtain is
> > > a long standing needed feature but didn't have time. Having a dedicated
> > > API for this would be maybe better instead of relying on registerJob
> > > return value.
> > >
> > > Not sure how it's planned but new immediate re-obtain scheduling
> > > would be good to be upper bounded. Some retry logic can be aggressive
> > > about re-registration. Or having a cooldown is also fine.
> > >
> > > Last but not least up until now there was a single thread which played on
> > > critical path on
> > > immutable structures. Now we plan to change that which is fine but then I
> > > would like to see an
> > > exact plan what kind of threads are doing what and how do we protect
> > > against
> > > race/starvation/deadlock. Having an exact look is fine on the PR but this
> > > is the gist of it
> > > from my perspective.
> > > What I mean here specifically is that even if we schedule the renewal the
> > > existing way
> > > at least the providers list manipulation and the originally scheduled
> > > renewal can race.
> > > Maybe others since I can just imagine the change.
> > >
> > > BR,
> > > G
> > >
> > >
> > > On 2026/06/05 16:35:15 Aleksandr Savonin wrote:
> > > > Hi everyone,
> > > >
> > > > Alan Sheinberg and I would like to start a discussion on FLIP-588:
> > > > Support per-job delegation tokens [1].
> > > > Flink's delegation token framework is currently cluster-scoped, which
> > > > means a DelegationTokenProvider has no notion of an individual job.
> > > > This breaks when different jobs on the same cluster need to
> > > > authenticate as different identities to the same external service.
> > > > To resolve this, the FLIP adds per-job lifecycle hooks
> > > > (registerJob/unregisterJob/stop) as default methods on the
> > > > DelegationTokenProvider SPI, along with the runtime wiring to invoke
> > > > them on job start and stop.
> > > > This change is fully backward compatible (new methods are default
> > > > no-ops). It is worth mentioning that it widens the internal
> > > > registerJobMaster RPC to carry the job configuration.
> > > >
> > > > Looking forward to your feedback.
> > > >
> > > > [1]
> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-588%3A+Support+per-job+delegation+tokens
> > > >
> > > > --
> > > > Kind regards,
> > > > Aleksandr
> > > >
> > >
> >



