This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 5b0e0545 Support reloading OAuth2 key file (#1441)
5b0e0545 is described below
commit 5b0e0545fc9ba4ad3b86c2e1d3615c007b59558f
Author: Zike Yang <[email protected]>
AuthorDate: Thu Nov 27 13:07:56 2025 +0800
Support reloading OAuth2 key file (#1441)
---
oauth2/auth_suite_test.go | 20 +++
oauth2/cache/cache.go | 70 +++--------
oauth2/client_credentials_flow.go | 99 ++++++++-------
oauth2/client_credentials_flow_test.go | 19 ++-
oauth2/store/keyring.go | 195 ------------------------------
oauth2/store/memory.go | 87 -------------
oauth2/store/store.go | 45 -------
pulsar/auth/oauth2.go | 45 +------
pulsar/auth/oauth2_test.go | 93 +++++++++++++-
pulsaradmin/pkg/admin/auth/oauth2.go | 109 ++---------------
pulsaradmin/pkg/admin/auth/oauth2_test.go | 32 ++---
11 files changed, 216 insertions(+), 598 deletions(-)
diff --git a/oauth2/auth_suite_test.go b/oauth2/auth_suite_test.go
index 54b24299..0f81487f 100644
--- a/oauth2/auth_suite_test.go
+++ b/oauth2/auth_suite_test.go
@@ -58,6 +58,26 @@ func (te *MockTokenExchanger) ExchangeDeviceCode(_
context.Context,
return te.ReturnsTokens, te.ReturnsError
}
+type MockGrantProvider struct {
+ keyFile *KeyFile
+}
+
+func (mgp *MockGrantProvider) GetGrant(audience string, opts
*ClientCredentialsFlowOptions) (
+ *AuthorizationGrant, error) {
+ scopes := []string{mgp.keyFile.Scope}
+ if opts != nil && len(opts.AdditionalScopes) > 0 {
+ scopes = append(scopes, opts.AdditionalScopes...)
+ }
+ return &AuthorizationGrant{
+ Type: GrantTypeClientCredentials,
+ Audience: audience,
+ ClientID: mgp.keyFile.ClientID,
+ ClientCredentials: mgp.keyFile,
+ TokenEndpoint: oidcEndpoints.TokenEndpoint,
+ Scopes: scopes,
+ }, nil
+}
+
var oidcEndpoints = OIDCWellKnownEndpoints{
AuthorizationEndpoint: "http://issuer/auth/authorize",
TokenEndpoint: "http://issuer/auth/token",
diff --git a/oauth2/cache/cache.go b/oauth2/cache/cache.go
index e2279843..df41efed 100644
--- a/oauth2/cache/cache.go
+++ b/oauth2/cache/cache.go
@@ -23,8 +23,6 @@ import (
"time"
"github.com/apache/pulsar-client-go/oauth2"
- "github.com/apache/pulsar-client-go/oauth2/store"
-
"github.com/apache/pulsar-client-go/oauth2/clock"
xoauth2 "golang.org/x/oauth2"
)
@@ -43,24 +41,24 @@ const (
)
// tokenCache implements a cache for the token associated with a specific
audience.
-// it interacts with the store when the access token is near expiration or
invalidated.
// it is advisable to use a token cache instance per audience.
type tokenCache struct {
- clock clock.Clock
- lock sync.Mutex
- store store.Store
- audience string
- refresher oauth2.AuthorizationGrantRefresher
- token *xoauth2.Token
+ clock clock.Clock
+ lock sync.Mutex
+ audience string
+ token *xoauth2.Token
+ flow *oauth2.ClientCredentialsFlow
}
-func NewDefaultTokenCache(store store.Store, audience string,
- refresher oauth2.AuthorizationGrantRefresher) (CachingTokenSource,
error) {
+func NewDefaultTokenCache(audience string,
+ flow *oauth2.ClientCredentialsFlow) (CachingTokenSource, error) {
+ if flow == nil {
+ return nil, fmt.Errorf("flow cannot be nil")
+ }
cache := &tokenCache{
- clock: clock.RealClock{},
- store: store,
- audience: audience,
- refresher: refresher,
+ clock: clock.RealClock{},
+ audience: audience,
+ flow: flow,
}
return cache, nil
}
@@ -77,56 +75,24 @@ func (t *tokenCache) Token() (*xoauth2.Token, error) {
return t.token, nil
}
- // load from the store and use the access token if it isn't expired
- grant, err := t.store.LoadGrant(t.audience)
+ grant, err := t.flow.Authorize(t.audience)
if err != nil {
- return nil, fmt.Errorf("LoadGrant: %w", err)
- }
- t.token = grant.Token
- if t.token != nil && t.validateAccessToken(*t.token) {
- return t.token, nil
+ return nil, err
}
-
- // obtain and cache a fresh access token
- grant, err = t.refresher.Refresh(grant)
- if err != nil {
- return nil, fmt.Errorf("RefreshGrant: %w", err)
+ if grant.Token == nil {
+ return nil, fmt.Errorf("authorization succeeded but no token
was returned")
}
t.token = grant.Token
- err = t.store.SaveGrant(t.audience, *grant)
- if err != nil {
- // TODO log rather than throw
- return nil, fmt.Errorf("SaveGrant: %w", err)
- }
return t.token, nil
}
-// InvalidateToken clears the access token (likely due to a response from the
resource server).
-// Note that the token within the grant may contain a refresh token which
should survive.
+// InvalidateToken clears the cached access token (likely due to a response
from the resource server).
func (t *tokenCache) InvalidateToken() error {
t.lock.Lock()
defer t.lock.Unlock()
- previous := t.token
t.token = nil
-
- // clear from the store the access token that was returned earlier
(unless the store has since been updated)
- if previous == nil || previous.AccessToken == "" {
- return nil
- }
- grant, err := t.store.LoadGrant(t.audience)
- if err != nil {
- return fmt.Errorf("LoadGrant: %w", err)
- }
- if grant.Token != nil && grant.Token.AccessToken ==
previous.AccessToken {
- grant.Token.Expiry = time.Unix(0, 0).Add(expiryDelta)
- err = t.store.SaveGrant(t.audience, *grant)
- if err != nil {
- // TODO log rather than throw
- return fmt.Errorf("SaveGrant: %w", err)
- }
- }
return nil
}
diff --git a/oauth2/client_credentials_flow.go
b/oauth2/client_credentials_flow.go
index 9a2643a9..c45dc821 100644
--- a/oauth2/client_credentials_flow.go
+++ b/oauth2/client_credentials_flow.go
@@ -30,11 +30,10 @@ import (
// ClientCredentialsFlow takes care of the mechanics needed for getting an
access
// token using the OAuth 2.0 "Client Credentials Flow"
type ClientCredentialsFlow struct {
- options ClientCredentialsFlowOptions
- oidcWellKnownEndpoints OIDCWellKnownEndpoints
- keyfile *KeyFile
- exchanger ClientCredentialsExchanger
- clock clock.Clock
+ options ClientCredentialsFlowOptions
+ exchanger ClientCredentialsExchanger
+ grantProvider GrantProvider
+ clock clock.Clock
}
// ClientCredentialsProvider abstracts getting client credentials
@@ -47,29 +46,24 @@ type ClientCredentialsExchanger interface {
ExchangeClientCredentials(req ClientCredentialsExchangeRequest)
(*TokenResult, error)
}
+// GrantProvider abstracts the creation of authorization grants from
credentials
+type GrantProvider interface {
+ GetGrant(audience string, options *ClientCredentialsFlowOptions)
(*AuthorizationGrant, error)
+}
+
type ClientCredentialsFlowOptions struct {
KeyFile string
AdditionalScopes []string
}
-func newClientCredentialsFlow(
- options ClientCredentialsFlowOptions,
- keyfile *KeyFile,
- oidcWellKnownEndpoints OIDCWellKnownEndpoints,
- exchanger ClientCredentialsExchanger,
- clock clock.Clock) *ClientCredentialsFlow {
- return &ClientCredentialsFlow{
- options: options,
- oidcWellKnownEndpoints: oidcWellKnownEndpoints,
- keyfile: keyfile,
- exchanger: exchanger,
- clock: clock,
- }
+// DefaultGrantProvider provides authorization grants by loading credentials
from a key file
+type DefaultGrantProvider struct {
}
-// NewDefaultClientCredentialsFlow provides an easy way to build up a default
-// client credentials flow with all the correct configuration.
-func NewDefaultClientCredentialsFlow(options ClientCredentialsFlowOptions)
(*ClientCredentialsFlow, error) {
+// GetGrant creates an authorization grant by loading credentials from the key
file and
+// merging the scopes from both the options and the key file configuration
+func (p *DefaultGrantProvider) GetGrant(audience string, options
*ClientCredentialsFlowOptions) (
+ *AuthorizationGrant, error) {
credsProvider :=
NewClientCredentialsProviderFromKeyFile(options.KeyFile)
keyFile, err := credsProvider.GetClientCredentials()
if err != nil {
@@ -80,39 +74,58 @@ func NewDefaultClientCredentialsFlow(options
ClientCredentialsFlowOptions) (*Cli
if err != nil {
return nil, err
}
+ // Merge the scopes of the options AdditionalScopes with the scopes
read from the keyFile config
+ var scopesToAdd []string
+ if len(options.AdditionalScopes) > 0 {
+ scopesToAdd = append(scopesToAdd, options.AdditionalScopes...)
+ }
+
+ if keyFile.Scope != "" {
+ scopesSplit := strings.Fields(keyFile.Scope)
+ scopesToAdd = append(scopesToAdd, scopesSplit...)
+ }
+
+ return &AuthorizationGrant{
+ Type: GrantTypeClientCredentials,
+ Audience: audience,
+ ClientID: keyFile.ClientID,
+ ClientCredentials: keyFile,
+ TokenEndpoint: wellKnownEndpoints.TokenEndpoint,
+ Scopes: scopesToAdd,
+ }, nil
+}
+
+func newClientCredentialsFlow(
+ options ClientCredentialsFlowOptions,
+ exchanger ClientCredentialsExchanger,
+ grantProvider GrantProvider,
+ clock clock.Clock) *ClientCredentialsFlow {
+ return &ClientCredentialsFlow{
+ options: options,
+ exchanger: exchanger,
+ grantProvider: grantProvider,
+ clock: clock,
+ }
+}
+
+// NewDefaultClientCredentialsFlow provides an easy way to build up a default
+// client credentials flow with all the correct configuration.
+func NewDefaultClientCredentialsFlow(options ClientCredentialsFlowOptions)
(*ClientCredentialsFlow, error) {
tokenRetriever := NewTokenRetriever(&http.Client{})
return newClientCredentialsFlow(
options,
- keyFile,
- *wellKnownEndpoints,
tokenRetriever,
+ &DefaultGrantProvider{},
clock.RealClock{}), nil
}
var _ Flow = &ClientCredentialsFlow{}
func (c *ClientCredentialsFlow) Authorize(audience string)
(*AuthorizationGrant, error) {
- var err error
-
- // Merge the scopes of the options AdditionalScopes with the scopes
read from the keyFile config
- var scopesToAdd []string
- if len(c.options.AdditionalScopes) > 0 {
- scopesToAdd = append(scopesToAdd, c.options.AdditionalScopes...)
- }
-
- if c.keyfile.Scope != "" {
- scopesSplit := strings.Split(c.keyfile.Scope, " ")
- scopesToAdd = append(scopesToAdd, scopesSplit...)
- }
-
- grant := &AuthorizationGrant{
- Type: GrantTypeClientCredentials,
- Audience: audience,
- ClientID: c.keyfile.ClientID,
- ClientCredentials: c.keyfile,
- TokenEndpoint: c.oidcWellKnownEndpoints.TokenEndpoint,
- Scopes: scopesToAdd,
+ grant, err := c.grantProvider.GetGrant(audience, &c.options)
+ if err != nil {
+ return nil, err
}
// test the credentials and obtain an initial access token
diff --git a/oauth2/client_credentials_flow_test.go
b/oauth2/client_credentials_flow_test.go
index 8fd0a110..97ad8710 100644
--- a/oauth2/client_credentials_flow_test.go
+++ b/oauth2/client_credentials_flow_test.go
@@ -54,6 +54,7 @@ var _ = ginkgo.Describe("ClientCredentialsFlow", func() {
var mockClock clock.Clock
var mockTokenExchanger *MockTokenExchanger
+ var mockGrantProvider *MockGrantProvider
ginkgo.BeforeEach(func() {
mockClock = testing.NewFakeClock(time.Unix(0, 0))
@@ -61,18 +62,18 @@ var _ = ginkgo.Describe("ClientCredentialsFlow", func() {
mockTokenExchanger = &MockTokenExchanger{
ReturnsTokens: &expectedTokens,
}
+ mockGrantProvider = &MockGrantProvider{
+ keyFile: &clientCredentials,
+ }
})
ginkgo.It("invokes TokenExchanger with credentials", func() {
- additionalScope := "additional_scope"
provider := newClientCredentialsFlow(
ClientCredentialsFlowOptions{
- KeyFile: "test_keyfile",
- AdditionalScopes:
[]string{additionalScope},
+ KeyFile: "test_keyfile",
},
- &clientCredentials,
- oidcEndpoints,
mockTokenExchanger,
+ mockGrantProvider,
mockClock,
)
@@ -83,7 +84,7 @@ var _ = ginkgo.Describe("ClientCredentialsFlow", func() {
ClientID: clientCredentials.ClientID,
ClientSecret: clientCredentials.ClientSecret,
Audience: "test_audience",
- Scopes: []string{additionalScope,
clientCredentials.Scope},
+ Scopes:
[]string{clientCredentials.Scope},
}))
})
@@ -92,9 +93,8 @@ var _ = ginkgo.Describe("ClientCredentialsFlow", func() {
ClientCredentialsFlowOptions{
KeyFile: "test_keyfile",
},
- &clientCredentials,
- oidcEndpoints,
mockTokenExchanger,
+ mockGrantProvider,
mockClock,
)
@@ -112,9 +112,8 @@ var _ = ginkgo.Describe("ClientCredentialsFlow", func() {
ClientCredentialsFlowOptions{
KeyFile: "test_keyfile",
},
- &clientCredentials,
- oidcEndpoints,
mockTokenExchanger,
+ mockGrantProvider,
mockClock,
)
diff --git a/oauth2/store/keyring.go b/oauth2/store/keyring.go
deleted file mode 100644
index 70fba5b0..00000000
--- a/oauth2/store/keyring.go
+++ /dev/null
@@ -1,195 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package store
-
-import (
- "crypto/sha1"
- "encoding/json"
- "errors"
- "fmt"
- "sync"
-
- "github.com/99designs/keyring"
- "github.com/apache/pulsar-client-go/oauth2"
- "github.com/apache/pulsar-client-go/oauth2/clock"
-)
-
-type KeyringStore struct {
- kr keyring.Keyring
- clock clock.Clock
- lock sync.Mutex
-}
-
-// storedItem represents an item stored in the keyring
-type storedItem struct {
- Audience string
- UserName string
- Grant oauth2.AuthorizationGrant
-}
-
-// NewKeyringStore creates a store based on a keyring.
-func NewKeyringStore(kr keyring.Keyring) (*KeyringStore, error) {
- return &KeyringStore{
- kr: kr,
- clock: clock.RealClock{},
- }, nil
-}
-
-var _ Store = &KeyringStore{}
-
-func (f *KeyringStore) SaveGrant(audience string, grant
oauth2.AuthorizationGrant) error {
- f.lock.Lock()
- defer f.lock.Unlock()
-
- var err error
- var userName string
- switch grant.Type {
- case oauth2.GrantTypeClientCredentials:
- if grant.ClientCredentials == nil {
- return ErrUnsupportedAuthData
- }
- userName = grant.ClientCredentials.ClientEmail
- case oauth2.GrantTypeDeviceCode:
- if grant.Token == nil {
- return ErrUnsupportedAuthData
- }
- userName, err = oauth2.ExtractUserName(*grant.Token)
- if err != nil {
- return err
- }
- default:
- return ErrUnsupportedAuthData
- }
- item := storedItem{
- Audience: audience,
- UserName: userName,
- Grant: grant,
- }
- err = f.setItem(item)
- if err != nil {
- return err
- }
- return nil
-}
-
-func (f *KeyringStore) LoadGrant(audience string) (*oauth2.AuthorizationGrant,
error) {
- f.lock.Lock()
- defer f.lock.Unlock()
-
- item, err := f.getItem(audience)
- if err != nil {
- if errors.Is(err, keyring.ErrKeyNotFound) {
- return nil, ErrNoAuthenticationData
- }
- return nil, err
- }
- switch item.Grant.Type {
- case oauth2.GrantTypeClientCredentials:
- if item.Grant.ClientCredentials == nil {
- return nil, ErrUnsupportedAuthData
- }
- case oauth2.GrantTypeDeviceCode:
- if item.Grant.Token == nil {
- return nil, ErrUnsupportedAuthData
- }
- default:
- return nil, ErrUnsupportedAuthData
- }
- return &item.Grant, nil
-}
-
-func (f *KeyringStore) WhoAmI(audience string) (string, error) {
- f.lock.Lock()
- defer f.lock.Unlock()
-
- key := hashKeyringKey(audience)
- authItem, err := f.kr.Get(key)
- if err != nil {
- if errors.Is(err, keyring.ErrKeyNotFound) {
- return "", ErrNoAuthenticationData
- }
- return "", fmt.Errorf("unable to get information from the
keyring: %w", err)
- }
- return authItem.Label, nil
-}
-
-func (f *KeyringStore) Logout() error {
- f.lock.Lock()
- defer f.lock.Unlock()
-
- var err error
- keys, err := f.kr.Keys()
- if err != nil {
- return fmt.Errorf("unable to get information from the keyring:
%w", err)
- }
- for _, key := range keys {
- err = f.kr.Remove(key)
- }
- if err != nil {
- return fmt.Errorf("unable to update the keyring: %w", err)
- }
- return nil
-}
-
-func (f *KeyringStore) getItem(audience string) (storedItem, error) {
- key := hashKeyringKey(audience)
- i, err := f.kr.Get(key)
- if err != nil {
- return storedItem{}, err
- }
- var grant oauth2.AuthorizationGrant
- err = json.Unmarshal(i.Data, &grant)
- if err != nil {
- // the grant appears to be invalid
- return storedItem{}, ErrUnsupportedAuthData
- }
- return storedItem{
- Audience: audience,
- UserName: i.Label,
- Grant: grant,
- }, nil
-}
-
-func (f *KeyringStore) setItem(item storedItem) error {
- key := hashKeyringKey(item.Audience)
- data, err := json.Marshal(item.Grant)
- if err != nil {
- return err
- }
- i := keyring.Item{
- Key: key,
- Data: data,
- Label: item.UserName,
- Description: "authorization grant",
- KeychainNotTrustApplication: false,
- KeychainNotSynchronizable: false,
- }
- err = f.kr.Set(i)
- if err != nil {
- return fmt.Errorf("unable to update the keyring: %w", err)
- }
- return nil
-}
-
-// hashKeyringKey creates a safe key based on the given string
-func hashKeyringKey(s string) string {
- h := sha1.New()
- h.Write([]byte(s))
- bs := h.Sum(nil)
- return fmt.Sprintf("%x", bs)
-}
diff --git a/oauth2/store/memory.go b/oauth2/store/memory.go
deleted file mode 100644
index 07c75947..00000000
--- a/oauth2/store/memory.go
+++ /dev/null
@@ -1,87 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package store
-
-import (
- "sync"
-
- "github.com/apache/pulsar-client-go/oauth2"
- "github.com/apache/pulsar-client-go/oauth2/clock"
-)
-
-type MemoryStore struct {
- clock clock.Clock
- lock sync.Mutex
- grants map[string]*oauth2.AuthorizationGrant
-}
-
-func NewMemoryStore() Store {
- return &MemoryStore{
- clock: clock.RealClock{},
- grants: make(map[string]*oauth2.AuthorizationGrant),
- }
-}
-
-var _ Store = &MemoryStore{}
-
-func (f *MemoryStore) SaveGrant(audience string, grant
oauth2.AuthorizationGrant) error {
- f.lock.Lock()
- defer f.lock.Unlock()
- f.grants[audience] = &grant
- return nil
-}
-
-func (f *MemoryStore) LoadGrant(audience string) (*oauth2.AuthorizationGrant,
error) {
- f.lock.Lock()
- defer f.lock.Unlock()
- grant, ok := f.grants[audience]
- if !ok {
- return nil, ErrNoAuthenticationData
- }
- return grant, nil
-}
-
-func (f *MemoryStore) WhoAmI(audience string) (string, error) {
- f.lock.Lock()
- defer f.lock.Unlock()
- grant, ok := f.grants[audience]
- if !ok {
- return "", ErrNoAuthenticationData
- }
- switch grant.Type {
- case oauth2.GrantTypeClientCredentials:
- if grant.ClientCredentials == nil {
- return "", ErrUnsupportedAuthData
- }
- return grant.ClientCredentials.ClientEmail, nil
- case oauth2.GrantTypeDeviceCode:
- if grant.Token == nil {
- return "", ErrUnsupportedAuthData
- }
- return oauth2.ExtractUserName(*grant.Token)
- default:
- return "", ErrUnsupportedAuthData
- }
-}
-
-func (f *MemoryStore) Logout() error {
- f.lock.Lock()
- defer f.lock.Unlock()
- f.grants = map[string]*oauth2.AuthorizationGrant{}
- return nil
-}
diff --git a/oauth2/store/store.go b/oauth2/store/store.go
deleted file mode 100644
index 5e916920..00000000
--- a/oauth2/store/store.go
+++ /dev/null
@@ -1,45 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package store
-
-import (
- "errors"
-
- "github.com/apache/pulsar-client-go/oauth2"
-)
-
-// ErrNoAuthenticationData indicates that stored authentication data is not
available
-var ErrNoAuthenticationData = errors.New("authentication data is not
available")
-
-// ErrUnsupportedAuthData indicates that stored authentication data is unusable
-var ErrUnsupportedAuthData = errors.New("authentication data is not usable")
-
-// Store is responsible for persisting authorization grants
-type Store interface {
- // SaveGrant stores an authorization grant for a given audience
- SaveGrant(audience string, grant oauth2.AuthorizationGrant) error
-
- // LoadGrant loads an authorization grant for a given audience
- LoadGrant(audience string) (*oauth2.AuthorizationGrant, error)
-
- // WhoAmI returns the current user name (or an error if nobody is
logged in)
- WhoAmI(audience string) (string, error)
-
- // Logout deletes all stored credentials
- Logout() error
-}
diff --git a/pulsar/auth/oauth2.go b/pulsar/auth/oauth2.go
index e162e4ed..e35a15b2 100644
--- a/pulsar/auth/oauth2.go
+++ b/pulsar/auth/oauth2.go
@@ -28,7 +28,6 @@ import (
"github.com/apache/pulsar-client-go/oauth2"
"github.com/apache/pulsar-client-go/oauth2/cache"
"github.com/apache/pulsar-client-go/oauth2/clock"
- "github.com/apache/pulsar-client-go/oauth2/store"
)
const (
@@ -44,10 +43,10 @@ const (
type oauth2AuthProvider struct {
clock clock.Clock
issuer oauth2.Issuer
- store store.Store
source cache.CachingTokenSource
defaultTransport http.RoundTripper
tokenTransport *transport
+ flow *oauth2.ClientCredentialsFlow
}
// NewAuthenticationOAuth2WithParams return a interface of Provider with
string map.
@@ -58,8 +57,6 @@ func NewAuthenticationOAuth2WithParams(params
map[string]string) (Provider, erro
Audience: params[ConfigParamAudience],
}
- // initialize a store of authorization grants
- st := store.NewMemoryStore()
switch params[ConfigParamType] {
case ConfigParamTypeClientCredentials:
flow, err :=
oauth2.NewDefaultClientCredentialsFlow(oauth2.ClientCredentialsFlowOptions{
@@ -69,46 +66,25 @@ func NewAuthenticationOAuth2WithParams(params
map[string]string) (Provider, erro
if err != nil {
return nil, err
}
- grant, err := flow.Authorize(issuer.Audience)
- if err != nil {
- return nil, err
- }
- err = st.SaveGrant(issuer.Audience, *grant)
- if err != nil {
- return nil, err
- }
+ return NewAuthenticationOAuth2(issuer, flow), nil
default:
return nil, fmt.Errorf("unsupported authentication type: %s",
params[ConfigParamType])
}
-
- return NewAuthenticationOAuth2(issuer, st), nil
}
func NewAuthenticationOAuth2(
issuer oauth2.Issuer,
- store store.Store) Provider {
+ flow *oauth2.ClientCredentialsFlow) Provider {
return &oauth2AuthProvider{
clock: clock.RealClock{},
issuer: issuer,
- store: store,
+ flow: flow,
}
}
func (p *oauth2AuthProvider) Init() error {
- grant, err := p.store.LoadGrant(p.issuer.Audience)
- if err != nil {
- if err == store.ErrNoAuthenticationData {
- return nil
- }
- return err
- }
- refresher, err := p.getRefresher(grant.Type)
- if err != nil {
- return err
- }
-
- source, err := cache.NewDefaultTokenCache(p.store, p.issuer.Audience,
refresher)
+ source, err := cache.NewDefaultTokenCache(p.issuer.Audience, p.flow)
if err != nil {
return err
}
@@ -140,17 +116,6 @@ func (p *oauth2AuthProvider) Close() error {
return nil
}
-func (p *oauth2AuthProvider) getRefresher(t oauth2.AuthorizationGrantType)
(oauth2.AuthorizationGrantRefresher, error) {
- switch t {
- case oauth2.GrantTypeClientCredentials:
- return oauth2.NewDefaultClientCredentialsGrantRefresher(p.clock)
- case oauth2.GrantTypeDeviceCode:
- return
oauth2.NewDefaultDeviceAuthorizationGrantRefresher(p.clock)
- default:
- return nil, store.ErrUnsupportedAuthData
- }
-}
-
type transport struct {
source cache.CachingTokenSource
wrapped *xoauth2.Transport
diff --git a/pulsar/auth/oauth2_test.go b/pulsar/auth/oauth2_test.go
index cc3d11c1..ecaad51f 100644
--- a/pulsar/auth/oauth2_test.go
+++ b/pulsar/auth/oauth2_test.go
@@ -22,12 +22,18 @@ import (
"net/http"
"net/http/httptest"
"os"
+ "sync/atomic"
"testing"
+ "github.com/apache/pulsar-client-go/oauth2"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
)
+var expectedClientID atomic.Value
+var expectedClientSecret atomic.Value
+
// mockOAuthServer will mock a oauth service for the tests
func mockOAuthServer() *httptest.Server {
// prepare a port for the mocked server
@@ -44,7 +50,17 @@ func mockOAuthServer() *httptest.Server {
}`, server.URL, server.URL, server.URL, server.URL)
fmt.Fprintln(writer, s)
})
- mockedHandler.HandleFunc("/oauth/token", func(writer
http.ResponseWriter, _ *http.Request) {
+ mockedHandler.HandleFunc("/oauth/token", func(writer
http.ResponseWriter, r *http.Request) {
+ if err := r.ParseForm(); err != nil {
+ http.Error(writer, "invalid form",
http.StatusBadRequest)
+ return
+ }
+ clientID := r.FormValue("client_id")
+ clientSecret := r.FormValue("client_secret")
+ if clientID != expectedClientID.Load().(string) || clientSecret
!= expectedClientSecret.Load().(string) {
+ http.Error(writer, "invalid client credentials",
http.StatusUnauthorized)
+ return
+ }
fmt.Fprintln(writer, "{\n \"access_token\":
\"token-content\",\n \"token_type\": \"Bearer\"\n}")
})
mockedHandler.HandleFunc("/authorize", func(writer http.ResponseWriter,
_ *http.Request) {
@@ -72,7 +88,8 @@ func mockKeyFile(server string) (string, error) {
"client_id":"client-id",
"client_secret":"client-secret",
"client_email":"[email protected]",
- "issuer_url":"%s"
+ "issuer_url":"%s",
+ "scope": "test-scope"
}`, server))
if err != nil {
return "", err
@@ -84,6 +101,8 @@ func mockKeyFile(server string) (string, error) {
func TestNewAuthenticationOAuth2WithParams(t *testing.T) {
server := mockOAuthServer()
defer server.Close()
+ expectedClientID.Store("client-id")
+ expectedClientSecret.Store("client-secret")
kf, err := mockKeyFile(server.URL)
defer os.Remove(kf)
if err != nil {
@@ -142,3 +161,73 @@ func TestNewAuthenticationOAuth2WithParams(t *testing.T) {
assert.Equal(t, "token-content", string(token))
}
}
+
+func TestOAuth2KeyFileReloading(t *testing.T) {
+ server := mockOAuthServer()
+ defer server.Close()
+ expectedClientID.Store("client-id")
+ expectedClientSecret.Store("client-secret")
+ kf, err := mockKeyFile(server.URL)
+ defer os.Remove(kf)
+ require.NoError(t, err)
+
+ params := map[string]string{
+ ConfigParamType: ConfigParamTypeClientCredentials,
+ ConfigParamIssuerURL: server.URL,
+ ConfigParamClientID: "client-id",
+ ConfigParamAudience: "audience",
+ ConfigParamKeyFile: fmt.Sprintf("file://%s", kf),
+ ConfigParamScope: "profile",
+ }
+
+ auth, err := NewAuthenticationOAuth2WithParams(params)
+ require.NoError(t, err)
+ err = auth.Init()
+ require.NoError(t, err)
+
+ token, err := auth.GetData()
+ require.NoError(t, err)
+ assert.Equal(t, "token-content", string(token))
+
+ expectedClientSecret.Store("new-client-secret")
+ _, err = auth.GetData()
+ require.Error(t, err) // The token refresh should fail after the
server's expected client-secret changes
+
+ // now update the key file to have different client credentials
+ keyFile, err := os.OpenFile(kf, os.O_CREATE|os.O_WRONLY|os.O_TRUNC,
0644)
+ require.NoError(t, err)
+ _, err = keyFile.WriteString(fmt.Sprintf(`{
+ "type":"resource",
+ "client_id":"client-id",
+ "client_secret":"new-client-secret",
+ "client_email":"[email protected]",
+ "issuer_url":"%s"
+}`, server.URL))
+ require.NoError(t, err)
+ require.NoError(t, keyFile.Close())
+
+ token, err = auth.GetData()
+ if err != nil {
+ t.Fatal(err)
+ }
+ assert.Equal(t, "token-content", string(token))
+}
+
+func TestGrantProviderScopes(t *testing.T) {
+ expectedClientID.Store("client-id")
+ expectedClientSecret.Store("client-secret")
+ server := mockOAuthServer()
+ defer server.Close()
+ kf, err := mockKeyFile(server.URL)
+ defer os.Remove(kf)
+ require.NoError(t, err)
+
+ grantProvider := oauth2.DefaultGrantProvider{}
+ grant, err := grantProvider.GetGrant("test-audience",
&oauth2.ClientCredentialsFlowOptions{
+ KeyFile: kf,
+ AdditionalScopes: []string{"scope1", "scope2"},
+ })
+ require.NoError(t, err)
+
+ assert.Equal(t, []string{"scope1", "scope2", "test-scope"},
grant.Scopes)
+}
diff --git a/pulsaradmin/pkg/admin/auth/oauth2.go
b/pulsaradmin/pkg/admin/auth/oauth2.go
index 59587abf..9536f09a 100644
--- a/pulsaradmin/pkg/admin/auth/oauth2.go
+++ b/pulsaradmin/pkg/admin/auth/oauth2.go
@@ -19,18 +19,11 @@ package auth
import (
"encoding/json"
- "fmt"
"net/http"
- "path/filepath"
- "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
-
- "github.com/99designs/keyring"
"github.com/apache/pulsar-client-go/oauth2"
"github.com/apache/pulsar-client-go/oauth2/cache"
clock2 "github.com/apache/pulsar-client-go/oauth2/clock"
- "github.com/apache/pulsar-client-go/oauth2/store"
- "github.com/pkg/errors"
xoauth2 "golang.org/x/oauth2"
)
@@ -50,25 +43,10 @@ type OAuth2ClientCredentials struct {
type OAuth2Provider struct {
clock clock2.RealClock
issuer oauth2.Issuer
- store store.Store
source cache.CachingTokenSource
defaultTransport http.RoundTripper
tokenTransport *transport
-}
-
-func NewAuthenticationOAuth2(issuer oauth2.Issuer, store store.Store)
(*OAuth2Provider, error) {
- p := &OAuth2Provider{
- clock: clock2.RealClock{},
- issuer: issuer,
- store: store,
- }
-
- err := p.loadGrant()
- if err != nil {
- return nil, err
- }
-
- return p, nil
+ flow *oauth2.ClientCredentialsFlow
}
// NewAuthenticationOAuth2WithDefaultFlow uses memory to save the grant
@@ -80,29 +58,18 @@ func NewAuthenticationOAuth2WithDefaultFlow(issuer
oauth2.Issuer, keyFile string
func NewAuthenticationOAuth2WithFlow(
issuer oauth2.Issuer, flowOptions oauth2.ClientCredentialsFlowOptions)
(Provider, error) {
- st := store.NewMemoryStore()
flow, err := oauth2.NewDefaultClientCredentialsFlow(flowOptions)
if err != nil {
return nil, err
}
- grant, err := flow.Authorize(issuer.Audience)
- if err != nil {
- return nil, err
- }
-
- err = st.SaveGrant(issuer.Audience, *grant)
- if err != nil {
- return nil, err
- }
-
p := &OAuth2Provider{
clock: clock2.RealClock{},
issuer: issuer,
- store: st,
+ flow: flow,
}
- return p, p.loadGrant()
+ return p, p.initCache()
}
func NewAuthenticationOAuth2FromAuthParams(encodedAuthParam string,
@@ -114,14 +81,14 @@ func
NewAuthenticationOAuth2FromAuthParams(encodedAuthParam string,
return nil, err
}
return NewAuthenticationOAuth2WithParams(paramsJSON.IssuerURL,
paramsJSON.ClientID, paramsJSON.Audience,
- paramsJSON.Scope, transport)
+ paramsJSON.PrivateKey, transport)
}
func NewAuthenticationOAuth2WithParams(
issuerEndpoint,
clientID,
audience string,
- _ string,
+ privateKey string,
transport http.RoundTripper) (*OAuth2Provider, error) {
issuer := oauth2.Issuer{
@@ -130,7 +97,7 @@ func NewAuthenticationOAuth2WithParams(
Audience: audience,
}
- keyringStore, err := MakeKeyringStore()
+ flow, err :=
oauth2.NewDefaultClientCredentialsFlow(oauth2.ClientCredentialsFlowOptions{KeyFile:
privateKey})
if err != nil {
return nil, err
}
@@ -138,11 +105,11 @@ func NewAuthenticationOAuth2WithParams(
p := &OAuth2Provider{
clock: clock2.RealClock{},
issuer: issuer,
- store: keyringStore,
defaultTransport: transport,
+ flow: flow,
}
- err = p.loadGrant()
+ err = p.initCache()
if err != nil {
return nil, err
}
@@ -150,24 +117,8 @@ func NewAuthenticationOAuth2WithParams(
return p, nil
}
-func (o *OAuth2Provider) loadGrant() error {
- grant, err := o.store.LoadGrant(o.issuer.Audience)
- if err != nil {
- if err == store.ErrNoAuthenticationData {
- return errors.New("oauth2 login required")
- }
- return err
- }
- return o.initCache(grant)
-}
-
-func (o *OAuth2Provider) initCache(grant *oauth2.AuthorizationGrant) error {
- refresher, err := o.getRefresher(grant.Type)
- if err != nil {
- return err
- }
-
- source, err := cache.NewDefaultTokenCache(o.store, o.issuer.Audience,
refresher)
+func (o *OAuth2Provider) initCache() error {
+ source, err := cache.NewDefaultTokenCache(o.issuer.Audience, o.flow)
if err != nil {
return err
}
@@ -194,17 +145,6 @@ func (o *OAuth2Provider) Transport() http.RoundTripper {
return o.tokenTransport
}
-func (o *OAuth2Provider) getRefresher(t oauth2.AuthorizationGrantType)
(oauth2.AuthorizationGrantRefresher, error) {
- switch t {
- case oauth2.GrantTypeClientCredentials:
- return oauth2.NewDefaultClientCredentialsGrantRefresher(o.clock)
- case oauth2.GrantTypeDeviceCode:
- return
oauth2.NewDefaultDeviceAuthorizationGrantRefresher(o.clock)
- default:
- return nil, store.ErrUnsupportedAuthData
- }
-}
-
type transport struct {
source cache.CachingTokenSource
wrapped *xoauth2.Transport
@@ -231,32 +171,3 @@ func (t *transport) RoundTrip(req *http.Request)
(*http.Response, error) {
}
func (t *transport) WrappedRoundTripper() http.RoundTripper { return
t.wrapped.Base }
-
-const (
- serviceName = "pulsar"
- keyChainName = "pulsarctl"
-)
-
-func MakeKeyringStore() (store.Store, error) {
- kr, err := makeKeyring()
- if err != nil {
- return nil, err
- }
- return store.NewKeyringStore(kr)
-}
-
-func makeKeyring() (keyring.Keyring, error) {
- return keyring.Open(keyring.Config{
- AllowedBackends: keyring.AvailableBackends(),
- ServiceName: serviceName,
- KeychainName: keyChainName,
- KeychainTrustApplication: true,
- FileDir: filepath.Join(fmt.Sprintf(
- "%s/.config/pulsar", utils.GetConfigPath()),
"credentials"),
- FilePasswordFunc: keyringPrompt,
- })
-}
-
-func keyringPrompt(_ string) (string, error) {
- return "", nil
-}
diff --git a/pulsaradmin/pkg/admin/auth/oauth2_test.go
b/pulsaradmin/pkg/admin/auth/oauth2_test.go
index b19133c7..65704ab8 100644
--- a/pulsaradmin/pkg/admin/auth/oauth2_test.go
+++ b/pulsaradmin/pkg/admin/auth/oauth2_test.go
@@ -25,7 +25,6 @@ import (
"testing"
"github.com/apache/pulsar-client-go/oauth2"
- "github.com/apache/pulsar-client-go/oauth2/store"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
)
@@ -99,37 +98,20 @@ func TestOauth2(t *testing.T) {
Audience: server.URL,
}
- memoryStore := store.NewMemoryStore()
- err = saveGrant(memoryStore, kf, issuer.Audience)
+ auth, err := NewAuthenticationOAuth2WithFlow(issuer,
oauth2.ClientCredentialsFlowOptions{
+ KeyFile: kf,
+ })
if err != nil {
t.Fatal(err)
}
- auth, err := NewAuthenticationOAuth2(issuer, memoryStore)
- if err != nil {
- t.Fatal(err)
+ provider, ok := auth.(*OAuth2Provider)
+ if !ok {
+ t.Fatal("unexpected provider type")
}
-
- token, err := auth.source.Token()
+ token, err := provider.source.Token()
if err != nil {
t.Fatal(err)
}
assert.Equal(t, "token-content", token.AccessToken)
}
-
-func saveGrant(store store.Store, keyFile, audience string) error {
- flow, err :=
oauth2.NewDefaultClientCredentialsFlow(oauth2.ClientCredentialsFlowOptions{
- KeyFile: keyFile,
- AdditionalScopes: nil,
- })
- if err != nil {
- return err
- }
-
- grant, err := flow.Authorize(audience)
- if err != nil {
- return err
- }
-
- return store.SaveGrant(audience, *grant)
-}