This is an automated email from the ASF dual-hosted git repository.
rfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 203dcf1c feat(oauth2): add support for issuer URL override in client
credentials flow (#1463)
203dcf1c is described below
commit 203dcf1cd22dbcf853c2a06c52dee3c4e1eba92b
Author: Rui Fu <[email protected]>
AuthorDate: Thu Jan 29 22:00:28 2026 +0800
feat(oauth2): add support for issuer URL override in client credentials
flow (#1463)
* feat(oauth2): add support for issuer URL override in client credentials
flow
* fix(oauth2): handle nil options in client credentials flow
* test(oauth2): add tests for DefaultGrantProvider behavior
---
oauth2/client_credentials_flow.go | 14 ++++-
oauth2/client_credentials_flow_test.go | 91 +++++++++++++++++++++++++++++++
pulsar/auth/oauth2.go | 1 +
pulsar/auth/oauth2_test.go | 84 +++++++++++++++++++++++++++-
pulsaradmin/pkg/admin/auth/oauth2.go | 8 ++-
pulsaradmin/pkg/admin/auth/oauth2_test.go | 32 ++++++++++-
6 files changed, 225 insertions(+), 5 deletions(-)
diff --git a/oauth2/client_credentials_flow.go
b/oauth2/client_credentials_flow.go
index c45dc821..87cebc6b 100644
--- a/oauth2/client_credentials_flow.go
+++ b/oauth2/client_credentials_flow.go
@@ -53,6 +53,7 @@ type GrantProvider interface {
type ClientCredentialsFlowOptions struct {
KeyFile string
+ IssuerURL string
AdditionalScopes []string
}
@@ -64,13 +65,24 @@ type DefaultGrantProvider struct {
// merging the scopes from both the options and the key file configuration
func (p *DefaultGrantProvider) GetGrant(audience string, options
*ClientCredentialsFlowOptions) (
*AuthorizationGrant, error) {
+ if options == nil {
+ return nil, errors.New("client credentials flow options cannot
be nil")
+ }
credsProvider :=
NewClientCredentialsProviderFromKeyFile(options.KeyFile)
keyFile, err := credsProvider.GetClientCredentials()
if err != nil {
return nil, errors.Wrap(err, "could not get client credentials")
}
- wellKnownEndpoints, err :=
GetOIDCWellKnownEndpointsFromIssuerURL(keyFile.IssuerURL)
+ issuerURL := options.IssuerURL
+ if issuerURL == "" {
+ issuerURL = keyFile.IssuerURL
+ }
+ if issuerURL == "" {
+ return nil, errors.New("issuer url is required for client
credentials flow")
+ }
+
+ wellKnownEndpoints, err :=
GetOIDCWellKnownEndpointsFromIssuerURL(issuerURL)
if err != nil {
return nil, err
}
diff --git a/oauth2/client_credentials_flow_test.go
b/oauth2/client_credentials_flow_test.go
index 97ad8710..d1a1c592 100644
--- a/oauth2/client_credentials_flow_test.go
+++ b/oauth2/client_credentials_flow_test.go
@@ -19,6 +19,10 @@ package oauth2
import (
"errors"
+ "fmt"
+ "net/http"
+ "net/http/httptest"
+ "os"
"time"
"github.com/apache/pulsar-client-go/oauth2/clock"
@@ -49,6 +53,57 @@ var clientCredentials = KeyFile{
Scope: "test_scope",
}
+func mockWellKnownServer(tokenEndpoint string) *httptest.Server {
+ handler := http.NewServeMux()
+ handler.HandleFunc("/.well-known/openid-configuration", func(writer
http.ResponseWriter, _ *http.Request) {
+ fmt.Fprintf(writer, "{\n \"token_endpoint\": \"%s\"\n}\n",
tokenEndpoint)
+ })
+ return httptest.NewServer(handler)
+}
+
+func mockKeyFileWithIssuer(issuerURL string) (string, error) {
+ kf, err := os.CreateTemp("", "test_oauth2")
+ if err != nil {
+ return "", err
+ }
+ _, err = kf.WriteString(fmt.Sprintf(`{
+ "type":"resource",
+ "client_id":"client-id",
+ "client_secret":"client-secret",
+ "client_email":"[email protected]",
+ "issuer_url":"%s"
+}`, issuerURL))
+ if err != nil {
+ _ = kf.Close()
+ return "", err
+ }
+ if err := kf.Close(); err != nil {
+ return "", err
+ }
+ return kf.Name(), nil
+}
+
+func mockKeyFileWithoutIssuer() (string, error) {
+ kf, err := os.CreateTemp("", "test_oauth2")
+ if err != nil {
+ return "", err
+ }
+ _, err = kf.WriteString(`{
+ "type":"resource",
+ "client_id":"client-id",
+ "client_secret":"client-secret",
+ "client_email":"[email protected]"
+}`)
+ if err != nil {
+ _ = kf.Close()
+ return "", err
+ }
+ if err := kf.Close(); err != nil {
+ return "", err
+ }
+ return kf.Name(), nil
+}
+
var _ = ginkgo.Describe("ClientCredentialsFlow", func() {
ginkgo.Describe("Authorize", func() {
@@ -124,6 +179,42 @@ var _ = ginkgo.Describe("ClientCredentialsFlow", func() {
})
})
+var _ = ginkgo.Describe("DefaultGrantProvider", func() {
+ ginkgo.It("prefers issuer url from options over key file", func() {
+ keyFileTokenEndpoint := "http://keyfile.example/token"
+ optionsTokenEndpoint := "http://options.example/token"
+ serverFromKeyFile := mockWellKnownServer(keyFileTokenEndpoint)
+ defer serverFromKeyFile.Close()
+ serverFromOptions := mockWellKnownServer(optionsTokenEndpoint)
+ defer serverFromOptions.Close()
+
+ keyFile, err := mockKeyFileWithIssuer(serverFromKeyFile.URL)
+ gomega.Expect(err).ToNot(gomega.HaveOccurred())
+ defer os.Remove(keyFile)
+
+ provider := DefaultGrantProvider{}
+ grant, err := provider.GetGrant("test-audience",
&ClientCredentialsFlowOptions{
+ KeyFile: keyFile,
+ IssuerURL: serverFromOptions.URL,
+ })
+ gomega.Expect(err).ToNot(gomega.HaveOccurred())
+
gomega.Expect(grant.TokenEndpoint).To(gomega.Equal(optionsTokenEndpoint))
+ })
+
+ ginkgo.It("returns an error when issuer url is missing", func() {
+ keyFile, err := mockKeyFileWithoutIssuer()
+ gomega.Expect(err).ToNot(gomega.HaveOccurred())
+ defer os.Remove(keyFile)
+
+ provider := DefaultGrantProvider{}
+ _, err = provider.GetGrant("test-audience",
&ClientCredentialsFlowOptions{
+ KeyFile: keyFile,
+ })
+ gomega.Expect(err).To(gomega.HaveOccurred())
+ gomega.Expect(err.Error()).To(gomega.Equal("issuer url is
required for client credentials flow"))
+ })
+})
+
var _ = ginkgo.Describe("ClientCredentialsGrantRefresher", func() {
ginkgo.Describe("Refresh", func() {
diff --git a/pulsar/auth/oauth2.go b/pulsar/auth/oauth2.go
index e35a15b2..42edd178 100644
--- a/pulsar/auth/oauth2.go
+++ b/pulsar/auth/oauth2.go
@@ -61,6 +61,7 @@ func NewAuthenticationOAuth2WithParams(params
map[string]string) (Provider, erro
case ConfigParamTypeClientCredentials:
flow, err :=
oauth2.NewDefaultClientCredentialsFlow(oauth2.ClientCredentialsFlowOptions{
KeyFile: params[ConfigParamKeyFile],
+ IssuerURL: params[ConfigParamIssuerURL],
AdditionalScopes:
strings.Split(params[ConfigParamScope], " "),
})
if err != nil {
diff --git a/pulsar/auth/oauth2_test.go b/pulsar/auth/oauth2_test.go
index ecaad51f..7dbb479d 100644
--- a/pulsar/auth/oauth2_test.go
+++ b/pulsar/auth/oauth2_test.go
@@ -36,6 +36,11 @@ var expectedClientSecret atomic.Value
// mockOAuthServer will mock a oauth service for the tests
func mockOAuthServer() *httptest.Server {
+ return mockOAuthServerWithToken("token-content")
+}
+
+// mockOAuthServerWithToken will mock a oauth service for the tests with a
custom token.
+func mockOAuthServerWithToken(token string) *httptest.Server {
// prepare a port for the mocked server
server := httptest.NewUnstartedServer(http.DefaultServeMux)
@@ -61,7 +66,7 @@ func mockOAuthServer() *httptest.Server {
http.Error(writer, "invalid client credentials",
http.StatusUnauthorized)
return
}
- fmt.Fprintln(writer, "{\n \"access_token\":
\"token-content\",\n \"token_type\": \"Bearer\"\n}")
+ fmt.Fprintf(writer, "{\n \"access_token\": \"%s\",\n
\"token_type\": \"Bearer\"\n}\n", token)
})
mockedHandler.HandleFunc("/authorize", func(writer http.ResponseWriter,
_ *http.Request) {
fmt.Fprintln(writer, "true")
@@ -98,6 +103,29 @@ func mockKeyFile(server string) (string, error) {
return kf.Name(), nil
}
+func mockKeyFileWithoutIssuer() (string, error) {
+ pwd, err := os.Getwd()
+ if err != nil {
+ return "", err
+ }
+ kf, err := os.CreateTemp(pwd, "test_oauth2")
+ if err != nil {
+ return "", err
+ }
+ _, err = kf.WriteString(`{
+ "type":"resource",
+ "client_id":"client-id",
+ "client_secret":"client-secret",
+ "client_email":"[email protected]",
+ "scope": "test-scope"
+}`)
+ if err != nil {
+ return "", err
+ }
+
+ return kf.Name(), nil
+}
+
func TestNewAuthenticationOAuth2WithParams(t *testing.T) {
server := mockOAuthServer()
defer server.Close()
@@ -162,6 +190,60 @@ func TestNewAuthenticationOAuth2WithParams(t *testing.T) {
}
}
+func TestOAuth2IssuerOverrideUsesAuthParams(t *testing.T) {
+ expectedClientID.Store("client-id")
+ expectedClientSecret.Store("client-secret")
+ serverFromKeyFile := mockOAuthServerWithToken("token-from-keyfile")
+ defer serverFromKeyFile.Close()
+ serverFromParams := mockOAuthServerWithToken("token-from-params")
+ defer serverFromParams.Close()
+
+ kf, err := mockKeyFile(serverFromKeyFile.URL)
+ defer os.Remove(kf)
+ require.NoError(t, err)
+
+ params := map[string]string{
+ ConfigParamType: ConfigParamTypeClientCredentials,
+ ConfigParamIssuerURL: serverFromParams.URL,
+ ConfigParamClientID: "client-id",
+ ConfigParamAudience: "audience",
+ ConfigParamKeyFile: kf,
+ ConfigParamScope: "profile",
+ }
+
+ auth, err := NewAuthenticationOAuth2WithParams(params)
+ require.NoError(t, err)
+ require.NoError(t, auth.Init())
+
+ token, err := auth.GetData()
+ require.NoError(t, err)
+ assert.Equal(t, "token-from-params", string(token))
+}
+
+func TestOAuth2MissingIssuerReturnsError(t *testing.T) {
+ expectedClientID.Store("client-id")
+ expectedClientSecret.Store("client-secret")
+ kf, err := mockKeyFileWithoutIssuer()
+ defer os.Remove(kf)
+ require.NoError(t, err)
+
+ params := map[string]string{
+ ConfigParamType: ConfigParamTypeClientCredentials,
+ ConfigParamClientID: "client-id",
+ ConfigParamAudience: "audience",
+ ConfigParamKeyFile: kf,
+ ConfigParamScope: "profile",
+ }
+
+ auth, err := NewAuthenticationOAuth2WithParams(params)
+ require.NoError(t, err)
+ require.NoError(t, auth.Init())
+
+ _, err = auth.GetData()
+ require.Error(t, err)
+ assert.Contains(t, err.Error(), "issuer url is required for client
credentials flow")
+}
+
func TestOAuth2KeyFileReloading(t *testing.T) {
server := mockOAuthServer()
defer server.Close()
diff --git a/pulsaradmin/pkg/admin/auth/oauth2.go
b/pulsaradmin/pkg/admin/auth/oauth2.go
index 9536f09a..f310f135 100644
--- a/pulsaradmin/pkg/admin/auth/oauth2.go
+++ b/pulsaradmin/pkg/admin/auth/oauth2.go
@@ -52,7 +52,8 @@ type OAuth2Provider struct {
// NewAuthenticationOAuth2WithDefaultFlow uses memory to save the grant
func NewAuthenticationOAuth2WithDefaultFlow(issuer oauth2.Issuer, keyFile
string) (Provider, error) {
return NewAuthenticationOAuth2WithFlow(issuer,
oauth2.ClientCredentialsFlowOptions{
- KeyFile: keyFile,
+ KeyFile: keyFile,
+ IssuerURL: issuer.IssuerEndpoint,
})
}
@@ -97,7 +98,10 @@ func NewAuthenticationOAuth2WithParams(
Audience: audience,
}
- flow, err :=
oauth2.NewDefaultClientCredentialsFlow(oauth2.ClientCredentialsFlowOptions{KeyFile:
privateKey})
+ flow, err :=
oauth2.NewDefaultClientCredentialsFlow(oauth2.ClientCredentialsFlowOptions{
+ KeyFile: privateKey,
+ IssuerURL: issuerEndpoint,
+ })
if err != nil {
return nil, err
}
diff --git a/pulsaradmin/pkg/admin/auth/oauth2_test.go
b/pulsaradmin/pkg/admin/auth/oauth2_test.go
index 65704ab8..3f3ea32f 100644
--- a/pulsaradmin/pkg/admin/auth/oauth2_test.go
+++ b/pulsaradmin/pkg/admin/auth/oauth2_test.go
@@ -27,10 +27,16 @@ import (
"github.com/apache/pulsar-client-go/oauth2"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
)
// mockOAuthServer will mock a oauth service for the tests
func mockOAuthServer() *httptest.Server {
+ return mockOAuthServerWithToken("token-content")
+}
+
+// mockOAuthServerWithToken will mock a oauth service for the tests with a
custom token.
+func mockOAuthServerWithToken(token string) *httptest.Server {
// prepare a port for the mocked server
server := httptest.NewUnstartedServer(http.DefaultServeMux)
@@ -46,7 +52,7 @@ func mockOAuthServer() *httptest.Server {
fmt.Fprintln(writer, s)
})
mockedHandler.HandleFunc("/oauth/token", func(writer
http.ResponseWriter, _ *http.Request) {
- fmt.Fprintln(writer, "{\n \"access_token\":
\"token-content\",\n \"token_type\": \"Bearer\"\n}")
+ fmt.Fprintf(writer, "{\n \"access_token\": \"%s\",\n
\"token_type\": \"Bearer\"\n}\n", token)
})
mockedHandler.HandleFunc("/authorize", func(writer http.ResponseWriter,
_ *http.Request) {
fmt.Fprintln(writer, "true")
@@ -115,3 +121,27 @@ func TestOauth2(t *testing.T) {
}
assert.Equal(t, "token-content", token.AccessToken)
}
+
+func TestOAuth2IssuerOverrideUsesAuthParams(t *testing.T) {
+ serverFromKeyFile := mockOAuthServerWithToken("token-from-keyfile")
+ defer serverFromKeyFile.Close()
+ serverFromParams := mockOAuthServerWithToken("token-from-params")
+ defer serverFromParams.Close()
+
+ kf, err := mockKeyFile(serverFromKeyFile.URL)
+ defer os.Remove(kf)
+ require.NoError(t, err)
+
+ provider, err := NewAuthenticationOAuth2WithParams(
+ serverFromParams.URL,
+ "client-id",
+ serverFromParams.URL,
+ kf,
+ http.DefaultTransport,
+ )
+ require.NoError(t, err)
+
+ token, err := provider.source.Token()
+ require.NoError(t, err)
+ assert.Equal(t, "token-from-params", token.AccessToken)
+}