sfc-gh-tbenroeck opened a new issue, #418:
URL: https://github.com/apache/polaris/issues/418
### Is this a possible security vulnerability?
- [X] This is NOT a possible security vulnerability
### Describe the bug
Polaris emits the SAS token under a host-qualified key, in the form
`adls.sas-token.{account_host}: {sas-token}`. This works in the Spark client but not
in other clients such as PyIceberg
([#1146](https://github.com/apache/iceberg-python/issues/1146#issuecomment-2421306564)).
To support more clients, Polaris should also set:
- `adls.sas-token` (without the account host)
- `adls.account-name`
- `adls.account-host`
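
As a rough illustration of the request above, the extra keys can all be derived from the host-qualified key that is already emitted. The sketch below is written in Python for readability only; it is not Polaris code, and the names `SAS_PREFIX` and `expand_adls_properties` are hypothetical. The `adls.account-host` spelling follows the workaround later in this report, and the account name is assumed to be the first label of the account host.

```python
SAS_PREFIX = "adls.sas-token."  # hypothetical constant for this sketch


def expand_adls_properties(props: dict[str, str]) -> dict[str, str]:
    """Return a copy of props with host-independent ADLS keys added.

    Assumes at most one key of the form adls.sas-token.{account_host}.
    """
    expanded = dict(props)
    for key, value in props.items():
        if key.startswith(SAS_PREFIX):
            account_host = key[len(SAS_PREFIX):]
            # setdefault keeps any value the server already provided
            expanded.setdefault("adls.sas-token", value)
            expanded.setdefault("adls.account-name", account_host.split(".")[0])
            expanded.setdefault("adls.account-host", account_host)
            break
    return expanded
```

Keeping the original host-qualified key alongside the new ones means clients that already work, such as Spark, should be unaffected.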
### To Reproduce
1. Use PyIceberg
```python
from pyiceberg.catalog import load_catalog
import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.parquet as pq

catalog = load_catalog(
    **{
        "type": "rest",
        "header.X-Iceberg-Access-Delegation": "vended-credentials",
        "uri": f"https://{account}.snowflakecomputing.com/polaris/api/catalog",
        "credential": f"{principal_client_id}:{principal_secret}",
        "warehouse": catalog_name,
        "scope": role,
        "token-refresh-enabled": "true",
        "py-io-impl": "pyiceberg.io.fsspec.FsspecFileIO",
    }
)
table = catalog.load_table(f"{catalog_namespace}.{catalog_namespace_tablename}")
tablescan = table.scan()
df = tablescan.to_arrow()
```
```bash
Traceback (most recent call last):
  File "/Users/tbenroeck/Documents/code/polaris_testing/.venv/lib/python3.12/site-packages/pyiceberg/catalog/rest.py", line 375, in _fetch_access_token
    response.raise_for_status()
  File "/Users/tbenroeck/Documents/code/polaris_testing/.venv/lib/python3.12/site-packages/requests/models.py", line 1024, in raise_for_status
    raise HTTPError(http_error_msg, response=self)
requests.exceptions.HTTPError: 401 Client Error: Unauthorized for url: https://tv23016.west-us-2.azure.snowflakecomputing.com/polaris/api/catalog/v1/oauth/tokens

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/tbenroeck/Documents/code/polaris_testing/simple_polaris.py", line 21, in <module>
    catalog = load_catalog(
              ^^^^^^^^^^^^^
  File "/Users/tbenroeck/Documents/code/polaris_testing/.venv/lib/python3.12/site-packages/pyiceberg/catalog/__init__.py", line 261, in load_catalog
    return AVAILABLE_CATALOGS[catalog_type](name, cast(Dict[str, str], conf))
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tbenroeck/Documents/code/polaris_testing/.venv/lib/python3.12/site-packages/pyiceberg/catalog/__init__.py", line 136, in load_rest
    return RestCatalog(name, **conf)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tbenroeck/Documents/code/polaris_testing/.venv/lib/python3.12/site-packages/pyiceberg/catalog/rest.py", line 263, in __init__
    self._fetch_config()
  File "/Users/tbenroeck/Documents/code/polaris_testing/.venv/lib/python3.12/site-packages/pyiceberg/catalog/rest.py", line 386, in _fetch_config
    with self._create_session() as session:
         ^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tbenroeck/Documents/code/polaris_testing/.venv/lib/python3.12/site-packages/pyiceberg/catalog/rest.py", line 280, in _create_session
    self._refresh_token(session, self.properties.get(TOKEN))
  File "/Users/tbenroeck/Documents/code/polaris_testing/.venv/lib/python3.12/site-packages/pyiceberg/catalog/rest.py", line 546, in _refresh_token
    self.properties[TOKEN] = self._fetch_access_token(session, self.properties[CREDENTIAL])
                             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tbenroeck/Documents/code/polaris_testing/.venv/lib/python3.12/site-packages/pyiceberg/catalog/rest.py", line 377, in _fetch_access_token
    self._handle_non_200_response(exc, {400: OAuthError, 401: OAuthError})
  File "/Users/tbenroeck/Documents/code/polaris_testing/.venv/lib/python3.12/site-packages/pyiceberg/catalog/rest.py", line 471, in _handle_non_200_response
    raise exception(response) from exc
pyiceberg.exceptions.OAuthError: unauthorized_client: The client is not authorized
```
### Actual Behavior
_No response_
### Expected Behavior
_No response_
### Additional context
As a temporary workaround, I created a custom FileIO and pointed `py-io-impl` at it:
```python
catalog = load_catalog(
    **{
        "type": "rest",
        "header.X-Iceberg-Access-Delegation": "vended-credentials",
        "uri": f"https://{account}.snowflakecomputing.com/polaris/api/catalog",
        "credential": f"{principal_client_id}:{principal_secret}",
        "warehouse": catalog_name,
        "scope": role,
        "token-refresh-enabled": "true",
        "py-io-impl": "custom_fsspec.CustomFsspecFileIO",
    }
)
```
```python
from urllib.parse import urlparse

from fsspec import AbstractFileSystem
from pyiceberg.io import (
    ADLFS_ACCOUNT_NAME,
    ADLFS_SAS_TOKEN,
    ADLS_ACCOUNT_NAME,
    ADLS_SAS_TOKEN,
)
from pyiceberg.io.fsspec import FsspecFileIO
from pyiceberg.typedef import Properties
from pyiceberg.utils.properties import get_first_property_value


class CustomFsspecFileIO(FsspecFileIO):
    def __init__(self, properties):
        # Short-term fix for
        # https://github.com/apache/iceberg-python/issues/961 and
        # https://github.com/apache/iceberg-python/issues/1146
        base_location = properties.get('default-base-location')
        if base_location and base_location.startswith('abfs'):
            account_name = get_first_property_value(properties, ADLS_ACCOUNT_NAME, ADLFS_ACCOUNT_NAME)
            sas_token = get_first_property_value(properties, ADLS_SAS_TOKEN, ADLFS_SAS_TOKEN)
            if sas_token is None:
                # Iterate over a snapshot because properties is modified in the loop
                for key, value in list(properties.items()):
                    key = key.replace('adlfs.', 'adls.')
                    if key.startswith(ADLS_SAS_TOKEN):
                        # Copy the host-qualified token to the plain key
                        properties[ADLS_SAS_TOKEN] = value
                        if key.endswith('.windows.net'):
                            if account_name is None:
                                account_host = key.removeprefix(f"{ADLS_SAS_TOKEN}.")
                                account_name = account_host.split('.')[0]
                                properties[ADLS_ACCOUNT_NAME] = account_name
                                properties['adls.account-host'] = account_host
                        break  # Exit loop after finding the first match
        super().__init__(properties)

    def _get_fs(self, scheme: str):
        # Route every Azure scheme, including legacy wasb(s), to the ADLS filesystem
        if scheme in ["abfs", "abfss", "wasb", "wasbs"]:
            return _adls(self.properties)
        # If not ADLS, proceed with the original behavior
        return super()._get_fs(scheme)

    def new_input(self, location: str):
        # Replace wasb(s):// with abfs(s):// in the location
        uri = urlparse(location)
        if uri.scheme in ["wasb"]:
            location = location.replace(f"{uri.scheme}://", "abfs://")
        if uri.scheme in ["wasbs"]:
            location = location.replace(f"{uri.scheme}://", "abfss://")
        return super().new_input(location)

    def new_output(self, location: str):
        # Replace wasb(s):// with abfs(s):// in the location
        uri = urlparse(location)
        if uri.scheme in ["wasb"]:
            location = location.replace(f"{uri.scheme}://", "abfs://")
        if uri.scheme in ["wasbs"]:
            location = location.replace(f"{uri.scheme}://", "abfss://")
        return super().new_output(location)


def _adls(properties: Properties) -> AbstractFileSystem:
    # Local replacement for pyiceberg.io.fsspec._adls that also passes account_host
    from adlfs import AzureBlobFileSystem

    return AzureBlobFileSystem(
        # account-host is only set when it was derived in __init__ above
        account_host=properties.get('adls.account-host'),
        account_name=properties[ADLS_ACCOUNT_NAME],
        sas_token=properties[ADLS_SAS_TOKEN],
    )
```
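
The `wasb(s)` to `abfs(s)` rewrite in `new_input` and `new_output` above is the same in both methods, so it could be factored into one helper. The following is only a sketch of that idea; `normalize_azure_scheme` and `_SCHEME_MAP` are hypothetical names, not part of PyIceberg. It rewrites only the leading scheme, whereas `str.replace` would also touch any later occurrence of the same text in the location.

```python
# Hypothetical helper, not part of PyIceberg: maps legacy Azure schemes
# to the ones the workaround hands to the parent FileIO.
_SCHEME_MAP = {"wasb": "abfs", "wasbs": "abfss"}


def normalize_azure_scheme(location: str) -> str:
    """Rewrite a leading wasb:// or wasbs:// to abfs:// or abfss://."""
    scheme, sep, rest = location.partition("://")
    if sep and scheme in _SCHEME_MAP:
        return f"{_SCHEME_MAP[scheme]}://{rest}"
    # Any other scheme, or a location without "://", is returned unchanged
    return location
```

With such a helper, both overrides would reduce to calling the parent method on `normalize_azure_scheme(location)`.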
### System information
_No response_