Savonitar opened a new pull request, #28639:
URL: https://github.com/apache/flink/pull/28639
## What is the purpose of the change
Flink's delegation token framework is cluster-scoped: a
DelegationTokenProvider obtains one set of tokens for the whole cluster and has
no notion of an individual job. This breaks multi-tenant setups where jobs
running on a shared cluster (e.g. a session cluster) need to authenticate as
different identities against the same external service. This PR implements
FLIP-588: it adds per-job awareness to the provider SPI plus the runtime wiring
to invoke it. All new SPI methods are default, so existing providers keep
working unchanged.
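A minimal sketch of why default methods keep existing providers source-compatible. Every type here is a simplified stand-in rather than the real Flink SPI: `TokenProvider`, `ManagerCallback`, the string job IDs, the map-based configuration, and the `principal` key are all hypothetical and used only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

public class PerJobSpiSketch {

    /** Stand-in for the callback handed to providers at init time (hypothetical shape). */
    interface ManagerCallback {
        void reobtainDelegationTokens();
    }

    /** Simplified stand-in for the provider SPI; job IDs and configs are plain strings and maps. */
    interface TokenProvider {
        String serviceName();

        // The per-job hooks are default no-ops, so providers written before them still compile.
        default void init(Map<String, String> conf, ManagerCallback callback) {}

        default void registerJob(String jobId, Map<String, String> jobConf) {}

        default void unregisterJob(String jobId) {}

        default void stop() {}
    }

    /** A pre-existing provider that implements only the original method. */
    static class LegacyProvider implements TokenProvider {
        @Override
        public String serviceName() {
            return "legacy";
        }
    }

    /** A job-aware provider that tracks one identity per job. */
    static class JobAwareProvider implements TokenProvider {
        final Map<String, String> principalByJob = new HashMap<>();
        private ManagerCallback callback = () -> {};

        @Override
        public String serviceName() {
            return "job-aware";
        }

        @Override
        public void init(Map<String, String> conf, ManagerCallback callback) {
            this.callback = callback;
        }

        @Override
        public void registerJob(String jobId, Map<String, String> jobConf) {
            // put() overwrites, so registering the same job twice leaves one entry (idempotent).
            // "principal" is a made-up key for this sketch.
            principalByJob.put(jobId, jobConf.getOrDefault("principal", "anonymous"));
            // Ask the manager for an on-demand obtain cycle now that a new job is known.
            callback.reobtainDelegationTokens();
        }

        @Override
        public void unregisterJob(String jobId) {
            principalByJob.remove(jobId);
        }
    }
}
```

`LegacyProvider` overrides nothing new and still satisfies the interface, which is the compatibility property this PR relies on.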
## Brief change log
- DelegationTokenProvider gains default methods: an init(Configuration,
DelegationTokenManagerCallback) overload, registerJob(JobID, Configuration),
unregisterJob(JobID) and stop()
- New `@Experimental` interface DelegationTokenManagerCallback with a single
method, reobtainDelegationTokens(). It is handed to providers at init time so
they can request an on-demand obtain-and-broadcast cycle (e.g. right after a
new job registers)
- DelegationTokenManager (`@Internal`) gains the corresponding methods as
no-op defaults, so NoOpDelegationTokenManager and other implementations are
unaffected
- DefaultDelegationTokenManager fans registerJob/unregisterJob out to all
providers (rollback and rethrow on registration failure, per-provider catch
during unregistration)
- On-demand re-obtains reuse the existing renewal machinery: concurrent
requests are coalesced, throttled by the new
security.delegation.tokens.reobtain.cooldown option (default 30s), and only
ever bring the next cycle forward, never delaying a scheduled renewal
- ResourceManager calls registerJob when a JobMaster registers and rejects
the registration if it throws, so a job never starts without the tokens it
requires; unregisterJob is called when the job is removed
- ResourceManagerGateway#registerJobMaster is widened to carry the job
Configuration (the JobMaster now sends executionPlan.getJobConfiguration(); a
backward-compatible overload is kept)
- Generated configuration docs updated for the new option
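The fan-out semantics in the change log (rollback and rethrow on registration failure, per-provider catch during unregistration) can be sketched roughly as follows. This is not the code from DefaultDelegationTokenManager: `JobAwareProvider` is a hypothetical stand-in interface, job IDs are plain strings, and the sketch assumes that only providers which already accepted the job are rolled back.

```java
import java.util.ArrayList;
import java.util.List;

public class FanOutSketch {

    /** Hypothetical stand-in for the per-job part of the provider SPI. */
    interface JobAwareProvider {
        void registerJob(String jobId) throws Exception;

        void unregisterJob(String jobId) throws Exception;
    }

    /** Registers the job with every provider, or with none if any provider fails. */
    static void registerJob(List<JobAwareProvider> providers, String jobId) throws Exception {
        List<JobAwareProvider> registered = new ArrayList<>();
        try {
            for (JobAwareProvider provider : providers) {
                provider.registerJob(jobId);
                registered.add(provider);
            }
        } catch (Exception failure) {
            // Roll back the providers that already accepted the job, then rethrow.
            for (JobAwareProvider provider : registered) {
                try {
                    provider.unregisterJob(jobId);
                } catch (Exception rollbackFailure) {
                    failure.addSuppressed(rollbackFailure);
                }
            }
            throw failure;
        }
    }

    /** Unregisters the job everywhere; one failing provider does not stop the others. */
    static void unregisterJob(List<JobAwareProvider> providers, String jobId) {
        for (JobAwareProvider provider : providers) {
            try {
                provider.unregisterJob(jobId);
            } catch (Exception e) {
                System.err.println("unregisterJob failed for " + jobId + ": " + e.getMessage());
            }
        }
    }
}
```

Rethrowing the original failure is what lets a caller such as the ResourceManager reject the JobMaster registration instead of starting a job with partial state.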
## Verifying this change
This change added tests and can be verified as follows:
- Extended DefaultDelegationTokenManagerTest (using manually triggered
executors) covering: register/unregister fan-out with rollback on failure,
coalescing of concurrent re-obtains, cooldown behavior (first request
immediate, deferral within the window, reset on stop()), periodic-renewal vs.
on-demand interplay, idempotent registration, and serialized obtain cycles
- Added
ResourceManagerJobMasterTest#testRegisterJobMasterRejectedWhenDelegationTokenRegistrationFails,
which drives the widened RPC and asserts the JobMaster registration is
rejected when the delegation token manager's registerJob throws
- Existing delegation token provider implementations are untouched and their
tests still pass (the new SPI methods are default, so existing providers
compile and run unchanged)
- Additionally verified end-to-end with a Kafka connector prototype (per-job
OAuth tokens against a real SASL/OAUTHBEARER broker with per-principal ACLs);
that connector work will be proposed separately to flink-connector-kafka
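The cooldown cases exercised by the tests above (first request immediate, deferral within the window, on-demand requests never delaying a scheduled renewal) can be read as a small timing rule. The sketch below is one possible reading and not the PR's implementation; the method name, the millisecond parameters, and the `-1` sentinel for "no obtain yet" are assumptions made for illustration.

```java
public class CooldownSketch {

    /**
     * Computes when the next obtain cycle should run after an on-demand request.
     *
     * @param nowMs current time in milliseconds
     * @param lastObtainMs time of the previous obtain cycle, or -1 if none has run yet
     * @param cooldownMs minimum gap between on-demand cycles (the PR's option defaults to 30s)
     * @param nextScheduledMs time of the already scheduled periodic renewal
     * @return the time at which the next obtain cycle should run
     */
    static long nextObtainTime(
            long nowMs, long lastObtainMs, long cooldownMs, long nextScheduledMs) {
        // First request runs immediately; later ones wait until the cooldown has elapsed.
        long earliestAllowed =
                lastObtainMs < 0 ? nowMs : Math.max(nowMs, lastObtainMs + cooldownMs);
        // An on-demand request may only bring the next cycle forward, never push it back.
        return Math.min(earliestAllowed, nextScheduledMs);
    }
}
```

With a 30 000 ms cooldown and a renewal scheduled an hour out, a request at `nowMs = 10_000` after an obtain at `lastObtainMs = 0` is deferred to 30 000, while the same request with a renewal already scheduled at 20 000 leaves that renewal where it is.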
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with
`@Public(Evolving)`: yes. SecurityOptions (`@PublicEvolving`) gains the new
`security.delegation.tokens.reobtain.cooldown` option; the extended/new SPI
types (DelegationTokenProvider, DelegationTokenManagerCallback) are
`@Experimental`
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: yes, the JobMaster registration
path in the ResourceManager. Registration is rejected if registerJob throws,
and registerJob is invoked again on JobMaster re-registration after failover
(the SPI contract requires it to be idempotent)
- The S3 file system connector: no
## Documentation
- Does this pull request introduce a new feature? yes
- If yes, how is the feature documented? docs / JavaDocs: the new config
option is in the generated configuration reference, the SPI contracts
(threading, idempotency, ordering) are documented in the JavaDocs, and the
overall design is in FLIP-588
---
##### Was generative AI tooling used to co-author this PR?
- [ ] Yes (Claude Opus 4.8, via Claude Code)
<!--
Generated-by: Claude Opus 4.8 (1M context)
-->