GitHub user rsaleev edited a comment on the discussion: How to enable SSO login
in Superset using Keycloak access token?
Looks like OAUTH doesn't work now for 5.0. Approach that worked in 4.1.1
``python
class KeycloakOAuthView(AuthOAuthView):
@expose("/login/")
@expose("/login/<provider>")
def login(self, provider: Optional[str] = "keycloak") -> WerkzeugResponse:
if g.user is not None and g.user.is_authenticated:
session.pop("_flashes", None)
return redirect(self.appbuilder.get_url_for_index)
auth_header = request.headers.get(
"authorization", request.headers.get("Authorization")
)
if auth_header:
log.info("Authorization header provided")
_, token = auth_header.split("Bearer ")
# TODO: check request with header
log.info("Authorization with token")
if request.headers.get("X-Elevate"):
log.info("Elevating permissions")
self.appbuilder.sm.auth_with_elevate(provider, token)
else:
self.appbuilder.sm.auth_by_token(provider, token)
session.pop("_flashes", None)
return redirect(self.appbuilder.get_url_for_index)
session["oauth_state"] = os.environ["SUPERSET_SECRET_KEY"]
state = jwt.encode(
request.args.to_dict(flat=False),
session["oauth_state"],
algorithm="HS256",
)
try:
return
self.appbuilder.sm.oauth_remotes[provider].authorize_redirect(
redirect_uri=url_for(
".oauth_authorized", provider=provider, _external=True
),
state=state.decode("ascii") if isinstance(state, bytes) else
state,
)
except Exception as e:
log.error("Error on OAuth authorize: %s", e)
flash(as_unicode(self.invalid_login_message), "warning")
return redirect(self.appbuilder.get_url_for_index)
@expose("/oauth-authorized/<provider>")
def oauth_authorized(self, provider: str) -> WerkzeugResponse:
if provider not in self.appbuilder.sm.oauth_remotes:
flash("Provider not supported.", "warning")
log.warning("OAuth authorized got an unknown provider %s", provider)
return redirect(self.appbuilder.get_url_for_login)
try:
resp =
self.appbuilder.sm.oauth_remotes[provider].authorize_access_token()
except Exception as e:
log.error("Error authorizing OAuth access token: %s", e)
return redirect(self.appbuilder.get_url_for_login)
if resp is None:
return redirect(self.appbuilder.get_url_for_login)
session["id_token_hint"] = resp["id_token"]
try:
self.appbuilder.sm.set_oauth_session(provider, resp)
userinfo =
self.appbuilder.sm.get_userinfo_from_token(resp["access_token"])
except Exception as e:
log.error("Error returning OAuth user info: %s", e)
user = None
raise
else:
user = self.appbuilder.sm.auth_user_oauth(userinfo)
if user is None:
flash(as_unicode(self.invalid_login_message), "warning")
return redirect(self.appbuilder.get_url_for_login)
else:
try:
state = jwt.decode(
request.args["state"],
session["oauth_state"],
algorithms=["HS256"],
)
except (jwt.InvalidTokenError, KeyError):
flash(as_unicode("Invalid state signature"), "warning")
return redirect(self.appbuilder.get_url_for_login)
login_user(user)
next_url = self.appbuilder.get_url_for_index
# Check if there is a next url on state
if "next" in state and len(state["next"]) > 0:
next_url = get_safe_redirect(state["next"][0])
session.pop("_flashes", None)
return redirect(next_url)
@expose("/logout/")
def logout(self):
logout_user()
base_url =
f"{os.environ['KEYCLOAK_BASE_URL']}/auth/realms/{os.environ['KEYCLOAK_REALM']}/protocol/openid-connect/logout?"
redirect_url = f"{request.host_url}{self.appbuilder.get_url_for_login}"
token_hint = f"id_token_hint={session['id_token_hint']}"
url = base_url + token_hint + "&" +
f"post_logout_redirect_uri={redirect_url}"
session.clear()
return redirect(url)
class KeycloakSecurityManager(SupersetSecurityManager):
def __init__(self, appbuilder):
super().__init__(appbuilder)
self.authoauthview = KeycloakOAuthView
def get_keycloak_public_key(self, token) -> str:
url =
f"{os.environ['KEYCLOAK_BASE_URL']}/auth/realms/{os.environ['KEYCLOAK_REALM']}"
resp = requests.get(
url, headers={"Authorization": f"Bearer {token}"}, verify=False
)
resp.raise_for_status()
data = resp.json()
return data["public_key"]
def decode_keycloak_jwt(self, token, verify: bool = False):
log.info("Decoding token...")
try:
key_der = b64decode(self.get_keycloak_public_key(token).encode())
public_key = serialization.load_der_public_key(key_der)
decoded = jwt.decode(
token,
key=public_key,
verify=verify,
options={"verify_aud": False},
algorithms=["RS256", "HS256"],
)
return decoded
except Exception as e:
log.exception(e)
raise
def get_userinfo_from_token(self, token):
decoded_token: dict = self.decode_keycloak_jwt(token)
roles = ["undefined"]
if realm_access := decoded_token.get("realm_access"):
if realm_roles := realm_access.get("roles"):
roles = realm_roles
if realm_user_roles := realm_access.get("userinfo_roles"):
roles = realm_user_roles
log.info("IdP user roles %s", ";".join(roles))
if len(roles) > 1:
roles = self.map_multiple_roles(roles)
log.info("Dedicated user roles after mapping %s", ";".join(roles))
fake_userinfo = FakeUserInfo.from_kwargs(**decoded_token)
# map keys
superset_userinfo = SupersetUserInfo(
first_name=fake_userinfo.given_name,
last_name=fake_userinfo.family_name,
username=fake_userinfo.preferred_username,
email=fake_userinfo.email,
role_keys=roles,
)
return asdict(superset_userinfo)
def map_multiple_roles(self, roles: list[str]) -> list[str]:
for i in range(0, len(AUTH_ROLES_PRIORITY.keys()) - 1):
if AUTH_ROLES_PRIORITY[i] in roles:
roles = [AUTH_ROLES_PRIORITY[i]]
break
return roles
def auth_by_token(self, provider: str, token: str):
userinfo = self.appbuilder.sm.get_userinfo_from_token(token)
fake_resp = {
"access_token": token,
"token_type": "Bearer",
"userinfo": userinfo,
}
self.appbuilder.sm.set_oauth_session(provider, fake_resp)
user = self.appbuilder.sm.auth_user_oauth(userinfo)
login_user(user, force=True)
def auth_with_elevate(self, provider, token):
user = self.find_user("SERVICE")
if not user:
user = self.add_user(
username="SERVICE",
first_name="Service",
last_name="Account",
role=list(self.get_roles_from_keys(["SERVICE"])),
email="[email protected]",
)
userinfo = self.appbuilder.sm.get_userinfo_from_token(token)
userinfo["username"] = user.username
userinfo["first_name"] = user.first_name
userinfo["last_name"] = user.last_name
userinfo["role_keys"] = ["ROLE"]
fake_resp = {
"access_token": token,
"token_type": "Bearer",
"userinfo": userinfo,
}
self.appbuilder.sm.set_oauth_session(provider, fake_resp)
user = self.appbuilder.sm.auth_user_oauth(userinfo)
login_user(user, force=True, duration=timedelta(minutes=10))
```
overridden config
```
AUTH_TYPE = AUTH_OAUTH
CUSTOM_SECURITY_MANAGER = KeycloakSecurityManager
OAUTH_PROVIDERS = [
{
"name": "keycloak",
"icon": "fa-key",
"token_key": "access_token",
"remote_app": {
"client_id": os.environ["OAUTH_CLIENT_ID"],
"client_secret": os.environ["OAUTH_CLIENT_SECRET"],
"client_kwargs": {"scope": "email profile openid", "verify": False},
"api_base_url":
f"{os.environ['KEYCLOAK_BASE_URL']}/auth/realms/{os.environ['KEYCLOAK_REALM']}/protocol/openid-connect",
"access_token_url":
f"{os.environ['KEYCLOAK_BASE_URL']}/auth/realms/{os.environ['KEYCLOAK_REALM']}/protocol/openid-connect/token",
"authorize_url":
f"{os.environ.get('KEYCLOAK_BASE_URL')}/auth/realms/{os.environ['KEYCLOAK_REALM']}/protocol/openid-connect/auth",
"request_token_url": None,
"jwks_uri":
f"{os.environ['KEYCLOAK_BASE_URL']}/auth/realms/{os.environ['KEYCLOAK_REALM']}/protocol/openid-connect/certs",
"server_metadata_url":
f"{os.environ['KEYCLOAK_BASE_URL']}/auth/realms/{os.environ['KEYCLOAK_REALM']}/.well-known/openid-configuration",
},
}
]
```
Superset just uses DB_AUTH
GitHub link:
https://github.com/apache/superset/discussions/36203#discussioncomment-15124908
----
This is an automatically sent email for [email protected].
To unsubscribe, please send an email to:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]