Hi Aleksandr,

FLIP-583 makes sense. That's the gap filler I was looking for.

Making the Kafka connector the reference is fine; the point is to have at
least one.
My ask would be to add to this FLIP what Kafka as the reference would look
like.
I'm going to look at it from the perspective of how we can migrate
other connectors like S3.
Marking the key points is enough: the rough token structure in the
provider/receiver,
how a connector stores tokens (static var or similar), and how a task picks
an item during authentication.
Since sources and sinks have different context APIs, it's worth mentioning
token selection on both sides.
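
To make the ask concrete, this is roughly the level of detail I mean. It is
only a sketch under my own assumptions: PerJobTokenCache and its methods are
hypothetical names (nothing like this exists in Flink or in the FLIP), and the
JobID is a plain String just to keep the snippet self-contained.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical TM-side holder for per-job tokens; not an existing Flink class. */
final class PerJobTokenCache {
    // Static so the receiver and the connector's authentication callback
    // see the same map (assumption: both are loaded by the same class loader).
    private static final Map<String, byte[]> TOKENS = new ConcurrentHashMap<>();

    private PerJobTokenCache() {}

    /** Receiver side: store the freshly received token for one job. */
    static void put(String jobId, byte[] token) {
        TOKENS.put(jobId, token.clone());
    }

    /** Task side: look up the token using the task's own JobID as the key. */
    static Optional<byte[]> lookup(String jobId) {
        byte[] token = TOKENS.get(jobId);
        return token == null ? Optional.empty() : Optional.of(token.clone());
    }

    /** Cleanup when the job is gone. */
    static void remove(String jobId) {
        TOKENS.remove(jobId);
    }
}
```

Per Aleksandr's mail below, a sink would take the lookup key from the Sink V2
InitContext and a source from the JobInfo that FLIP-583 exposes, so the
selection logic itself could be the same on both sides.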

Just to sum up my current understanding (please correct me if I'm wrong):
- FLIP-583 makes sources JobID-aware on the V2 API (sinks are already covered
via the Sink V2 InitContext).
This means the V2 API is a hard requirement for per-job delegation tokens.
- The DelegationTokenProvider and DelegationTokenReceiver lifecycles remain
as-is.
- We're planning to add a simple cooldown mechanism against aggressive
re-obtain rescheduling.
- The race in the DelegationTokenManager is a valid concern and is intended
to be synchronised properly <- This is key for me to understand in the PR.
- Deadlock will be prevented via proper locking + knowing the involved
threads.
- Starvation can happen when ioExecutor is under heavy use. This comes
from the original design and is intended to be kept as-is.
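
For the cooldown point, what I have in mind is roughly the following. It is
a sketch only: ReobtainCooldown and tryAcquire are hypothetical names, and the
caller passes in the clock value so the logic stays easy to test.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical guard that lets at most one re-obtain through per cooldown window. */
final class ReobtainCooldown {
    private static final long NEVER = Long.MIN_VALUE;

    private final long cooldownMillis;
    private final AtomicLong lastAcceptedMillis = new AtomicLong(NEVER);

    ReobtainCooldown(long cooldownMillis) {
        this.cooldownMillis = cooldownMillis;
    }

    /** Returns true if a re-obtain may be scheduled now; safe to call from several threads. */
    boolean tryAcquire(long nowMillis) {
        while (true) {
            long last = lastAcceptedMillis.get();
            // The NEVER check avoids overflow on the first call.
            if (last != NEVER && nowMillis - last < cooldownMillis) {
                return false; // still cooling down
            }
            // Only one concurrent caller wins the CAS; the others re-check.
            if (lastAcceptedMillis.compareAndSet(last, nowMillis)) {
                return true;
            }
        }
    }
}
```

A rejected request would probably need to be deferred to the end of the
window rather than dropped, given Alan's point that a job might not run if it
cannot trigger a re-obtain soon after registration.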

The only remaining thing I'm thinking about is flagging re-obtain in a
more generic way and not via the registerJob return value.
Let me think about it over the weekend. In the meantime, please update the
FLIP, it's shaping up well.
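
As a starting point for that thinking, here is one possible shape, building
on the callback interface Alan sketched further down the thread. Everything
except the interface itself is my assumption: DedupingReobtainCallback, its
constructor arguments, and the choice to coalesce requests with a flag are
illustrative, not a proposal for the final API.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

/** Callback shape as sketched earlier in this thread. */
interface DelegationTokenManagerCallback {
    void reobtainDelegationTokens();
}

/** Hypothetical implementation: at most one re-obtain is queued at any time. */
final class DedupingReobtainCallback implements DelegationTokenManagerCallback {
    private final Executor executor;
    private final Runnable obtainTokens;
    private final AtomicBoolean pending = new AtomicBoolean(false);

    DedupingReobtainCallback(Executor executor, Runnable obtainTokens) {
        this.executor = executor;
        this.obtainTokens = obtainTokens;
    }

    @Override
    public void reobtainDelegationTokens() {
        // Only the caller that flips false -> true schedules work;
        // concurrent requests are coalesced into that one run.
        if (pending.compareAndSet(false, true)) {
            executor.execute(() -> {
                // Clear first, so a request arriving during the obtain
                // schedules a follow-up run instead of being lost.
                pending.set(false);
                obtainTokens.run();
            });
        }
    }
}
```

With something like this, registerJob could return void and simply call the
callback when it decides fresh tokens are needed.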

As a general note, I'm happy to help you guys with the review and with
migrating the S3 connector.

BR,
G


On Thu, Jun 25, 2026 at 7:54 PM Aleksandr Savonin <[email protected]>
wrote:

> Thanks Gabor and Alan for the discussion and good points, and sorry
> for the late reply.
>
> On "how does a connector pick the right token": the selection key is
> the task's own JobID. The consuming operator already has access to it:
> sinks via Sink v2 InitContext.getJobInfo(), and sources via FLIP-583
> [1] (which exposes JobInfo on SourceReaderContext /
> SplitEnumeratorContext). I submitted FLIP-583 partly as a precondition
> for this one (though not only for that purpose).
>
> So a connector can tell "which job am I" on both sides. And there are
> three pieces end-to-end:
> 1. The task knows its JobID -> it is covered (InitContext for sinks,
> FLIP-583 for sources).
> 2. A cache on the TM side keyed by JobID, holding the per-job token,
> that the task looks up.
> 3. The connector applies that token at its authentication point.
>
> Regarding the example: I'd suggest connector-kafka if you agree. Kafka
> builds a client per task with its own SASL/OAUTHBEARER callback that
> can apply the per-job token by JobID. It's also the first connector I
> plan to onboard once the discussion finalizes; I've tested it
> internally and it works well. Could you please clarify whether you want
> to see more details/examples (with connectors/e2e?) in the FLIP document?
>
> I'll reply to the rest of the comments soon.
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-583%3A+Expose+JobInfo+on+Source+contexts
>
> Kind regards,
> Aleksandr
>
> On Wed, 24 Jun 2026 at 13:02, Gabor Somogyi <[email protected]>
> wrote:
> >
> > Hi Alan,
> >
> > First of all, thanks for the detailed explanation, it really helped us
> > get in sync!
> > As a general remark, without going into specifics, it would be good to
> > solve this with the least invasive and most stable solution, whatever
> > that turns out to be.
> > I think the starting idea is good. It would be awesome to see how this
> > could work end-to-end and to fill the gaps, if there are any.
> >
> > I've just refreshed my memory and created a small design
> > sketch (I can share it if you're interested, but at this point it may
> > not be relevant).
> > It's important to discuss the API, but during the design a more
> > fundamental question popped into my mind.
> >
> > Let's say that on the receiver side there are JobID->credentials pairs.
> > How can any connector in a task decide which one to choose when writing
> > a specific file? We can take the Hadoop S3 connector as an example to
> > talk about concrete solutions.
> >
> > The reason I'm asking is that at the end of the day we must do the
> > migration, and it would be good to have a reference connector.
> >
> > BR,
> > G
> >
> >
> > On Wed, Jun 24, 2026 at 12:04 AM Alan Sheinberg via dev <
> > [email protected]> wrote:
> >
> > > Hi Gabor,
> > >
> > > Thanks for the thoughtful comments. I just wanted to chime in on some
> > > of the thinking Aleksandr and I have had.
> > >
> > > > Up until now DelegationTokenProvider instances were singletons and
> > > > loaded by the service loader. Now we plan to add a stop function;
> > > > does that mean we plan to change the lifecycle?
> > >
> > >
> > > No, the lifecycle is unchanged.  It was imagined that this would be a
> > > useful hook for potentially cleaning things up, if necessary.
> > > Sometimes thread pools or other resources might need to be shut down
> > > neatly.
> > >
> > > > Having a generic way to ask the delegation token manager to
> > > > re-obtain is a long-standing needed feature, but I didn't have time
> > > > for it. Having a dedicated API for this would maybe be better than
> > > > relying on the registerJob return value.
> > >
> > >
> > > I agree that a general API for managing re-obtain makes sense.
> > > Generally the DelegationTokenProvider would likely request a re-obtain
> > > in response to some event.  Currently, obtainDelegationTokens() is the
> > > main hook that fetches tokens and determines when it will be called
> > > again.  Another possibility could be a background thread that requests
> > > it, or the new registerJob/unregisterJob methods being proposed.
> > >
> > > A quick sketch of a possible generic interface:
> > >
> > > /** Lets a provider ask the manager for an immediate token re-obtain. */
> > > public interface DelegationTokenManagerCallback {
> > >     void reobtainDelegationTokens();
> > > }
> > >
> > > We could then overload the init method of DelegationTokenProvider and
> > > have init(Configuration config, DelegationTokenManagerCallback callback)
> > > so that the DelegationTokenProvider could keep a reference to the
> > > callback and initiate a re-obtain at will (causing a new refresh on the
> > > DefaultDelegationTokenManager's ioExecutor).  The callback logic would
> > > need to be smart about deduping calls so that only one was scheduled at
> > > a time in a threadsafe way.
> > >
> > > This method could then be utilized by the body of any registerJob,
> > > allowing the method to have a void return value.
> > >
> > > That approach is simple and could be extended in the future if you have
> > > some broader ideas on other parts of the API.  Would you rather
> > > implement this approach and avoid adding a special case to registerJob?
> > >
> > > > Not sure how it's planned, but new immediate re-obtain scheduling
> > > > would be good to be upper bounded. Some retry logic can be aggressive
> > > > about re-registration. Or having a cooldown is also fine.
> > >
> > >
> > > It makes sense to have some cooldown to avoid doing it too often;
> > > however, a job might not run if it cannot initiate a re-obtain soon
> > > after being registered.  A configurable cooldown with a decent default
> > > might be a good choice.
> > >
> > > > Last but not least, up until now there was a single thread which
> > > > played on the critical path on immutable structures. Now we plan to
> > > > change that, which is fine, but then I would like to see an exact
> > > > plan of what kind of threads are doing what and how we protect
> > > > against race/starvation/deadlock. Having an exact look is fine on the
> > > > PR, but this is the gist of it from my perspective.
> > > >
> > >
> > > In the current codebase a single thread creates the
> > > DefaultDelegationTokenManager and builds the immutable structures.
> > > Then DefaultDelegationTokenManager.start is called from the
> > > ResourceManager main thread and each token re-obtain is called on a
> > > thread from DefaultDelegationTokenManager.ioExecutor.  Therefore,
> > > fields within a DelegationTokenProvider must either be immutable or
> > > properly synchronized.
> > >
> > > The calls to registerJob/unregisterJob in this FLIP will come from the
> > > ResourceManager main thread, calling through to
> > > DefaultDelegationTokenManager and then the providers.  They are assumed
> > > to be non-blocking and just handle book-keeping for the next re-obtain
> > > call.  Since this pattern inherently requires updating internal fields,
> > > the DelegationTokenProvider must properly synchronize the
> > > methods/fields used for this book-keeping.  Calls to
> > > registerJob/unregisterJob aren't prevented from blocking and starving
> > > others, similar to obtainDelegationTokens.  The contract can be made
> > > very clear in the javadoc.  Preventing races, starvation, or deadlock
> > > within the provider will therefore depend on proper implementation by
> > > the user.
> > >
> > > A larger reworking of DefaultDelegationTokenManager could try to do
> > > everything on a single thread
> > > (registerJob/unregisterJob/obtainDelegationTokens) to simplify this
> > > model, but would require using a special background thread rather than
> > > the ioExecutor.  I haven't considered this in detail, but would be open
> > > to it if it were strongly preferred.
> > >
> > > > What I mean here specifically is that even if we schedule the
> > > > renewal the existing way, at least the providers list manipulation
> > > > and the originally scheduled renewal can race. Maybe others too,
> > > > since I can only imagine the change.
> > >
> > >
> > > I don't think we intend on changing the list of providers -- these are
> > > still immutable.   Whenever a new re-obtain is requested, it should
> > > cancel the originally scheduled renewal using the future as in
> > > DefaultDelegationTokenManager.stopTokensUpdate, ensuring just one
> > > update is scheduled at a time.
> > >
> > > I hope I have answered a lot of your questions.  I'm happy to elaborate
> > > or even show a draft PR if that might be easier to trace.
> > >
> > > Thanks,
> > > Alan
> > >
> > > On Fri, Jun 19, 2026 at 7:50 AM Gabor Somogyi <[email protected]>
> > > wrote:
> > >
> > > > Hi Aleksandr,
> > > >
> > > > Thanks for the efforts!
> > > >
> > > > I've missed this thread lately but have some thoughts/questions.
> > > >
> > > > Up until now one cluster per one set of user credentials was the
> > > > model. I think the multi-user model better serves the needs so +1.
> > > > We should mention this on the main doc page later.
> > > >
> > > > Up until now DelegationTokenProvider instances were singletons and
> > > > loaded by the service loader. Now we plan to add a stop function;
> > > > does that mean we plan to change the lifecycle?
> > > >
> > > > Having a generic way to ask the delegation token manager to
> > > > re-obtain is a long-standing needed feature, but I didn't have time
> > > > for it. Having a dedicated API for this would maybe be better than
> > > > relying on the registerJob return value.
> > > >
> > > > Not sure how it's planned, but new immediate re-obtain scheduling
> > > > would be good to be upper bounded. Some retry logic can be aggressive
> > > > about re-registration. Or having a cooldown is also fine.
> > > >
> > > > Last but not least, up until now there was a single thread which
> > > > played on the critical path on immutable structures. Now we plan to
> > > > change that, which is fine, but then I would like to see an exact
> > > > plan of what kind of threads are doing what and how we protect
> > > > against race/starvation/deadlock. Having an exact look is fine on the
> > > > PR, but this is the gist of it from my perspective.
> > > > What I mean here specifically is that even if we schedule the
> > > > renewal the existing way, at least the providers list manipulation
> > > > and the originally scheduled renewal can race. Maybe others too,
> > > > since I can only imagine the change.
> > > >
> > > > BR,
> > > > G
> > > >
> > > >
> > > > On 2026/06/05 16:35:15 Aleksandr Savonin wrote:
> > > > > Hi everyone,
> > > > >
> > > > > Alan Sheinberg and I would like to start a discussion on FLIP-588:
> > > > > Support per-job delegation tokens [1].
> > > > > > Flink's delegation token framework is currently cluster-scoped,
> > > > > > which means a DelegationTokenProvider has no notion of an
> > > > > > individual job.
> > > > > This breaks when different jobs on the same cluster need to
> > > > > authenticate as different identities to the same external service.
> > > > > To resolve this, the FLIP adds per-job lifecycle hooks
> > > > > (registerJob/unregisterJob/stop) as default methods on the
> > > > > > DelegationTokenProvider SPI, along with the runtime wiring to
> > > > > > invoke them on job start and stop.
> > > > > This change is fully backward compatible (new methods are default
> > > > > no-ops). It is worth mentioning that it widens the internal
> > > > > registerJobMaster RPC to carry the job configuration.
> > > > >
> > > > > Looking forward to your feedback.
> > > > >
> > > > > [1]
> > > >
> > >
> https://urldefense.com/v3/__https://cwiki.apache.org/confluence/display/FLINK/FLIP-588*3A*Support*per-job*delegation*tokens__;JSsrKys!!Ayb5sqE7!pujTCGQDxHRMUp32hJP7kWS_heNDLb_73xOFQWmfwladcejJ1XJF028lAWmhEubAIfREamAXhXe0ImcLzn1TBQ9SvZl-ww$
> > > > >
> > > > > --
> > > > > Kind regards,
> > > > > Aleksandr
> > > > >
> > > >
> > >
>
>
>
> --
> Kind regards,
> Aleksandr
>
