Savonitar opened a new pull request, #28639:
URL: https://github.com/apache/flink/pull/28639

   ## What is the purpose of the change
   
   Flink's delegation token framework is cluster-scoped: a 
DelegationTokenProvider obtains one set of tokens for the whole cluster and has 
no notion of an individual job. This breaks multi-tenant setups where jobs 
running on a shared cluster (e.g. a session cluster) need to authenticate as 
different identities against the same external service. This PR implements 
FLIP-588: it adds per-job awareness to the provider SPI plus the runtime wiring 
to invoke it. All new SPI methods are default, so existing providers keep 
working unchanged.
   
   ## Brief change log
   
   - DelegationTokenProvider gains default methods: an init(Configuration, 
DelegationTokenManagerCallback) overload, registerJob(JobID, Configuration), 
unregisterJob(JobID) and stop()
   - New `@Experimental` interface DelegationTokenManagerCallback with a single 
reobtainDelegationTokens(), handed to providers at init time so they can 
request an on-demand obtain-and-broadcast cycle (e.g. right after a new job 
registers)
   - DelegationTokenManager (`@Internal`) gains the corresponding methods as 
no-op defaults, so NoOpDelegationTokenManager and other implementations are 
unaffected
   - DefaultDelegationTokenManager fans registerJob/unregisterJob out to all 
providers (rollback and rethrow on registration failure, per-provider catch 
during unregistration)
   - On-demand re-obtains reuse the existing renewal machinery: concurrent 
requests are coalesced, throttled by the new 
security.delegation.tokens.reobtain.cooldown option (default 30s), and only 
ever bring the next cycle forward, never delaying a scheduled renewal
   - ResourceManager calls registerJob when a JobMaster registers and rejects 
the registration if it throws, so a job never starts without the tokens it 
requires; unregisterJob is called when the job is removed
   - ResourceManagerGateway#registerJobMaster is widened to carry the job 
Configuration (the JobMaster now sends executionPlan.getJobConfiguration(); a 
backward-compatible overload is kept)
   - Generated configuration docs updated for the new option
   
   
   ## Verifying this change
   This change added tests and can be verified as follows: 
   - Extended DefaultDelegationTokenManagerTest (manually-triggered executors) 
covering: register/unregister fan-out with rollback on failure, coalescing of 
concurrent re-obtains, cooldown behavior (first request immediate, deferral 
within the window, reset on stop()); periodic-renewal vs. on-demand interplay, 
idempotent registration, and serialized obtain cycles 
   - Added 
ResourceManagerJobMasterTest#testRegisterJobMasterRejectedWhenDelegationTokenRegistrationFails,
 which drives the widened RPC and asserts the JobMaster registration is 
rejected when the delegation token manager's registerJob throws 
   - Existing delegation token provider implementations are untouched and their 
tests still pass (the new SPI methods are default, so existing providers 
compile and run unchanged) 
   - Additionally verified end-to-end with a Kafka connector prototype (per-job 
OAuth tokens against a real SASL/OAUTHBEARER broker with per-principal ACLs); 
that connector work will be proposed separately to flink-connector-kafka
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: SecurityOptions (@PublicEvolving) gains the new 
`security.delegation.tokens.reobtain.cooldown` option, the extended/new SPI 
types (DelegationTokenProvider, DelegationTokenManagerCallback) are 
@Experimental
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: the JobMaster registration 
path in the ResourceManager: registration is rejected if registerJob throws, 
and registerJob is invoked again on JobMaster re-registration after failover 
(the SPI contract requires it to be idempotent)
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? yes
     - If yes, how is the feature documented? docs / JavaDocs: the new config 
option is in the generated configuration reference, the SPI contracts 
(threading, idempotency, ordering) are documented in the JavaDocs, and the 
overall design is in FLIP-588
   
   ---
   
   ##### Was generative AI tooling used to co-author this PR?
   
   <!--
   If generative AI tooling has been used in the process of authoring this PR, 
please
   change the checkbox below to `[X]` followed by the name of the tool, and 
uncomment the
   "Generated-by" line. See the ASF Generative Tooling Guidance for details:
   https://www.apache.org/legal/generative-tooling.html
   
   You are responsible for the quality and correctness of every change in this 
PR
   regardless of the tooling used. Low-effort AI-generated PRs will be closed. 
See
   AGENTS.md for the full guidance.
   -->
   
   - [ ] Yes (Claude Opus 4.8, via Claude Code)
   
   <!--
   Generated-by: Claude Opus 4.8 (1M context)
   -->
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to