Thanks Gabor and Alan for the discussion and good points, and sorry for the late reply.
On "how does a connector pick the right token": the selection key is the task's own JobID. The consuming operator already has access to it: sinks via Sink v2 InitContext.getJobInfo(), and sources via FLIP-583 [1] (which exposes JobInfo on SourceReaderContext / SplitEnumeratorContext). I submitted FLIP-583 partly as a precondition for this one (though not only for that purpose). So a connector can tell "which job am I" on both sides.

End to end, there are three pieces:
1. The task knows its JobID. This is already covered (InitContext for sinks, FLIP-583 for sources).
2. A cache on the TM side, keyed by JobID, holds the per-job token, and the task looks it up there.
3. The connector applies that token at its authentication point.

Regarding the reference connector: I'd suggest connector-kafka, if you agree. Kafka builds a client per task with its own SASL/OAUTHBEARER callback, which can apply the per-job token selected by JobID. It's also the first connector I plan to onboard once the discussion is finalized; I've tested it internally and it works well.

Could you clarify whether you'd like to see more details or examples (with connectors, or end to end) in the FLIP document?

I'll reply to the rest of the comments soon.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-583%3A+Expose+JobInfo+on+Source+contexts

Kind regards,
Aleksandr

On Wed, 24 Jun 2026 at 13:02, Gabor Somogyi <[email protected]> wrote:
>
> Hi Alan,
>
> First of all thanks for the detailed explanation, it really helped to be in sync!
> As a general saying without too much specifics it would be good the solve this with the least invasive and most stable solution no matter what this means.
> I think the starting idea is good, it would be awesome to see how this end-to-end could work and fill the gaps if there are.
>
> I've just refreshed my memories and created a small design sketch (I can share if you're interested but at this point maybe not relevant).
> It's important to discuss about the API but during the design a more fundamental question popped into my mind.
>
> Let's say the receiver side there are JobID->credentials pairs. How any connector in a task can decide which one to choose when writing a specific file? We can take Hadoop S3 connector as an example to talk about concrete solutions.
>
> The reason why I'm asking is because at the end of the day we must do the migration and it would be good to have a reference connector.
>
> BR,
> G
>
>
> On Wed, Jun 24, 2026 at 12:04 AM Alan Sheinberg via dev <[email protected]> wrote:
>
> > Hi Gabor,
> >
> > Thanks for the thoughtful comments. I just wanted to chime in on some of the thinking Aleksandr and I have had.
> >
> > > Up until now DelegationTokenProvider instances were singletons and loaded by the service loader. Now we plan to add stop function, does that mean we plan to change the lifecycle?
> >
> > No, the lifecycle is unchanged. It was imagined that this would be a useful hook for potentially cleaning things up, if necessary. Sometimes thread pools or other resources might need to be shut down neatly.
> >
> > > Having a generic way to ask the delegation token manager to re-obtain is a long standing needed feature but didn't have time. Having a dedicated API for this would be maybe better instead of relying on registerJob return value.
> >
> > I agree that a general API for managing re-obtain makes sense. Generally the DelegationTokenProvider would likely request a re-obtain in response to some event. Currently, obtainDelegationTokens() is the main hook that fetches tokens and determines when it will be called again. Another possibility could be a background thread that requests it, or the new registerJob/unregisterJob methods being proposed.
> >
> > A quick sketch of a possible generic interface:
> >
> > public interface DelegationTokenManagerCallback {
> >     void reobtainDelegationTokens();
> > }
> >
> > We could then overload the init method of DelegationTokenProvider and have init(Configuration config, DelegationTokenManagerCallback callback) so that the DelegationTokenProvider could keep a reference to the callback and initiate a re-obtain at will (causing a new refresh on the DefaultDelegationTokenManager's ioExecutor). The callback logic would need to be smart about deduping calls so that only one was scheduled at a time in a threadsafe way.
> >
> > This method could then be utilized by the body of any registerJob, allowing the method to have a void return value.
> >
> > That approach is simple and could be extended in the future if you have some broader ideas on other parts of the api. Would you rather implement this approach and avoid adding a special case to registerJob?
> >
> > > Not sure sure how it's planned but new immediate re-obtain scheduling would be good to be upper bounded. Some retry logic can be aggressive about re-registration. Or having a cooldown is also fine.
> >
> > That makes sense to have some cool down to avoid doing it too often, however, a job might not run if it cannot initiate a re-obtain soon after being registered. A configurable cooldown with a decent default might be a good choice.
> >
> > > Last but not least up until now there was a single thread which played on critical path on immutable structures. Now we plan to change that which is fine but then I would like to see an exact plan what kind of threads are doing what and how do we protect against race/starvation/deadlock. Having an exact look is fine on the PR but this is the gist of it from my perspective.
> >
> > In the current codebase a single thread creates the DefaultDelegationTokenManager and builds the immutable structures. Then DefaultDelegationTokenManager.start is called from the ResourceManager main thread and each token re-obtain is called on a thread from DefaultDelegationTokenManager.ioExecutor. Therefore, fields within a DelegationTokenProvider must either be immutable or properly synchronized.
> >
> > The calls to registerJob/unregisterJob in this FLIP will come from the ResourceManager main thread, calling through to DefaultDelegationTokenManager and then the providers. They are assumed to be non blocking and just handle book-keeping for the next re-obtain call. Since this pattern inherently requires updating internal fields, the DelegationTokenProvider must properly synchronize the methods/fields used for this book-keeping. Calls to registerJob/unregisterJob aren't prevented from blocking and starving others, similar to obtainDelegationTokens. The contract can be made very clear in the javadoc. Preventing races, starvation, or deadlock within the provider will therefore depend on proper implementation by the user.
> >
> > A larger reworking of DefaultDelegationTokenManager could try to do everything on a single thread (registerJob/unregisterJob/obtainDelegationTokens) to simplify this model, but would require using a special background thread rather than the ioExecutor. I haven't considered this in detail, but would be open to it if it were strongly preferred.
> >
> > > What I mean here specifically is that even if we schedule the renewal the existing way at least the providers list manipulation and the originally scheduled renewal can race. Maybe others since I can just imagine the change.
> >
> > I don't think we intend on changing the list of providers -- these are still immutable.
> > Whenever a new re-obtain is requested, it should cancel the originally scheduled renewal using the future as in DefaultDelegationTokenManager.stopTokensUpdate, ensuring just one update scheduled at a time.
> >
> > I hope I have answered a lot of your questions. I'm happy to elaborate or even show a draft PR if that might be easier to trace.
> >
> > Thanks,
> > Alan
> >
> > On Fri, Jun 19, 2026 at 7:50 AM Gabor Somogyi <[email protected]> wrote:
> >
> > > Hi Aleksandr,
> > >
> > > Thanks for efforts!
> > >
> > > I've missed this thread lately but have some thought/questions.
> > >
> > > Up until now one cluster per one set of user credentials was the model. I think the multi-user model better serves the needs so +1. We should mention this on the main doc page later.
> > >
> > > Up until now DelegationTokenProvider instances were singletons and loaded by the service loader. Now we plan to add stop function, does that mean we plan to change the lifecycle?
> > >
> > > Having a generic way to ask the delegation token manager to re-obtain is a long standing needed feature but didn't have time. Having a dedicated API for this would be maybe better instead of relying on registerJob return value.
> > >
> > > Not sure sure how it's planned but new immediate re-obtain scheduling would be good to be upper bounded. Some retry logic can be aggressive about re-registration. Or having a cooldown is also fine.
> > >
> > > Last but not least up until now there was a single thread which played on critical path on immutable structures. Now we plan to change that which is fine but then I would like to see an exact plan what kind of threads are doing what and how do we protect against race/starvation/deadlock. Having an exact look is fine on the PR but this is the gist of it from my perspective.
> > > What I mean here specifically is that even if we schedule the renewal the existing way at least the providers list manipulation and the originally scheduled renewal can race. Maybe others since I can just imagine the change.
> > >
> > > BR,
> > > G
> > >
> > >
> > > On 2026/06/05 16:35:15 Aleksandr Savonin wrote:
> > > > Hi everyone,
> > > >
> > > > Alan Sheinberg and I would like to start a discussion on FLIP-588: Support per-job delegation tokens [1].
> > > > Flink's delegation token framework is currently cluster-scoped, which means a DelegationTokenProvider has no notion of an individual job. This breaks when different jobs on the same cluster need to authenticate as different identities to the same external service.
> > > > To resolve this, the FLIP adds per-job lifecycle hooks (registerJob/unregisterJob/stop) as default methods on the DelegationTokenProvider SPI, along with the runtime wiring to invoke them on job start and stop.
> > > > This change is fully backward compatible (new methods are default no-ops). It is worth mentioning that it widens the internal registerJobMaster RPC to carry the job configuration.
> > > >
> > > > Looking forward to your feedback.
> > > >
> > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-588%3A+Support+per-job+delegation+tokens
> > > >
> > > > --
> > > > Kind regards,
> > > > Aleksandr
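To make the three end-to-end pieces from the top of this reply more concrete, a minimal Java sketch of pieces 2 and 3 could look like the one below. It is an illustration under stated assumptions, not FLIP code: PerJobTokenCache and PerJobAuthenticator are hypothetical names, a plain String stands in for Flink's JobID, and the token is modeled as an opaque string. In a real connector, the job ID would come from InitContext.getJobInfo() for sinks or from the FLIP-583 contexts for sources, and currentToken() would be called at the authentication point, for Kafka inside the SASL/OAUTHBEARER callback.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical TM-side cache of per-job tokens (piece 2). In Flink the key would be the job's
 * JobID; a String stands in here to keep the sketch self-contained.
 */
final class PerJobTokenCache {
    // Concurrent map: updates and lookups may come from different threads.
    private final Map<String, String> tokensByJob = new ConcurrentHashMap<>();

    /** Stores (or replaces) the token for a job, e.g. after a re-obtain was distributed. */
    void update(String jobId, String token) {
        tokensByJob.put(jobId, token);
    }

    /** Drops a job's token once the job is unregistered, so stale credentials do not linger. */
    void remove(String jobId) {
        tokensByJob.remove(jobId);
    }

    /** Looks up the token for the given job, if one has been received. */
    Optional<String> lookup(String jobId) {
        return Optional.ofNullable(tokensByJob.get(jobId));
    }
}

/**
 * Hypothetical connector-side helper (piece 3). The connector creates it with its own job ID
 * (piece 1) and calls currentToken() at its authentication point.
 */
final class PerJobAuthenticator {
    private final PerJobTokenCache cache;
    private final String jobId;

    PerJobAuthenticator(PerJobTokenCache cache, String jobId) {
        this.cache = cache;
        this.jobId = jobId;
    }

    /** Returns this job's token; fails loudly instead of falling back to another identity. */
    String currentToken() {
        return cache.lookup(jobId)
                .orElseThrow(() -> new IllegalStateException("No delegation token for job " + jobId));
    }
}
```

In this sketch, keying strictly by JobID means two jobs sharing a TaskManager never resolve each other's token, and a missing entry fails loudly rather than silently using another identity.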

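Alan's DelegationTokenManagerCallback proposal, together with the points raised about deduplication, cancelling the originally scheduled renewal (as DefaultDelegationTokenManager.stopTokensUpdate does with its future) and a configurable cooldown, might be sketched as follows. This is a standalone sketch, not DefaultDelegationTokenManager code: ReobtainScheduler, its constructor parameters and the obtainTokens Runnable (standing in for obtaining and distributing tokens) are hypothetical. It keeps at most one pending run, periodic or requested, behind a single lock.

```java
import java.time.Duration;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** The callback shape proposed in the thread: providers keep a reference and call it at will. */
interface DelegationTokenManagerCallback {
    void reobtainDelegationTokens();
}

/** Hypothetical scheduler: one pending run at a time, deduped requests, cooldown between runs. */
final class ReobtainScheduler implements DelegationTokenManagerCallback {
    private final ScheduledExecutorService executor;
    private final Runnable obtainTokens; // hypothetical stand-in for obtain + distribute
    private final long cooldownNanos;
    private final long renewalIntervalNanos;

    // All mutable state below is guarded by "this".
    private ScheduledFuture<?> next;   // the single pending run, if any
    private boolean nextIsRequested;   // true if next came from reobtainDelegationTokens()
    private long generation;           // lets a superseded run detect it and exit
    private long lastRunNanos;
    private boolean hasRun;
    private boolean stopped;

    ReobtainScheduler(ScheduledExecutorService executor, Runnable obtainTokens,
                      Duration cooldown, Duration renewalInterval) {
        this.executor = executor;
        this.obtainTokens = obtainTokens;
        this.cooldownNanos = cooldown.toNanos();
        this.renewalIntervalNanos = renewalInterval.toNanos();
    }

    synchronized void start() {
        reschedule(0, false); // initial obtain right away
    }

    synchronized void stop() {
        stopped = true;
        if (next != null) {
            next.cancel(false);
            next = null;
        }
    }

    @Override
    public synchronized void reobtainDelegationTokens() {
        // Earliest start that respects the cooldown since the last run.
        long earliest = hasRun ? Math.max(0, lastRunNanos + cooldownNanos - System.nanoTime()) : 0;
        if (next != null && (nextIsRequested || next.getDelay(TimeUnit.NANOSECONDS) <= earliest)) {
            return; // an equal-or-earlier run is already pending: dedupe
        }
        reschedule(earliest, true);
    }

    // Caller must hold the lock. Cancels the pending run so only one is ever scheduled.
    private void reschedule(long delayNanos, boolean requested) {
        if (stopped) {
            return;
        }
        if (next != null) {
            next.cancel(false); // do not interrupt a run that is already executing
        }
        long gen = ++generation;
        nextIsRequested = requested;
        next = executor.schedule(() -> runOnce(gen), delayNanos, TimeUnit.NANOSECONDS);
    }

    private void runOnce(long gen) {
        synchronized (this) {
            if (gen != generation || stopped) {
                return; // superseded by a newer schedule, or stopped
            }
            next = null;
            lastRunNanos = System.nanoTime();
            hasRun = true;
        }
        try {
            obtainTokens.run(); // runs without the lock, so providers may call back in safely
        } finally {
            synchronized (this) {
                if (next == null) {
                    reschedule(renewalIntervalNanos, false); // resume periodic renewal
                }
            }
        }
    }
}
```

obtainTokens runs outside the lock, so a provider that calls reobtainDelegationTokens() from registerJob or from its own obtain logic only holds the scheduler lock briefly and cannot deadlock against a running re-obtain. The generation counter lets a run that was superseded after it had already been dequeued exit without disturbing the schedule.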