Hi Ashish,
In your stack trace I see that the kafka-topics tool (TopicCommand) is invoking
org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredLoginCallbackHandler,
which is the default login handler when sasl.login.callback.handler.class is
not set, so something in the client configuration passed to the tool seems amiss.
If you can capture the AdminClientConfig output (with sensitive values
redacted), that would be helpful.
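
For reference, here is a minimal sketch of the kind of client properties file
kafka-topics.sh would need in order to use the secured login handler instead of
the unsecured default. The file name client.properties is hypothetical, and the
endpoint and credentials are placeholders copied from your broker config. The
handler class name assumes a release where it lives in the
org.apache.kafka.common.security.oauthbearer package; your server.properties
uses the older .secured subpackage, so adjust to whichever your version ships.

```properties
# client.properties (hypothetical file name), passed via --command-config
security.protocol=SASL_PLAINTEXT
sasl.mechanism=OAUTHBEARER
# Without this line the client falls back to the unsecured login handler
sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler
sasl.oauthbearer.token.endpoint.url=https://<xxxxxxxx>/oauth/token
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  clientId="<xxxxxxxx>" \
  clientSecret="<xxxxxxxx>" \
  scope="kafka.read kafka.write";
```

It would then be passed to the tool with something like
bin/kafka-topics.sh --bootstrap-server <host>:9093 --command-config client.properties --create --topic <topic>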
Thanks,
Kirk
On Thu, Mar 20, 2025, at 3:55 AM, ashish sood wrote:
> Hi Kirk,
>
> Thanks for checking.
>
> I am trying to set up a Kafka cluster with end-to-end OAuth (i.e. Kafka-to-
> Kafka communication within a cluster and client-to-broker communication). I
> was able to get my broker started without errors with the config below;
> however, I am now unable to create topics and get the error below.
>
> *Current config*
> *jaas.config*
>
> KafkaServer {
> org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required
> clientId="<xxxxxxxx>"
> clientSecret="<xxxxxxxx>"
> audience="https://myprovider.com"
> token.endpoint.uri="https://xxxxxxxx/oauth/token"
> scope="kafka.read kafka.write";
> };
>
> *server.properties*
> listeners=SASL_PLAINTEXT://:9093
> advertised.listeners=SASL_PLAINTEXT://<>:9093
> sasl.enabled.mechanisms=OAUTHBEARER
> sasl.oauthbearer.expected.audience=https://myprovider.com
> oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
> inter.broker.listener.name=SASL_PLAINTEXT
> sasl.mechanism.inter.broker.protocol=OAUTHBEARER
> listener.name.sasl_plaintext.oauthbearer.sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
> listener.name.sasl_plaintext.oauthbearer.sasl.server.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerValidatorCallbackHandler
> sasl.oauthbearer.token.endpoint.url=https://<xxxxxxxx>/oauth/token
> sasl.oauthbearer.jwks.endpoint.url=https://<xxxxxxxx>/.well-known/jwks.json
>
> *ERROR WHILE CREATING TOPIC*
>
> This is very strange because when I fetch the token manually via curl and
> inspect it, I clearly see the "sub" field populated with the value
> <clientid@clients>
>
>
> ERROR No principal name in JWT claim: sub
> (org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule)
> java.io.IOException: No principal name in JWT claim: sub
>     at org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredLoginCallbackHandler.handle(OAuthBearerUnsecuredLoginCallbackHandler.java:165)
>     at org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.identifyToken(OAuthBearerLoginModule.java:316)
>     at org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.login(OAuthBearerLoginModule.java:301)
>     at java.base/javax.security.auth.login.LoginContext.invoke(LoginContext.java:754)
>     at java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:678)
>     at java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:676)
>     at java.base/java.security.AccessController.doPrivileged(AccessController.java:714)
>     at java.base/javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:676)
>     at java.base/javax.security.auth.login.LoginContext.login(LoginContext.java:587)
>     at org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin.login(ExpiringCredentialRefreshingLogin.java:204)
>     at org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerRefreshingLogin.login(OAuthBearerRefreshingLogin.java:150)
>     at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:62)
>     at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:105)
>     at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:170)
>     at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:192)
>     at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:81)
>     at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:119)
>     at org.apache.kafka.clients.ClientUtils.createNetworkClient(ClientUtils.java:223)
>     at org.apache.kafka.clients.ClientUtils.createNetworkClient(ClientUtils.java:189)
>     at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:525)
>     at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:492)
>     at org.apache.kafka.clients.admin.Admin.create(Admin.java:137)
>     at org.apache.kafka.tools.TopicCommand$TopicService.createAdminClient(TopicCommand.java:437)
>     at org.apache.kafka.tools.TopicCommand$TopicService.<init>(TopicCommand.java:426)
>     at org.apache.kafka.tools.TopicCommand.execute(TopicCommand.java:98)
>     at org.apache.kafka.tools.TopicCommand.mainNoExit(TopicCommand.java:87)
>     at org.apache.kafka.tools.TopicCommand.main(TopicCommand.java:82)
>
> Regards
> Ashish Sood
>
> On Thu, Mar 20, 2025 at 12:15 AM Kirk True <[email protected]> wrote:
>
> > Hi Ashish,
> >
> > Are you using OAuth for client->broker communication, inter-broker
> > communication, or both?
> >
> > Based on the server.properties configuration that was shared, it looks
> > like the configuration is attempting to set up inter-broker communication
> > using OAuth.
> >
> > For a broker to *retrieve* tokens, it needs to have this configuration:
> >
> >
> > listener.name.SASL_PLAINTEXT.oauthbearer.sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler
> >
> > For a broker to *validate* tokens, it needs to have this configuration:
> >
> >
> > listener.name.SASL_PLAINTEXT.oauthbearer.sasl.server.callback.handler.class=org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallbackHandler
> >
> > Then the SASL configs would need to be included too:
> >
> > listener.name.SASL_PLAINTEXT.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
> >   clientId="XXXXXXXXXXXXXXXXXX" \
> >   clientSecret="XXXXXXXXXXXXXXXXXX" \
> >   audience="https://myprovider.com" \
> >   serviceName="kafka" \
> >   scope="kafka.read kafka.write";
> >
> > If possible, please share any non-sensitive logs.
> >
> > Thanks,
> > Kirk
> >
> > On Wed, Mar 19, 2025, at 3:41 AM, ashish sood wrote:
> > > Hi All,
> > >
> > > I am setting up OAuth for my Kafka broker. I have created an account on
> > > Auth0 for this and set up an application and an API.
> > >
> > > With the config below in server.properties and the JAAS config file, I
> > > keep getting an invalid token error, although a token generated manually
> > > via curl works fine. The Auth0 logs also show successful generation of
> > > the token, yet Kafka still reports an error. Any suggestions to resolve
> > > this issue would be appreciated.
> > >
> > > *Server.properties*
> > > listeners=SASL_PLAINTEXT://:9093
> > > advertised.listeners=SASL_PLAINTEXT://<XXXXXX>:9093
> > > sasl.enabled.mechanisms=OAUTHBEARER
> > > sasl.oauthbearer.jwks.endpoint.url=https://XXXXXXXXX/oauth/token
> > > sasl.oauthbearer.expected.audience=https://myprovider.com
> > > oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
> > > listener.name.sasl_plaintext.oauthbearer.sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
> > > confluent.oauth.groups.claim.name=groups
> > > inter.broker.listener.name=SASL_PLAINTEXT
> > > sasl.mechanism.inter.broker.protocol=OAUTHBEARER
> > > super.users=User:<ClientID>
> > > sasl.oauthbearer.token.endpoint.url=<XXXXXXXXX>/oauth/token
> > > sasl.oauthbearer.audience=https://myprovider.com
> > > allow.everyone.if.no.acl.found=true
> > >
> > > *Jaas Config*
> > > KafkaServer {
> > > org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required
> > > clientId="XXXXXXXXXXXXXXXXXX"
> > > clientSecret="XXXXXXXXXXXXXXXXXX"
> > > audience="https://myprovider.com"
> > > serviceName="kafka"
> > > scope="kafka.read kafka.write";
> > > };
> > >
> > > *Error*
> > > [2025-03-19 16:05:43,465] INFO [Controller id=0, targetBrokerId=0]
> > > Failed authentication with localhost/127.0.0.1 (channelId=0)
> > > ({"status":"invalid_token"}) (org.apache.kafka.common.network.Selector)
> > >
> > > Thanks & Regards
> >
>