Copilot commented on code in PR #38174:
URL: https://github.com/apache/superset/pull/38174#discussion_r2885439939
##########
superset/sql/parse.py:
##########
@@ -321,6 +321,37 @@ def qualify(
)
+@dataclass(eq=True, frozen=True)
+class Partition:
+ """
+ Partition object, with two attribute keys:
+ ispartitioned_table and partition_comlumn,
+ used to provide partition information
+ Here is an example of an object:
+ {"ispartitioned_table":true,"partition_column":["month","day"]}
+ """
+
+ is_partitioned_table: bool
+ partition_column: list[str] | None = None
+
+ def __str__(self) -> str:
+ """
+ Return the partition columns of table name.
Review Comment:
The `__str__` docstring says "Return the partition columns of table name"
which is inaccurate — the method returns a string representation of the entire
`Partition` object, not just the partition columns.
```suggestion
Return a string representation of the Partition object, including
the partition flag and partition columns.
```
##########
superset/db_engine_specs/odps.py:
##########
@@ -0,0 +1,192 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import logging
+from typing import Any, Optional, TYPE_CHECKING
+
+from sqlalchemy import select, text
+from sqlalchemy.engine.base import Engine
+
+from superset.databases.schemas import (
+ TableMetadataColumnsResponse,
+ TableMetadataResponse,
+)
+from superset.databases.utils import (
+ get_col_type,
+ get_foreign_keys_metadata,
+ get_indexes_metadata,
+)
+from superset.db_engine_specs.base import BaseEngineSpec, BasicParametersMixin
+from superset.sql.parse import Partition, SQLScript, Table
+from superset.superset_typing import ResultSetColumnType
+
+if TYPE_CHECKING:
+ from superset.models.core import Database
+
+logger = logging.getLogger(__name__)
+
+
+class OdpsBaseEngineSpec(BaseEngineSpec):
+ @classmethod
+ def get_table_metadata(
+ cls,
+ database: Database,
+ table: Table,
+ partition: Optional[Partition] = None,
+ ) -> TableMetadataResponse:
+ """
+ Returns basic table metadata
+ :param database: Database instance
+ :param table: A Table instance
+ :param partition: A Table partition info
+ :return: Basic table metadata
+ """
+ return cls.get_table_metadata(database, table, partition)
Review Comment:
The `OdpsBaseEngineSpec.get_table_metadata` method at line 59 calls itself
recursively via `cls.get_table_metadata(database, table, partition)`. This
causes infinite recursion and a `RecursionError` whenever this method is
invoked directly on `OdpsBaseEngineSpec`. Since the method is meant to be
overridden by subclasses (and `OdpsEngineSpec` does override it), the intent
seems to be to call the parent's method, but the implementation instead calls
itself. If the design goal is to delegate to the parent class implementation
(i.e., `BaseEngineSpec.get_table_metadata`), it should call
`super().get_table_metadata(database, table)`, though that method doesn't
accept a `partition` argument. If `OdpsBaseEngineSpec` is purely abstract and
`get_table_metadata` should never be called on it directly, the method body
should be removed entirely or replaced with `raise NotImplementedError`.
```suggestion
raise NotImplementedError(
"OdpsBaseEngineSpec.get_table_metadata must be overridden"
)
```
##########
superset/db_engine_specs/odps.py:
##########
@@ -0,0 +1,192 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import logging
+from typing import Any, Optional, TYPE_CHECKING
+
+from sqlalchemy import select, text
+from sqlalchemy.engine.base import Engine
+
+from superset.databases.schemas import (
+ TableMetadataColumnsResponse,
+ TableMetadataResponse,
+)
+from superset.databases.utils import (
+ get_col_type,
+ get_foreign_keys_metadata,
+ get_indexes_metadata,
+)
+from superset.db_engine_specs.base import BaseEngineSpec, BasicParametersMixin
+from superset.sql.parse import Partition, SQLScript, Table
+from superset.superset_typing import ResultSetColumnType
+
+if TYPE_CHECKING:
+ from superset.models.core import Database
+
+logger = logging.getLogger(__name__)
+
+
+class OdpsBaseEngineSpec(BaseEngineSpec):
+ @classmethod
+ def get_table_metadata(
+ cls,
+ database: Database,
+ table: Table,
+ partition: Optional[Partition] = None,
+ ) -> TableMetadataResponse:
+ """
+ Returns basic table metadata
+ :param database: Database instance
+ :param table: A Table instance
+ :param partition: A Table partition info
+ :return: Basic table metadata
+ """
+ return cls.get_table_metadata(database, table, partition)
+
+
+class OdpsEngineSpec(BasicParametersMixin, OdpsBaseEngineSpec):
+ engine = "odps"
+ engine_name = "ODPS (MaxCompute)"
+ default_driver = "odps"
+
+ @classmethod
+ def get_table_metadata(
+ cls, database: Any, table: Table, partition: Optional[Partition] = None
+ ) -> TableMetadataResponse:
+ """
+ Get table metadata information, including type, pk, fks.
+ This function raises SQLAlchemyError when a schema is not found.
+
+ :param partition: The table's partition info
+ :param database: The database model
+ :param table: Table instance
+ :return: Dict table metadata ready for API response
+ """
+ keys = []
+ columns = database.get_columns(table)
+ primary_key = database.get_pk_constraint(table)
+ if primary_key and primary_key.get("constrained_columns"):
+            primary_key["column_names"] = primary_key.pop("constrained_columns")
+ primary_key["type"] = "pk"
+ keys += [primary_key]
+ foreign_keys = get_foreign_keys_metadata(database, table)
+ indexes = get_indexes_metadata(database, table)
+ keys += foreign_keys + indexes
+ payload_columns: list[TableMetadataColumnsResponse] = []
+ table_comment = database.get_table_comment(table)
+ for col in columns:
+ dtype = get_col_type(col)
+ payload_columns.append(
+ {
+ "name": col["column_name"],
+ "type": dtype.split("(")[0] if "(" in dtype else dtype,
+ "longType": dtype,
+ "keys": [
+                    k for k in keys if col["column_name"] in k["column_names"]
+ ],
+ "comment": col.get("comment"),
+ }
+ )
+
+ with database.get_sqla_engine(
+ catalog=table.catalog, schema=table.schema
+ ) as engine:
+ return {
+ "name": table.table,
+ "columns": payload_columns,
+ "selectStar": cls.select_star(
+ database=database,
+ table=table,
+ engine=engine,
+ limit=100,
+ show_cols=False,
+ indent=True,
+ latest_partition=True,
+ cols=columns,
+ partition=partition,
+ ),
+ "primaryKey": primary_key,
+ "foreignKeys": foreign_keys,
+ "indexes": keys,
+ "comment": table_comment,
+ }
+
+ @classmethod
+ def select_star( # pylint: disable=too-many-arguments
+ cls,
+ database: Database,
+ table: Table,
+ engine: Engine,
+ limit: int = 100,
+ show_cols: bool = False,
+ indent: bool = True,
+ latest_partition: bool = True,
+ cols: list[ResultSetColumnType] | None = None,
+ partition: Optional[Partition] = None,
+ ) -> str:
Review Comment:
The `select_star` method in `OdpsEngineSpec` has a different signature than
all other engine specs' `select_star` methods (including the base class). The
base class, Presto, Hive, and BigQuery all use `dialect: Dialect` as the third
parameter, but `OdpsEngineSpec.select_star` uses `engine: Engine`. This breaks
the Liskov Substitution Principle and will cause issues if `select_star` is
ever called polymorphically through the engine spec interface (e.g.,
`cls.select_star(database, table, dialect, ...)` from a caller using the
standard interface). The method should use `dialect: Dialect` for consistency
and call `cls.quote_table(table, dialect)` instead of `cls.quote_table(table,
engine.dialect)`, or it should accept `**kwargs` to maintain compatibility.
##########
superset/sql/parse.py:
##########
@@ -321,6 +321,37 @@ def qualify(
)
+@dataclass(eq=True, frozen=True)
+class Partition:
+ """
+ Partition object, with two attribute keys:
+ ispartitioned_table and partition_comlumn,
+ used to provide partition information
+ Here is an example of an object:
+ {"ispartitioned_table":true,"partition_column":["month","day"]}
Review Comment:
The docstring for the `Partition` class contains two spelling errors:
"ispartitioned_table" should be "is_partitioned_table", and "partition_comlumn"
should be "partition_column".
```suggestion
is_partitioned_table and partition_column,
used to provide partition information
Here is an example of an object:
{"is_partitioned_table": true, "partition_column": ["month", "day"]}
```
##########
superset/db_engine_specs/odps.py:
##########
@@ -0,0 +1,192 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import logging
+from typing import Any, Optional, TYPE_CHECKING
+
+from sqlalchemy import select, text
+from sqlalchemy.engine.base import Engine
+
+from superset.databases.schemas import (
+ TableMetadataColumnsResponse,
+ TableMetadataResponse,
+)
+from superset.databases.utils import (
+ get_col_type,
+ get_foreign_keys_metadata,
+ get_indexes_metadata,
+)
+from superset.db_engine_specs.base import BaseEngineSpec, BasicParametersMixin
+from superset.sql.parse import Partition, SQLScript, Table
+from superset.superset_typing import ResultSetColumnType
+
+if TYPE_CHECKING:
+ from superset.models.core import Database
+
+logger = logging.getLogger(__name__)
+
+
+class OdpsBaseEngineSpec(BaseEngineSpec):
+ @classmethod
+ def get_table_metadata(
+ cls,
+ database: Database,
+ table: Table,
+ partition: Optional[Partition] = None,
+ ) -> TableMetadataResponse:
+ """
+ Returns basic table metadata
+ :param database: Database instance
+ :param table: A Table instance
+ :param partition: A Table partition info
+ :return: Basic table metadata
+ """
+ return cls.get_table_metadata(database, table, partition)
+
+
+class OdpsEngineSpec(BasicParametersMixin, OdpsBaseEngineSpec):
+ engine = "odps"
+ engine_name = "ODPS (MaxCompute)"
+ default_driver = "odps"
+
+ @classmethod
+ def get_table_metadata(
+ cls, database: Any, table: Table, partition: Optional[Partition] = None
+ ) -> TableMetadataResponse:
+ """
+ Get table metadata information, including type, pk, fks.
+ This function raises SQLAlchemyError when a schema is not found.
+
+ :param partition: The table's partition info
+ :param database: The database model
+ :param table: Table instance
+ :return: Dict table metadata ready for API response
+ """
+ keys = []
+ columns = database.get_columns(table)
+ primary_key = database.get_pk_constraint(table)
+ if primary_key and primary_key.get("constrained_columns"):
+            primary_key["column_names"] = primary_key.pop("constrained_columns")
+ primary_key["type"] = "pk"
+ keys += [primary_key]
+ foreign_keys = get_foreign_keys_metadata(database, table)
+ indexes = get_indexes_metadata(database, table)
+ keys += foreign_keys + indexes
+ payload_columns: list[TableMetadataColumnsResponse] = []
+ table_comment = database.get_table_comment(table)
+ for col in columns:
+ dtype = get_col_type(col)
+ payload_columns.append(
+ {
+ "name": col["column_name"],
+ "type": dtype.split("(")[0] if "(" in dtype else dtype,
+ "longType": dtype,
+ "keys": [
+                    k for k in keys if col["column_name"] in k["column_names"]
+ ],
+ "comment": col.get("comment"),
+ }
+ )
+
+ with database.get_sqla_engine(
+ catalog=table.catalog, schema=table.schema
+ ) as engine:
+ return {
+ "name": table.table,
+ "columns": payload_columns,
+ "selectStar": cls.select_star(
+ database=database,
+ table=table,
+ engine=engine,
+ limit=100,
+ show_cols=False,
+ indent=True,
+ latest_partition=True,
+ cols=columns,
+ partition=partition,
+ ),
+ "primaryKey": primary_key,
+ "foreignKeys": foreign_keys,
+ "indexes": keys,
+ "comment": table_comment,
+ }
+
+ @classmethod
+ def select_star( # pylint: disable=too-many-arguments
+ cls,
+ database: Database,
+ table: Table,
+ engine: Engine,
+ limit: int = 100,
+ show_cols: bool = False,
+ indent: bool = True,
+ latest_partition: bool = True,
+ cols: list[ResultSetColumnType] | None = None,
+ partition: Optional[Partition] = None,
+ ) -> str:
+ """
+        Generate a "SELECT * from [schema.]table_name" query with appropriate limit.
+
+ WARNING: expects only unquoted table and schema names.
+
+ :param partition: The table's partition info
+ :param database: Database instance
+ :param table: Table instance
+ :param engine: SqlAlchemy Engine instance
+ :param limit: limit to impose on query
+ :param show_cols: Show columns in query; otherwise use "*"
+ :param indent: Add indentation to query
+ :param latest_partition: Only query the latest partition
+ :param cols: Columns to include in query
+ :return: SQL query
+ """
+ # pylint: disable=redefined-outer-name
+ fields: str | list[Any] = "*"
+ cols = cols or []
+ if (show_cols or latest_partition) and not cols:
+ cols = database.get_columns(table)
+
+ if show_cols:
+ fields = cls._get_fields(cols)
+ full_table_name = cls.quote_table(table, engine.dialect)
+ qry = select(fields).select_from(text(full_table_name))
+ if database.backend == "odps":
+ if (
+ partition is not None
+ and partition.is_partitioned_table
+ and partition.partition_column is not None
+ and len(partition.partition_column) > 0
+ ):
+ partition_str = partition.partition_column[0]
+                partition_str_where = f"CAST({partition_str} AS STRING) LIKE '%'"
+ qry = qry.where(text(partition_str_where))
Review Comment:
The partition WHERE clause `CAST({partition_str} AS STRING) LIKE '%'` is
always true (since the `LIKE '%'` pattern matches any string, including empty
string), so this condition never actually filters data. It appears to be added
just to satisfy ODPS's requirement that a partition filter be present, but ODPS
requires specifying a concrete partition value (e.g., `partition_col =
'some_value'`), not an always-true predicate. It's unclear whether ODPS will
accept this pattern—and if it does, it would scan all partitions rather than
the latest one, potentially resulting in very large and expensive queries that
could time out or return excessive data.
```suggestion
                # Intentionally avoid adding a dummy always-true partition predicate
                # here. Real partition pruning is handled via where_latest_partition
                # when latest_partition is True.
                pass
```
##########
superset/databases/api.py:
##########
@@ -1079,15 +1079,23 @@ def table_metadata(self, pk: int) -> FlaskResponse:
parameters = QualifiedTableSchema().load(request.args)
except ValidationError as ex:
raise InvalidPayloadSchemaError(ex) from ex
-
-        table = Table(parameters["name"], parameters["schema"], parameters["catalog"])
+ table_name = str(parameters["name"])
+ table = Table(table_name, parameters["schema"], parameters["catalog"])
+    is_partitioned_table, partition_fields = DatabaseDAO.is_odps_partitioned_table(
+ database, table_name
+ )
try:
security_manager.raise_for_access(database=database, table=table)
except SupersetSecurityException as ex:
# instead of raising 403, raise 404 to hide table existence
raise TableNotFoundException("No such table") from ex
Review Comment:
The call to `DatabaseDAO.is_odps_partitioned_table` at line 1084 occurs
before the authorization check `security_manager.raise_for_access` at line
1088. This means the ODPS API is contacted to check partition status even for
users who don't have access to the table. The security check should happen
before any backend calls. The `is_odps_partitioned_table` call should be moved
to after the security check at line 1091.
```suggestion
try:
security_manager.raise_for_access(database=database, table=table)
except SupersetSecurityException as ex:
# instead of raising 403, raise 404 to hide table existence
raise TableNotFoundException("No such table") from ex
        is_partitioned_table, partition_fields = DatabaseDAO.is_odps_partitioned_table(
            database, table_name
        )
```
##########
superset/daos/database.py:
##########
@@ -239,6 +241,49 @@ def get_datasets(
.all()
)
+ @classmethod
+ def is_odps_partitioned_table(
+ cls, database: Database, table_name: str
+ ) -> tuple[bool, list[str]]:
+ """
+ This function is used to determine and retrieve
+ partition information of the ODPS table.
+ The return values are whether the partition
+ table is partitioned and the names of all partition fields.
+ """
+ if not database:
+ raise ValueError("Database not found")
+ if database.backend != "odps":
+ return False, []
+ try:
+ from odps import ODPS
+ except ImportError:
+        logger.warning("pyodps is not installed, cannot check ODPS partition info")
+ return False, []
+ uri = database.sqlalchemy_uri
+ access_key = database.password
+ pattern = re.compile(
+            r"odps://(?P<username>[^:]+):(?P<password>[^@]+)@(?P<project>[^/]+)/(?:\?"
+ r"endpoint=(?P<endpoint>[^&]+))"
+ )
+ if not uri or not isinstance(uri, str):
+ logger.warning(
+                "Invalid or missing sqlalchemy URI, please provide a correct URI"
+ )
+ return False, []
+ if match := pattern.match(unquote(uri)):
+ access_id = match.group("username")
+ project = match.group("project")
+ endpoint = match.group("endpoint")
+            odps_client = ODPS(access_id, access_key, project, endpoint=endpoint)
+ table = odps_client.get_table(table_name)
+ if table.exist_partition:
+ partition_spec = table.table_schema.partitions
+                partition_fields = [partition.name for partition in partition_spec]
+ return True, partition_fields
+ return False, []
+ return False, []
Review Comment:
The `is_odps_partitioned_table` method parses `database.sqlalchemy_uri`
(which stores the masked URI with the password replaced by "XXXXXXXXXX") using
a regex that includes a `password` group. The masked password is silently
matched and ignored, while the real password is read from `database.password`.
This works, but it's fragile: if the ODPS URI format changes, or the password
mask changes, or the URI doesn't match the regex for any reason (e.g., if the
URI has additional query parameters), the function silently returns `(False,
[])` without warning that an ODPS table's partition status couldn't be
determined. Silent failure can mislead users into thinking a partitioned table
is not partitioned, hiding potential errors. At minimum, a warning should be
logged when the URI pattern does not match on an ODPS database.
```suggestion
else:
logger.warning(
"ODPS sqlalchemy URI does not match expected pattern; "
"unable to determine partition info for table %r",
table_name,
)
return False, []
```
##########
superset/databases/api.py:
##########
@@ -1079,15 +1079,23 @@ def table_metadata(self, pk: int) -> FlaskResponse:
parameters = QualifiedTableSchema().load(request.args)
except ValidationError as ex:
raise InvalidPayloadSchemaError(ex) from ex
-
-        table = Table(parameters["name"], parameters["schema"], parameters["catalog"])
+ table_name = str(parameters["name"])
+ table = Table(table_name, parameters["schema"], parameters["catalog"])
+    is_partitioned_table, partition_fields = DatabaseDAO.is_odps_partitioned_table(
+ database, table_name
+ )
Review Comment:
The `is_odps_partitioned_table` method is called unconditionally for every
`table_metadata` request in `api.py`, regardless of the database backend.
Although the method returns early for non-ODPS backends (`database.backend !=
"odps"`), for ODPS databases this method creates an entirely new ODPS client
connection (separate from the SQLAlchemy connection pool) and makes a remote
API call on every table metadata request. This adds latency to every table
preview in SQLLab for ODPS databases, and bypasses Superset's normal connection
pooling and management. Consider caching the partition info or using the
existing SQLAlchemy inspector to retrieve this information instead.
##########
superset/databases/api.py:
##########
@@ -1079,15 +1079,23 @@ def table_metadata(self, pk: int) -> FlaskResponse:
parameters = QualifiedTableSchema().load(request.args)
except ValidationError as ex:
raise InvalidPayloadSchemaError(ex) from ex
-
-        table = Table(parameters["name"], parameters["schema"], parameters["catalog"])
+ table_name = str(parameters["name"])
+ table = Table(table_name, parameters["schema"], parameters["catalog"])
+    is_partitioned_table, partition_fields = DatabaseDAO.is_odps_partitioned_table(
+ database, table_name
+ )
try:
Review Comment:
The call to `DatabaseDAO.is_odps_partitioned_table` at line 1084 is not
wrapped in a try-except block. Inside this method,
`odps_client.get_table(table_name)` makes a remote network call that can raise
exceptions (e.g., network errors, authentication failures, or ODPS-specific
errors). Any uncaught exception here will propagate as an unhandled 500 error.
This call should be wrapped in error handling so that failures (e.g., unable to
connect to ODPS) degrade gracefully—for example, by logging a warning and
defaulting to the non-partitioned path.
```suggestion
try:
        is_partitioned_table, partition_fields = DatabaseDAO.is_odps_partitioned_table(
            database, table_name
        )
    except Exception as ex:  # pylint: disable=broad-except
        logger.warning(
            "Error determining ODPS partition information for table %s: %s; "
            "falling back to non-partitioned metadata path",
            table_name,
            error_msg_from_exception(ex),
        )
is_partitioned_table, partition_fields = False, []
try:
```
##########
superset/db_engine_specs/odps.py:
##########
@@ -0,0 +1,192 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import logging
+from typing import Any, Optional, TYPE_CHECKING
+
+from sqlalchemy import select, text
+from sqlalchemy.engine.base import Engine
+
+from superset.databases.schemas import (
+ TableMetadataColumnsResponse,
+ TableMetadataResponse,
+)
+from superset.databases.utils import (
+ get_col_type,
+ get_foreign_keys_metadata,
+ get_indexes_metadata,
+)
+from superset.db_engine_specs.base import BaseEngineSpec, BasicParametersMixin
+from superset.sql.parse import Partition, SQLScript, Table
+from superset.superset_typing import ResultSetColumnType
+
+if TYPE_CHECKING:
+ from superset.models.core import Database
+
+logger = logging.getLogger(__name__)
+
+
+class OdpsBaseEngineSpec(BaseEngineSpec):
+ @classmethod
+ def get_table_metadata(
+ cls,
+ database: Database,
+ table: Table,
+ partition: Optional[Partition] = None,
+ ) -> TableMetadataResponse:
+ """
+ Returns basic table metadata
+ :param database: Database instance
+ :param table: A Table instance
+ :param partition: A Table partition info
+ :return: Basic table metadata
+ """
+ return cls.get_table_metadata(database, table, partition)
+
+
+class OdpsEngineSpec(BasicParametersMixin, OdpsBaseEngineSpec):
+ engine = "odps"
+ engine_name = "ODPS (MaxCompute)"
+ default_driver = "odps"
+
+ @classmethod
+ def get_table_metadata(
+ cls, database: Any, table: Table, partition: Optional[Partition] = None
+ ) -> TableMetadataResponse:
+ """
+ Get table metadata information, including type, pk, fks.
+ This function raises SQLAlchemyError when a schema is not found.
+
+ :param partition: The table's partition info
+ :param database: The database model
+ :param table: Table instance
+ :return: Dict table metadata ready for API response
+ """
+ keys = []
+ columns = database.get_columns(table)
+ primary_key = database.get_pk_constraint(table)
+ if primary_key and primary_key.get("constrained_columns"):
+            primary_key["column_names"] = primary_key.pop("constrained_columns")
+ primary_key["type"] = "pk"
+ keys += [primary_key]
+ foreign_keys = get_foreign_keys_metadata(database, table)
+ indexes = get_indexes_metadata(database, table)
+ keys += foreign_keys + indexes
+ payload_columns: list[TableMetadataColumnsResponse] = []
+ table_comment = database.get_table_comment(table)
+ for col in columns:
+ dtype = get_col_type(col)
+ payload_columns.append(
+ {
+ "name": col["column_name"],
+ "type": dtype.split("(")[0] if "(" in dtype else dtype,
+ "longType": dtype,
+ "keys": [
+                    k for k in keys if col["column_name"] in k["column_names"]
+ ],
+ "comment": col.get("comment"),
+ }
+ )
+
+ with database.get_sqla_engine(
+ catalog=table.catalog, schema=table.schema
+ ) as engine:
+ return {
+ "name": table.table,
+ "columns": payload_columns,
+ "selectStar": cls.select_star(
+ database=database,
+ table=table,
+ engine=engine,
+ limit=100,
+ show_cols=False,
+ indent=True,
+ latest_partition=True,
+ cols=columns,
+ partition=partition,
+ ),
+ "primaryKey": primary_key,
+ "foreignKeys": foreign_keys,
+ "indexes": keys,
+ "comment": table_comment,
+ }
+
+ @classmethod
+ def select_star( # pylint: disable=too-many-arguments
+ cls,
+ database: Database,
+ table: Table,
+ engine: Engine,
+ limit: int = 100,
+ show_cols: bool = False,
+ indent: bool = True,
+ latest_partition: bool = True,
+ cols: list[ResultSetColumnType] | None = None,
+ partition: Optional[Partition] = None,
+ ) -> str:
+ """
+        Generate a "SELECT * from [schema.]table_name" query with appropriate limit.
+
+ WARNING: expects only unquoted table and schema names.
+
+ :param partition: The table's partition info
+ :param database: Database instance
+ :param table: Table instance
+ :param engine: SqlAlchemy Engine instance
+ :param limit: limit to impose on query
+ :param show_cols: Show columns in query; otherwise use "*"
+ :param indent: Add indentation to query
+ :param latest_partition: Only query the latest partition
+ :param cols: Columns to include in query
+ :return: SQL query
+ """
+ # pylint: disable=redefined-outer-name
+ fields: str | list[Any] = "*"
+ cols = cols or []
+ if (show_cols or latest_partition) and not cols:
+ cols = database.get_columns(table)
+
+ if show_cols:
+ fields = cls._get_fields(cols)
+ full_table_name = cls.quote_table(table, engine.dialect)
+ qry = select(fields).select_from(text(full_table_name))
+ if database.backend == "odps":
+ if (
+ partition is not None
+ and partition.is_partitioned_table
+ and partition.partition_column is not None
+ and len(partition.partition_column) > 0
+ ):
+ partition_str = partition.partition_column[0]
+            partition_str_where = f"CAST({partition_str} AS STRING) LIKE '%'"
+ qry = qry.where(text(partition_str_where))
+ if limit:
+ qry = qry.limit(limit)
+ if latest_partition:
+ partition_query = cls.where_latest_partition(
+ database,
+ table,
+ qry,
+ columns=cols,
+ )
+ if partition_query is not None:
+ qry = partition_query
+ sql = database.compile_sqla_query(qry, table.catalog, table.schema)
+ if indent:
+ sql = SQLScript(sql, engine=cls.engine).format()
+ return sql
Review Comment:
There is no unit test file for `OdpsEngineSpec` or `OdpsBaseEngineSpec`.
Every other engine spec in the codebase (e.g., `test_hive.py`,
`test_presto.py`, `test_bigquery.py`, etc.) has a dedicated test file under
`tests/unit_tests/db_engine_specs/`. A new
`tests/unit_tests/db_engine_specs/test_odps.py` file should be added to cover
`OdpsEngineSpec.select_star`, `OdpsEngineSpec.get_table_metadata`, and
`DatabaseDAO.is_odps_partitioned_table`.
##########
superset/db_engine_specs/odps.py:
##########
@@ -0,0 +1,192 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import logging
+from typing import Any, Optional, TYPE_CHECKING
+
+from sqlalchemy import select, text
+from sqlalchemy.engine.base import Engine
+
+from superset.databases.schemas import (
+ TableMetadataColumnsResponse,
+ TableMetadataResponse,
+)
+from superset.databases.utils import (
+ get_col_type,
+ get_foreign_keys_metadata,
+ get_indexes_metadata,
+)
+from superset.db_engine_specs.base import BaseEngineSpec, BasicParametersMixin
+from superset.sql.parse import Partition, SQLScript, Table
+from superset.superset_typing import ResultSetColumnType
+
+if TYPE_CHECKING:
+ from superset.models.core import Database
+
+logger = logging.getLogger(__name__)
+
+
class OdpsBaseEngineSpec(BaseEngineSpec):
    """Base engine spec for ODPS (MaxCompute)-style databases.

    Provides the partition-aware ``get_table_metadata`` hook that concrete
    ODPS engine specs (e.g. ``OdpsEngineSpec``) must implement.
    """

    @classmethod
    def get_table_metadata(
        cls,
        database: Database,
        table: Table,
        partition: Partition | None = None,
    ) -> TableMetadataResponse:
        """
        Return basic table metadata.

        :param database: Database instance
        :param table: A Table instance
        :param partition: Optional partition info for the table
        :return: Basic table metadata
        :raises NotImplementedError: always; subclasses must override
        """
        # BUG FIX: the original body was ``return cls.get_table_metadata(
        # database, table, partition)`` -- a call to itself, which recursed
        # infinitely (RecursionError) for any subclass that did not override
        # the method. Concrete specs such as OdpsEngineSpec provide the real
        # implementation, so the base raises explicitly instead.
        # NOTE(review): if the intent was to delegate to a shared helper
        # (e.g. superset.databases.utils.get_table_metadata), confirm and
        # delegate there instead -- TODO confirm with the PR author.
        raise NotImplementedError(
            "Subclasses of OdpsBaseEngineSpec must implement get_table_metadata"
        )
+
+
class OdpsEngineSpec(BasicParametersMixin, OdpsBaseEngineSpec):
    # SQLAlchemy dialect name used to match this spec to a database URI.
    engine = "odps"
    # Human-readable name displayed in the Superset UI.
    engine_name = "ODPS (MaxCompute)"
    # Default DBAPI driver segment for the SQLAlchemy URI.
    default_driver = "odps"
+
+ @classmethod
+ def get_table_metadata(
+ cls, database: Any, table: Table, partition: Optional[Partition] = None
+ ) -> TableMetadataResponse:
+ """
+ Get table metadata information, including type, pk, fks.
+ This function raises SQLAlchemyError when a schema is not found.
+
+ :param partition: The table's partition info
+ :param database: The database model
+ :param table: Table instance
+ :return: Dict table metadata ready for API response
+ """
+ keys = []
+ columns = database.get_columns(table)
+ primary_key = database.get_pk_constraint(table)
+ if primary_key and primary_key.get("constrained_columns"):
+ primary_key["column_names"] =
primary_key.pop("constrained_columns")
+ primary_key["type"] = "pk"
+ keys += [primary_key]
+ foreign_keys = get_foreign_keys_metadata(database, table)
+ indexes = get_indexes_metadata(database, table)
+ keys += foreign_keys + indexes
+ payload_columns: list[TableMetadataColumnsResponse] = []
+ table_comment = database.get_table_comment(table)
+ for col in columns:
+ dtype = get_col_type(col)
+ payload_columns.append(
+ {
+ "name": col["column_name"],
+ "type": dtype.split("(")[0] if "(" in dtype else dtype,
+ "longType": dtype,
+ "keys": [
+ k for k in keys if col["column_name"] in
k["column_names"]
+ ],
+ "comment": col.get("comment"),
+ }
+ )
+
+ with database.get_sqla_engine(
+ catalog=table.catalog, schema=table.schema
+ ) as engine:
+ return {
+ "name": table.table,
+ "columns": payload_columns,
+ "selectStar": cls.select_star(
+ database=database,
+ table=table,
+ engine=engine,
+ limit=100,
+ show_cols=False,
+ indent=True,
+ latest_partition=True,
+ cols=columns,
+ partition=partition,
+ ),
+ "primaryKey": primary_key,
+ "foreignKeys": foreign_keys,
+ "indexes": keys,
+ "comment": table_comment,
+ }
+
+ @classmethod
+ def select_star( # pylint: disable=too-many-arguments
+ cls,
+ database: Database,
+ table: Table,
+ engine: Engine,
+ limit: int = 100,
+ show_cols: bool = False,
+ indent: bool = True,
+ latest_partition: bool = True,
+ cols: list[ResultSetColumnType] | None = None,
+ partition: Optional[Partition] = None,
Review Comment:
The PR description states that one of its improvements is "Updated typing to
modern Python style (`list[str] | None` instead of `Optional[List[str]]`)", but
`odps.py` still uses `Optional[Partition]` throughout (lines 50, 69, 140). For
consistency with the stated goal and the modern style used in
`Partition.partition_column: list[str] | None`, these should be changed to
`Partition | None`.
```suggestion
partition: Partition | None = None,
```
##########
superset/databases/api.py:
##########
@@ -1079,15 +1079,23 @@ def table_metadata(self, pk: int) -> FlaskResponse:
parameters = QualifiedTableSchema().load(request.args)
except ValidationError as ex:
raise InvalidPayloadSchemaError(ex) from ex
-
-        table = Table(parameters["name"], parameters["schema"], parameters["catalog"])
+ table_name = str(parameters["name"])
+ table = Table(table_name, parameters["schema"], parameters["catalog"])
+        is_partitioned_table, partition_fields = DatabaseDAO.is_odps_partitioned_table(
+ database, table_name
+ )
try:
security_manager.raise_for_access(database=database, table=table)
except SupersetSecurityException as ex:
# instead of raising 403, raise 404 to hide table existence
raise TableNotFoundException("No such table") from ex
+ partition = Partition(is_partitioned_table, partition_fields)
+ if is_partitioned_table:
+ from superset.db_engine_specs.odps import OdpsEngineSpec
- payload = database.db_engine_spec.get_table_metadata(database, table)
+            payload = OdpsEngineSpec.get_table_metadata(database, table, partition)
Review Comment:
The `table_metadata` endpoint now hardcodes
`OdpsEngineSpec.get_table_metadata` for any ODPS partitioned table, regardless
of the actual engine spec configured for the database. This means if a user
connects to ODPS through a custom subclass of `OdpsEngineSpec` (e.g., a
company-internal extension), the custom metadata logic will be bypassed, and
the generic `OdpsEngineSpec.get_table_metadata` will be used instead. The
`database.db_engine_spec.get_table_metadata` should be called consistently—the
partition-aware behavior should be implemented by the engine spec itself, not
by bypassing the spec in the API layer.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]