bito-code-review[bot] commented on code in PR #38454:
URL: https://github.com/apache/superset/pull/38454#discussion_r2892265466
##########
superset/security/manager.py:
##########
@@ -1530,6 +1530,41 @@ def _is_public_pvm(self, pvm: PermissionView) -> bool:
# Exclude all other permissions not explicitly allowed
return False
+ def auth_user_oauth(self, userinfo: dict[str, Any]) -> Any:
+ """
+ Override FAB's auth_user_oauth to persist the upstream login token
when the
+ provider is configured with ``save_token: True`` in
``OAUTH_PROVIDERS``.
+
+ The token stored here can later be forwarded to database connections
via
+ ``DATABASE_OAUTH2_UPSTREAM_PROVIDERS``, avoiding a separate database
OAuth dance.
+ """
+ user = super().auth_user_oauth(userinfo)
+ if user:
+ provider = session.get("oauth_provider")
+ token = session.get("oauth")
+ if token and provider:
+ provider_configs: list[dict[str, Any]] =
current_app.config.get(
+ "OAUTH_PROVIDERS", []
+ )
+ provider_config = next(
+ (p for p in provider_configs if p.get("name") == provider),
+ None,
+ )
+ if provider_config and provider_config.get("save_token"):
+ # pylint: disable=import-outside-toplevel
+ from superset.utils.oauth2 import save_user_provider_token
+
+ try:
+ save_user_provider_token(user.id, provider, token)
+ except Exception: # pylint: disable=broad-except
Review Comment:
<div>
<div id="suggestion">
<div id="issue"><b>Blind exception catch too broad</b></div>
<div id="fix">
Avoid a blanket `except Exception` here. Either catch the specific exception
types that can be raised (for example `ValueError` or `KeyError`), or keep the
broad catch and add a comment explaining why it is intentional.
</div>
<details>
<summary>
<b>Code suggestion</b>
</summary>
<blockquote>Check the AI-generated fix before applying</blockquote>
<div id="code">
````suggestion
except (KeyError, AttributeError, TypeError):  # Expected exceptions from config access
````
</div>
</details>
</div>
<small><i>Code Review Run #3fe63a</i></small>
</div>
---
Should Bito avoid suggestions like this for future reviews? (<a
href=https://alpha.bito.ai/home/ai-agents/review-rules>Manage Rules</a>)
- [ ] Yes, avoid them
##########
superset/utils/oauth2.py:
##########
@@ -176,6 +176,171 @@ def refresh_oauth2_token(
return token.access_token
+def save_user_provider_token(
+ user_id: int,
+ provider: str,
+ token_response: dict[str, Any],
+) -> None:
+ """
+ Upsert an UpstreamOAuthToken record for the given user and login provider.
+
+ Called after a successful Superset OAuth login when the provider has
+ ``save_token: True`` set in ``OAUTH_PROVIDERS``.
+ """
+ # pylint: disable=import-outside-toplevel
+ from superset.models.core import UpstreamOAuthToken
+
+ token = (
+ db.session.query(UpstreamOAuthToken)
+ .filter_by(user_id=user_id, provider=provider)
+ .one_or_none()
+ )
+ if token is None:
+ token = UpstreamOAuthToken(user_id=user_id, provider=provider)
+
+ token.access_token = token_response.get("access_token")
+ expires_in = token_response.get("expires_in")
+ if expires_in is not None:
+ token.access_token_expiration = datetime.now() + timedelta(
+ seconds=int(expires_in)
+ )
+ else:
+ token.access_token_expiration = None
+ token.refresh_token = token_response.get("refresh_token")
+
+ db.session.add(token)
+ db.session.commit()
+
+
+def get_upstream_provider_token(provider: str, user_id: int) -> str | None:
+ """
+ Return a valid access token stored for the given login provider and user.
+
+ If the stored token is expired and a refresh token is available, the token
+ is refreshed and the new access token is returned. Returns ``None`` when
no
+ token is found or when the token cannot be refreshed.
+ """
+ # pylint: disable=import-outside-toplevel
+ from superset.models.core import UpstreamOAuthToken
+
+ token: UpstreamOAuthToken | None = (
+ db.session.query(UpstreamOAuthToken)
+ .filter_by(user_id=user_id, provider=provider)
+ .one_or_none()
+ )
+ if token is None:
+ return None
+
+ if token.access_token and (
+ token.access_token_expiration is None
+ or datetime.now() < token.access_token_expiration
+ ):
+ return token.access_token
+
+ if token.refresh_token:
+ return _refresh_upstream_provider_token(token, provider)
+
+ # Expired with no refresh token — discard stale record
+ db.session.delete(token)
+ db.session.commit()
+ return None
+
+
+def _refresh_upstream_provider_token(
+ token: "UpstreamOAuthToken",
+ provider: str,
+) -> str | None:
+ """
+ Refresh an upstream provider token using the stored refresh token.
+
+ Looks up the provider's token endpoint via Flask-AppBuilder's remote app
+ registry, issues a refresh-grant request, persists the new token, and
+ returns the new access token (or ``None`` if the refresh fails).
+ """
+ import requests # pylint: disable=import-outside-toplevel
+
+ provider_configs: list[dict[str, Any]] = app.config.get("OAUTH_PROVIDERS",
[])
+ provider_config = next(
+ (p for p in provider_configs if p.get("name") == provider), None
+ )
+ if not provider_config:
+ logger.warning(
+ "Cannot refresh upstream token: provider '%s' not found in
OAUTH_PROVIDERS",
+ provider,
+ )
+ return None
+
+ # Retrieve the token endpoint from the registered remote app's server
metadata
+ from flask_appbuilder import current_app as fab_app # pylint:
disable=import-outside-toplevel
+
+ sm = fab_app.appbuilder.sm
+ remote_app = getattr(sm, "oauth_remoteapp", {}).get(provider)
+ if remote_app is None:
+ logger.warning(
+ "Cannot refresh upstream token: remote app '%s' not registered",
provider
+ )
+ return None
+
+ try:
+ server_metadata = remote_app.load_server_metadata()
+ token_endpoint = server_metadata.get("token_endpoint")
+ except Exception: # pylint: disable=broad-except
+ logger.warning(
+ "Cannot refresh upstream token: failed to load server metadata for
'%s'",
+ provider,
+ exc_info=True,
+ )
+ return None
+
+ if not token_endpoint:
+ logger.warning(
+ "Cannot refresh upstream token: no token_endpoint in metadata for
'%s'",
+ provider,
+ )
+ return None
+
+ client_id = provider_config.get("remote_app", {}).get("client_id")
+ client_secret = provider_config.get("remote_app", {}).get("client_secret")
+
+ try:
+ resp = requests.post( # noqa: S113
+ token_endpoint,
+ data={
+ "grant_type": "refresh_token",
+ "refresh_token": token.refresh_token,
+ "client_id": client_id,
+ "client_secret": client_secret,
+ },
+ timeout=app.config.get("DATABASE_OAUTH2_TIMEOUT",
timedelta(seconds=30)).total_seconds(),
+ )
+ resp.raise_for_status()
+ token_response = resp.json()
+ except Exception: # pylint: disable=broad-except
+ logger.warning(
+ "Failed to refresh upstream OAuth token for provider '%s'",
+ provider,
+ exc_info=True,
+ )
+ return None
+
+ if "access_token" not in token_response:
+ return None
+
+ token.access_token = token_response["access_token"]
+ expires_in = token_response.get("expires_in")
+ if expires_in is not None:
+ token.access_token_expiration = datetime.now() + timedelta(
+ seconds=int(expires_in)
+ )
Review Comment:
<div>
<div id="suggestion">
<div id="issue"><b>datetime.now() called without timezone</b></div>
<div id="fix">
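`datetime.now()` is called without a timezone argument in several places
(lines 204, 236, 332), which produces naive datetime objects. Use
`datetime.now(tz=timezone.utc)` to get timezone-aware values, and change every
call site together: comparing a naive datetime with an aware one raises
`TypeError`.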
</div>
<details>
<summary>
<b>Code suggestion</b>
</summary>
<blockquote>Check the AI-generated fix before applying</blockquote>
<div id="code">
````suggestion
expires_in = token_response.get("expires_in")
if expires_in is not None:
token.access_token_expiration = datetime.now(tz=timezone.utc) + timedelta(
seconds=int(expires_in),
)
````
</div>
</details>
</div>
<small><i>Code Review Run #3fe63a</i></small>
</div>
---
Should Bito avoid suggestions like this for future reviews? (<a
href=https://alpha.bito.ai/home/ai-agents/review-rules>Manage Rules</a>)
- [ ] Yes, avoid them
##########
superset/utils/oauth2.py:
##########
@@ -176,6 +176,171 @@ def refresh_oauth2_token(
return token.access_token
+def save_user_provider_token(
+ user_id: int,
+ provider: str,
+ token_response: dict[str, Any],
+) -> None:
+ """
+ Upsert an UpstreamOAuthToken record for the given user and login provider.
+
+ Called after a successful Superset OAuth login when the provider has
+ ``save_token: True`` set in ``OAUTH_PROVIDERS``.
+ """
+ # pylint: disable=import-outside-toplevel
+ from superset.models.core import UpstreamOAuthToken
+
+ token = (
+ db.session.query(UpstreamOAuthToken)
+ .filter_by(user_id=user_id, provider=provider)
+ .one_or_none()
+ )
+ if token is None:
+ token = UpstreamOAuthToken(user_id=user_id, provider=provider)
+
+ token.access_token = token_response.get("access_token")
+ expires_in = token_response.get("expires_in")
+ if expires_in is not None:
+ token.access_token_expiration = datetime.now() + timedelta(
+ seconds=int(expires_in)
+ )
+ else:
+ token.access_token_expiration = None
+ token.refresh_token = token_response.get("refresh_token")
+
+ db.session.add(token)
+ db.session.commit()
+
+
+def get_upstream_provider_token(provider: str, user_id: int) -> str | None:
+ """
+ Return a valid access token stored for the given login provider and user.
+
+ If the stored token is expired and a refresh token is available, the token
+ is refreshed and the new access token is returned. Returns ``None`` when
no
+ token is found or when the token cannot be refreshed.
+ """
+ # pylint: disable=import-outside-toplevel
+ from superset.models.core import UpstreamOAuthToken
+
+ token: UpstreamOAuthToken | None = (
+ db.session.query(UpstreamOAuthToken)
+ .filter_by(user_id=user_id, provider=provider)
+ .one_or_none()
+ )
+ if token is None:
+ return None
+
+ if token.access_token and (
+ token.access_token_expiration is None
+ or datetime.now() < token.access_token_expiration
+ ):
+ return token.access_token
+
+ if token.refresh_token:
+ return _refresh_upstream_provider_token(token, provider)
+
+ # Expired with no refresh token — discard stale record
+ db.session.delete(token)
+ db.session.commit()
+ return None
+
+
+def _refresh_upstream_provider_token(
+ token: "UpstreamOAuthToken",
+ provider: str,
+) -> str | None:
+ """
+ Refresh an upstream provider token using the stored refresh token.
+
+ Looks up the provider's token endpoint via Flask-AppBuilder's remote app
+ registry, issues a refresh-grant request, persists the new token, and
+ returns the new access token (or ``None`` if the refresh fails).
+ """
+ import requests # pylint: disable=import-outside-toplevel
+
+ provider_configs: list[dict[str, Any]] = app.config.get("OAUTH_PROVIDERS",
[])
+ provider_config = next(
+ (p for p in provider_configs if p.get("name") == provider), None
+ )
+ if not provider_config:
+ logger.warning(
+ "Cannot refresh upstream token: provider '%s' not found in
OAUTH_PROVIDERS",
+ provider,
+ )
+ return None
+
+ # Retrieve the token endpoint from the registered remote app's server
metadata
+ from flask_appbuilder import current_app as fab_app # pylint:
disable=import-outside-toplevel
+
+ sm = fab_app.appbuilder.sm
+ remote_app = getattr(sm, "oauth_remoteapp", {}).get(provider)
+ if remote_app is None:
+ logger.warning(
+ "Cannot refresh upstream token: remote app '%s' not registered",
provider
+ )
+ return None
+
+ try:
+ server_metadata = remote_app.load_server_metadata()
+ token_endpoint = server_metadata.get("token_endpoint")
+ except Exception: # pylint: disable=broad-except
Review Comment:
<div>
<div id="suggestion">
<div id="issue"><b>Blind exception catch without specificity</b></div>
<div id="fix">
Catching bare `Exception` is too broad. Specify the expected exception
type(s) to catch only relevant errors.
</div>
<details>
<summary>
<b>Code suggestion</b>
</summary>
<blockquote>Check the AI-generated fix before applying</blockquote>
<div id="code">
````suggestion
try:
server_metadata = remote_app.load_server_metadata()
token_endpoint = server_metadata.get("token_endpoint")
except (AttributeError, KeyError, ValueError):
````
</div>
</details>
</div>
<small><i>Code Review Run #3fe63a</i></small>
</div>
---
Should Bito avoid suggestions like this for future reviews? (<a
href=https://alpha.bito.ai/home/ai-agents/review-rules>Manage Rules</a>)
- [ ] Yes, avoid them
##########
superset/utils/oauth2.py:
##########
@@ -176,6 +176,171 @@ def refresh_oauth2_token(
return token.access_token
+def save_user_provider_token(
+ user_id: int,
+ provider: str,
+ token_response: dict[str, Any],
+) -> None:
+ """
+ Upsert an UpstreamOAuthToken record for the given user and login provider.
+
+ Called after a successful Superset OAuth login when the provider has
+ ``save_token: True`` set in ``OAUTH_PROVIDERS``.
+ """
+ # pylint: disable=import-outside-toplevel
+ from superset.models.core import UpstreamOAuthToken
+
+ token = (
+ db.session.query(UpstreamOAuthToken)
+ .filter_by(user_id=user_id, provider=provider)
+ .one_or_none()
+ )
+ if token is None:
+ token = UpstreamOAuthToken(user_id=user_id, provider=provider)
+
+ token.access_token = token_response.get("access_token")
+ expires_in = token_response.get("expires_in")
+ if expires_in is not None:
+ token.access_token_expiration = datetime.now() + timedelta(
+ seconds=int(expires_in)
+ )
+ else:
+ token.access_token_expiration = None
+ token.refresh_token = token_response.get("refresh_token")
+
+ db.session.add(token)
+ db.session.commit()
+
+
+def get_upstream_provider_token(provider: str, user_id: int) -> str | None:
+ """
+ Return a valid access token stored for the given login provider and user.
+
+ If the stored token is expired and a refresh token is available, the token
+ is refreshed and the new access token is returned. Returns ``None`` when
no
+ token is found or when the token cannot be refreshed.
+ """
+ # pylint: disable=import-outside-toplevel
+ from superset.models.core import UpstreamOAuthToken
+
+ token: UpstreamOAuthToken | None = (
+ db.session.query(UpstreamOAuthToken)
+ .filter_by(user_id=user_id, provider=provider)
+ .one_or_none()
+ )
+ if token is None:
+ return None
+
+ if token.access_token and (
+ token.access_token_expiration is None
+ or datetime.now() < token.access_token_expiration
+ ):
+ return token.access_token
+
+ if token.refresh_token:
+ return _refresh_upstream_provider_token(token, provider)
+
+ # Expired with no refresh token — discard stale record
+ db.session.delete(token)
+ db.session.commit()
+ return None
+
+
+def _refresh_upstream_provider_token(
+ token: "UpstreamOAuthToken",
+ provider: str,
+) -> str | None:
+ """
+ Refresh an upstream provider token using the stored refresh token.
+
+ Looks up the provider's token endpoint via Flask-AppBuilder's remote app
+ registry, issues a refresh-grant request, persists the new token, and
+ returns the new access token (or ``None`` if the refresh fails).
+ """
+ import requests # pylint: disable=import-outside-toplevel
+
+ provider_configs: list[dict[str, Any]] = app.config.get("OAUTH_PROVIDERS",
[])
+ provider_config = next(
+ (p for p in provider_configs if p.get("name") == provider), None
+ )
+ if not provider_config:
+ logger.warning(
+ "Cannot refresh upstream token: provider '%s' not found in
OAUTH_PROVIDERS",
+ provider,
+ )
+ return None
+
+ # Retrieve the token endpoint from the registered remote app's server
metadata
+ from flask_appbuilder import current_app as fab_app # pylint:
disable=import-outside-toplevel
+
+ sm = fab_app.appbuilder.sm
+ remote_app = getattr(sm, "oauth_remoteapp", {}).get(provider)
+ if remote_app is None:
+ logger.warning(
+ "Cannot refresh upstream token: remote app '%s' not registered",
provider
+ )
+ return None
+
+ try:
+ server_metadata = remote_app.load_server_metadata()
+ token_endpoint = server_metadata.get("token_endpoint")
+ except Exception: # pylint: disable=broad-except
+ logger.warning(
+ "Cannot refresh upstream token: failed to load server metadata for
'%s'",
+ provider,
+ exc_info=True,
+ )
+ return None
+
+ if not token_endpoint:
+ logger.warning(
+ "Cannot refresh upstream token: no token_endpoint in metadata for
'%s'",
+ provider,
+ )
+ return None
+
+ client_id = provider_config.get("remote_app", {}).get("client_id")
+ client_secret = provider_config.get("remote_app", {}).get("client_secret")
+
+ try:
+ resp = requests.post( # noqa: S113
+ token_endpoint,
+ data={
+ "grant_type": "refresh_token",
+ "refresh_token": token.refresh_token,
+ "client_id": client_id,
+ "client_secret": client_secret,
+ },
+ timeout=app.config.get("DATABASE_OAUTH2_TIMEOUT",
timedelta(seconds=30)).total_seconds(),
+ )
+ resp.raise_for_status()
+ token_response = resp.json()
+ except Exception: # pylint: disable=broad-except
Review Comment:
<div>
<div id="suggestion">
<div id="issue"><b>Blind exception catch without specificity</b></div>
<div id="fix">
Catching bare `Exception` is too broad. Specify the expected exception
type(s) to catch only relevant errors from the requests call.
</div>
<details>
<summary>
<b>Code suggestion</b>
</summary>
<blockquote>Check the AI-generated fix before applying</blockquote>
<div id="code">
````suggestion
resp.raise_for_status()
token_response = resp.json()
except requests.RequestException:
````
</div>
</details>
</div>
<small><i>Code Review Run #3fe63a</i></small>
</div>
---
Should Bito avoid suggestions like this for future reviews? (<a
href=https://alpha.bito.ai/home/ai-agents/review-rules>Manage Rules</a>)
- [ ] Yes, avoid them
##########
tests/unit_tests/utils/oauth2_tests.py:
##########
@@ -335,3 +337,112 @@ def test_encode_decode_oauth2_state(
assert "code_verifier" not in decoded
assert decoded["database_id"] == 1
assert decoded["user_id"] == 2
+
+
+# ---------- upstream provider token tests ----------
+
+
+def test_save_user_provider_token_creates_new(mocker: MockerFixture) -> None:
+ """
+ save_user_provider_token should create a new UpstreamOAuthToken when none
exists.
+ """
+ db_mock = mocker.patch("superset.utils.oauth2.db")
+ mock_token = mocker.MagicMock()
+ mocker.patch("superset.models.core.UpstreamOAuthToken",
return_value=mock_token)
+ db_mock.session.query().filter_by().one_or_none.return_value = None
+
+ token_response = {
+ "access_token": "tok123",
+ "expires_in": 3600,
+ "refresh_token": "ref456",
+ }
+ save_user_provider_token(user_id=1, provider="keycloak",
token_response=token_response)
+
+ db_mock.session.add.assert_called_once()
+ db_mock.session.commit.assert_called_once()
+
+
+def test_save_user_provider_token_updates_existing(mocker: MockerFixture) ->
None:
+ """
+ save_user_provider_token should update an existing UpstreamOAuthToken
in-place.
+ """
+ db_mock = mocker.patch("superset.utils.oauth2.db")
+ existing = mocker.MagicMock()
+ db_mock.session.query().filter_by().one_or_none.return_value = existing
+
+ token_response = {
+ "access_token": "new_tok",
+ "expires_in": 1800,
+ "refresh_token": "new_ref",
+ }
+ save_user_provider_token(user_id=2, provider="keycloak",
token_response=token_response)
+
+ assert existing.access_token == "new_tok"
+ assert existing.refresh_token == "new_ref"
+ db_mock.session.add.assert_called_once_with(existing)
+ db_mock.session.commit.assert_called_once()
+
+
+def test_get_upstream_provider_token_no_record(mocker: MockerFixture) -> None:
+ """
+ get_upstream_provider_token returns None when no token record exists.
+ """
+ db_mock = mocker.patch("superset.utils.oauth2.db")
+ db_mock.session.query().filter_by().one_or_none.return_value = None
+
+ result = get_upstream_provider_token(provider="keycloak", user_id=1)
+ assert result is None
+
+
+def test_get_upstream_provider_token_valid(mocker: MockerFixture) -> None:
+ """
+ get_upstream_provider_token returns the access token when it is still
valid.
+ """
+ db_mock = mocker.patch("superset.utils.oauth2.db")
+ token = mocker.MagicMock()
+ token.access_token = "valid_token" # noqa: S105
+ token.access_token_expiration = datetime(2099, 1, 1)
+ db_mock.session.query().filter_by().one_or_none.return_value = token
+
+ result = get_upstream_provider_token(provider="keycloak", user_id=1)
+ assert result == "valid_token"
+
+
+def test_get_upstream_provider_token_expired_no_refresh(mocker: MockerFixture)
-> None:
+ """
+ get_upstream_provider_token deletes an expired token with no refresh token
and returns None.
+ """
+ db_mock = mocker.patch("superset.utils.oauth2.db")
+ token = mocker.MagicMock()
+ token.access_token = "expired" # noqa: S105
+ token.access_token_expiration = datetime(2000, 1, 1)
+ token.refresh_token = None
+ db_mock.session.query().filter_by().one_or_none.return_value = token
+
+ result = get_upstream_provider_token(provider="keycloak", user_id=1)
+
+ assert result is None
+ db_mock.session.delete.assert_called_once_with(token)
+ db_mock.session.commit.assert_called_once()
+
+
+def test_get_upstream_provider_token_expired_calls_refresh(mocker:
MockerFixture) -> None:
+ """
+ get_upstream_provider_token delegates to refresh when token is expired but
has refresh token.
+ """
+ db_mock = mocker.patch("superset.utils.oauth2.db")
+ token = mocker.MagicMock()
+ token.access_token = "expired" # noqa: S105
+ token.access_token_expiration = datetime(2000, 1, 1)
Review Comment:
<div>
<div id="suggestion">
<div id="issue"><b>Datetime without timezone info</b></div>
<div id="fix">
The `datetime` object is created without a `tzinfo` argument, so it is naive.
Pass `tzinfo=timezone.utc` (or another explicit timezone) to make it aware.
</div>
<details>
<summary>
<b>Code suggestion</b>
</summary>
<blockquote>Check the AI-generated fix before applying</blockquote>
<div id="code">
````suggestion
db_mock = mocker.patch("superset.utils.oauth2.db")
token = mocker.MagicMock()
token.access_token = "expired" # noqa: S105
token.access_token_expiration = datetime(2000, 1, 1, tzinfo=timezone.utc)
````
</div>
</details>
</div>
<small><i>Code Review Run #3fe63a</i></small>
</div>
---
Should Bito avoid suggestions like this for future reviews? (<a
href=https://alpha.bito.ai/home/ai-agents/review-rules>Manage Rules</a>)
- [ ] Yes, avoid them
##########
tests/unit_tests/utils/oauth2_tests.py:
##########
@@ -335,3 +337,112 @@ def test_encode_decode_oauth2_state(
assert "code_verifier" not in decoded
assert decoded["database_id"] == 1
assert decoded["user_id"] == 2
+
+
+# ---------- upstream provider token tests ----------
+
+
+def test_save_user_provider_token_creates_new(mocker: MockerFixture) -> None:
+ """
+ save_user_provider_token should create a new UpstreamOAuthToken when none
exists.
+ """
+ db_mock = mocker.patch("superset.utils.oauth2.db")
+ mock_token = mocker.MagicMock()
+ mocker.patch("superset.models.core.UpstreamOAuthToken",
return_value=mock_token)
+ db_mock.session.query().filter_by().one_or_none.return_value = None
+
+ token_response = {
+ "access_token": "tok123",
+ "expires_in": 3600,
+ "refresh_token": "ref456",
+ }
+ save_user_provider_token(user_id=1, provider="keycloak",
token_response=token_response)
+
+ db_mock.session.add.assert_called_once()
+ db_mock.session.commit.assert_called_once()
+
+
+def test_save_user_provider_token_updates_existing(mocker: MockerFixture) ->
None:
+ """
+ save_user_provider_token should update an existing UpstreamOAuthToken
in-place.
+ """
+ db_mock = mocker.patch("superset.utils.oauth2.db")
+ existing = mocker.MagicMock()
+ db_mock.session.query().filter_by().one_or_none.return_value = existing
+
+ token_response = {
+ "access_token": "new_tok",
+ "expires_in": 1800,
+ "refresh_token": "new_ref",
+ }
+ save_user_provider_token(user_id=2, provider="keycloak",
token_response=token_response)
+
+ assert existing.access_token == "new_tok"
+ assert existing.refresh_token == "new_ref"
+ db_mock.session.add.assert_called_once_with(existing)
+ db_mock.session.commit.assert_called_once()
+
+
+def test_get_upstream_provider_token_no_record(mocker: MockerFixture) -> None:
+ """
+ get_upstream_provider_token returns None when no token record exists.
+ """
+ db_mock = mocker.patch("superset.utils.oauth2.db")
+ db_mock.session.query().filter_by().one_or_none.return_value = None
+
+ result = get_upstream_provider_token(provider="keycloak", user_id=1)
+ assert result is None
+
+
+def test_get_upstream_provider_token_valid(mocker: MockerFixture) -> None:
+ """
+ get_upstream_provider_token returns the access token when it is still
valid.
+ """
+ db_mock = mocker.patch("superset.utils.oauth2.db")
+ token = mocker.MagicMock()
+ token.access_token = "valid_token" # noqa: S105
+ token.access_token_expiration = datetime(2099, 1, 1)
+ db_mock.session.query().filter_by().one_or_none.return_value = token
+
+ result = get_upstream_provider_token(provider="keycloak", user_id=1)
+ assert result == "valid_token"
+
+
+def test_get_upstream_provider_token_expired_no_refresh(mocker: MockerFixture)
-> None:
+ """
+ get_upstream_provider_token deletes an expired token with no refresh token
and returns None.
+ """
+ db_mock = mocker.patch("superset.utils.oauth2.db")
+ token = mocker.MagicMock()
+ token.access_token = "expired" # noqa: S105
+ token.access_token_expiration = datetime(2000, 1, 1)
+ token.refresh_token = None
+ db_mock.session.query().filter_by().one_or_none.return_value = token
+
+ result = get_upstream_provider_token(provider="keycloak", user_id=1)
+
+ assert result is None
+ db_mock.session.delete.assert_called_once_with(token)
+ db_mock.session.commit.assert_called_once()
+
+
+def test_get_upstream_provider_token_expired_calls_refresh(mocker:
MockerFixture) -> None:
+ """
+ get_upstream_provider_token delegates to refresh when token is expired but
has refresh token.
+ """
+ db_mock = mocker.patch("superset.utils.oauth2.db")
+ token = mocker.MagicMock()
+ token.access_token = "expired" # noqa: S105
+ token.access_token_expiration = datetime(2000, 1, 1)
Review Comment:
<div>
<div id="suggestion">
<div id="issue"><b>Datetime without timezone info</b></div>
<div id="fix">
The `datetime` object is created without a `tzinfo` argument, so it is naive.
Pass `tzinfo=timezone.utc` (or another explicit timezone) to make it aware.
</div>
<details>
<summary>
<b>Code suggestion</b>
</summary>
<blockquote>Check the AI-generated fix before applying</blockquote>
<div id="code">
````suggestion
db_mock = mocker.patch("superset.utils.oauth2.db")
token = mocker.MagicMock()
token.access_token = "expired" # noqa: S105
token.access_token_expiration = datetime(2000, 1, 1, tzinfo=timezone.utc)
````
</div>
</details>
</div>
<small><i>Code Review Run #3fe63a</i></small>
</div>
---
Should Bito avoid suggestions like this for future reviews? (<a
href=https://alpha.bito.ai/home/ai-agents/review-rules>Manage Rules</a>)
- [ ] Yes, avoid them
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]