Copilot commented on code in PR #38174:
URL: https://github.com/apache/superset/pull/38174#discussion_r2885439939


##########
superset/sql/parse.py:
##########
@@ -321,6 +321,37 @@ def qualify(
         )
 
 
+@dataclass(eq=True, frozen=True)
+class Partition:
+    """
+    Partition object, with two attribute keys:
+    ispartitioned_table and partition_comlumn,
+    used to provide partition information
+    Here is an example of an object:
+    {"ispartitioned_table":true,"partition_column":["month","day"]}
+    """
+
+    is_partitioned_table: bool
+    partition_column: list[str] | None = None
+
+    def __str__(self) -> str:
+        """
+        Return the partition columns of table name.

Review Comment:
   The `__str__` docstring says "Return the partition columns of table name" 
which is inaccurate — the method returns a string representation of the entire 
`Partition` object, not just the partition columns.
   ```suggestion
           Return a string representation of the Partition object, including
           the partition flag and partition columns.
   ```



##########
superset/db_engine_specs/odps.py:
##########
@@ -0,0 +1,192 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import logging
+from typing import Any, Optional, TYPE_CHECKING
+
+from sqlalchemy import select, text
+from sqlalchemy.engine.base import Engine
+
+from superset.databases.schemas import (
+    TableMetadataColumnsResponse,
+    TableMetadataResponse,
+)
+from superset.databases.utils import (
+    get_col_type,
+    get_foreign_keys_metadata,
+    get_indexes_metadata,
+)
+from superset.db_engine_specs.base import BaseEngineSpec, BasicParametersMixin
+from superset.sql.parse import Partition, SQLScript, Table
+from superset.superset_typing import ResultSetColumnType
+
+if TYPE_CHECKING:
+    from superset.models.core import Database
+
+logger = logging.getLogger(__name__)
+
+
+class OdpsBaseEngineSpec(BaseEngineSpec):
+    @classmethod
+    def get_table_metadata(
+        cls,
+        database: Database,
+        table: Table,
+        partition: Optional[Partition] = None,
+    ) -> TableMetadataResponse:
+        """
+        Returns basic table metadata
+        :param database: Database instance
+        :param table: A Table instance
+        :param partition: A Table partition info
+        :return: Basic table metadata
+        """
+        return cls.get_table_metadata(database, table, partition)

Review Comment:
   The `OdpsBaseEngineSpec.get_table_metadata` method at line 59 calls itself 
recursively via `cls.get_table_metadata(database, table, partition)`. This 
causes infinite recursion and a `RecursionError` whenever this method is 
invoked directly on `OdpsBaseEngineSpec`. Since the method is meant to be 
overridden by subclasses (and `OdpsEngineSpec` does override it), the intent 
seems to be to call the parent's method, but the implementation instead calls 
itself. If the design goal is to delegate to the parent class implementation 
(i.e., `BaseEngineSpec.get_table_metadata`), it should call 
`super().get_table_metadata(database, table)`, though that method doesn't 
accept a `partition` argument. If `OdpsBaseEngineSpec` is purely abstract and 
`get_table_metadata` should never be called on it directly, the method body 
should be removed entirely or replaced with `raise NotImplementedError`.
   ```suggestion
           raise NotImplementedError(
               "OdpsBaseEngineSpec.get_table_metadata must be overridden"
           )
   ```
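
   If helpful, a stripped-down reproduction of the failure mode (hypothetical classes, not the Superset code) shows why a `cls.`-qualified call inside a classmethod re-enters the override rather than reaching a parent implementation:

```python
class Base:
    @classmethod
    def get_metadata(cls) -> dict:
        # `cls` resolves dynamically to whichever class the call was made on,
        # so this line re-enters this very method instead of a parent's version.
        return cls.get_metadata()


try:
    Base.get_metadata()
except RecursionError:
    print("RecursionError raised, as the review predicts")
```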



##########
superset/db_engine_specs/odps.py:
##########
@@ -0,0 +1,192 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import logging
+from typing import Any, Optional, TYPE_CHECKING
+
+from sqlalchemy import select, text
+from sqlalchemy.engine.base import Engine
+
+from superset.databases.schemas import (
+    TableMetadataColumnsResponse,
+    TableMetadataResponse,
+)
+from superset.databases.utils import (
+    get_col_type,
+    get_foreign_keys_metadata,
+    get_indexes_metadata,
+)
+from superset.db_engine_specs.base import BaseEngineSpec, BasicParametersMixin
+from superset.sql.parse import Partition, SQLScript, Table
+from superset.superset_typing import ResultSetColumnType
+
+if TYPE_CHECKING:
+    from superset.models.core import Database
+
+logger = logging.getLogger(__name__)
+
+
+class OdpsBaseEngineSpec(BaseEngineSpec):
+    @classmethod
+    def get_table_metadata(
+        cls,
+        database: Database,
+        table: Table,
+        partition: Optional[Partition] = None,
+    ) -> TableMetadataResponse:
+        """
+        Returns basic table metadata
+        :param database: Database instance
+        :param table: A Table instance
+        :param partition: A Table partition info
+        :return: Basic table metadata
+        """
+        return cls.get_table_metadata(database, table, partition)
+
+
+class OdpsEngineSpec(BasicParametersMixin, OdpsBaseEngineSpec):
+    engine = "odps"
+    engine_name = "ODPS (MaxCompute)"
+    default_driver = "odps"
+
+    @classmethod
+    def get_table_metadata(
+        cls, database: Any, table: Table, partition: Optional[Partition] = None
+    ) -> TableMetadataResponse:
+        """
+        Get table metadata information, including type, pk, fks.
+        This function raises SQLAlchemyError when a schema is not found.
+
+        :param partition: The table's partition info
+        :param database: The database model
+        :param table: Table instance
+        :return: Dict table metadata ready for API response
+        """
+        keys = []
+        columns = database.get_columns(table)
+        primary_key = database.get_pk_constraint(table)
+        if primary_key and primary_key.get("constrained_columns"):
+            primary_key["column_names"] = primary_key.pop("constrained_columns")
+            primary_key["type"] = "pk"
+            keys += [primary_key]
+        foreign_keys = get_foreign_keys_metadata(database, table)
+        indexes = get_indexes_metadata(database, table)
+        keys += foreign_keys + indexes
+        payload_columns: list[TableMetadataColumnsResponse] = []
+        table_comment = database.get_table_comment(table)
+        for col in columns:
+            dtype = get_col_type(col)
+            payload_columns.append(
+                {
+                    "name": col["column_name"],
+                    "type": dtype.split("(")[0] if "(" in dtype else dtype,
+                    "longType": dtype,
+                    "keys": [
+                        k for k in keys if col["column_name"] in k["column_names"]
+                    ],
+                    "comment": col.get("comment"),
+                }
+            )
+
+        with database.get_sqla_engine(
+            catalog=table.catalog, schema=table.schema
+        ) as engine:
+            return {
+                "name": table.table,
+                "columns": payload_columns,
+                "selectStar": cls.select_star(
+                    database=database,
+                    table=table,
+                    engine=engine,
+                    limit=100,
+                    show_cols=False,
+                    indent=True,
+                    latest_partition=True,
+                    cols=columns,
+                    partition=partition,
+                ),
+                "primaryKey": primary_key,
+                "foreignKeys": foreign_keys,
+                "indexes": keys,
+                "comment": table_comment,
+            }
+
+    @classmethod
+    def select_star(  # pylint: disable=too-many-arguments
+        cls,
+        database: Database,
+        table: Table,
+        engine: Engine,
+        limit: int = 100,
+        show_cols: bool = False,
+        indent: bool = True,
+        latest_partition: bool = True,
+        cols: list[ResultSetColumnType] | None = None,
+        partition: Optional[Partition] = None,
+    ) -> str:

Review Comment:
   The `select_star` method in `OdpsEngineSpec` has a different signature than 
all other engine specs' `select_star` methods (including the base class). The 
base class, Presto, Hive, and BigQuery all use `dialect: Dialect` as the third 
parameter, but `OdpsEngineSpec.select_star` uses `engine: Engine`. This breaks 
the Liskov Substitution Principle and will cause issues if `select_star` is 
ever called polymorphically through the engine spec interface (e.g., 
`cls.select_star(database, table, dialect, ...)` from a caller using the 
standard interface). The method should use `dialect: Dialect` for consistency 
and call `cls.quote_table(table, dialect)` instead of `cls.quote_table(table, 
engine.dialect)`, or it should accept `**kwargs` to maintain compatibility.
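
   A hedged sketch of the substitutable shape, using stand-in `Dialect` and base classes rather than the real Superset signatures: ODPS-only options (here a hypothetical `partition`) ride in `**kwargs`, so callers using the standard `(database, table, dialect, ...)` interface still dispatch correctly:

```python
from typing import Any, Optional


class Dialect:  # stand-in for sqlalchemy.engine.interfaces.Dialect
    pass


class BaseSpec:
    @classmethod
    def select_star(
        cls, database: Any, table: Any, dialect: Dialect,
        limit: int = 100, **kwargs: Any,
    ) -> str:
        return f"SELECT *\nFROM {table}\nLIMIT {limit}"


class OdpsSpec(BaseSpec):
    @classmethod
    def select_star(
        cls, database: Any, table: Any, dialect: Dialect,
        limit: int = 100, **kwargs: Any,
    ) -> str:
        # Pop the ODPS-only option before delegating, so the parent never
        # sees an argument it does not accept.
        partition: Optional[Any] = kwargs.pop("partition", None)
        sql = super().select_star(database, table, dialect, limit=limit, **kwargs)
        if partition is not None:
            sql += f"\n-- partition hint: {partition}"
        return sql
```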



##########
superset/sql/parse.py:
##########
@@ -321,6 +321,37 @@ def qualify(
         )
 
 
+@dataclass(eq=True, frozen=True)
+class Partition:
+    """
+    Partition object, with two attribute keys:
+    ispartitioned_table and partition_comlumn,
+    used to provide partition information
+    Here is an example of an object:
+    {"ispartitioned_table":true,"partition_column":["month","day"]}

Review Comment:
   The docstring for the `Partition` class contains two spelling errors: 
"ispartitioned_table" should be "is_partitioned_table", and "partition_comlumn" 
should be "partition_column".
   ```suggestion
       is_partitioned_table and partition_column,
       used to provide partition information
       Here is an example of an object:
       {"is_partitioned_table": true, "partition_column": ["month", "day"]}
   ```



##########
superset/db_engine_specs/odps.py:
##########
@@ -0,0 +1,192 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import logging
+from typing import Any, Optional, TYPE_CHECKING
+
+from sqlalchemy import select, text
+from sqlalchemy.engine.base import Engine
+
+from superset.databases.schemas import (
+    TableMetadataColumnsResponse,
+    TableMetadataResponse,
+)
+from superset.databases.utils import (
+    get_col_type,
+    get_foreign_keys_metadata,
+    get_indexes_metadata,
+)
+from superset.db_engine_specs.base import BaseEngineSpec, BasicParametersMixin
+from superset.sql.parse import Partition, SQLScript, Table
+from superset.superset_typing import ResultSetColumnType
+
+if TYPE_CHECKING:
+    from superset.models.core import Database
+
+logger = logging.getLogger(__name__)
+
+
+class OdpsBaseEngineSpec(BaseEngineSpec):
+    @classmethod
+    def get_table_metadata(
+        cls,
+        database: Database,
+        table: Table,
+        partition: Optional[Partition] = None,
+    ) -> TableMetadataResponse:
+        """
+        Returns basic table metadata
+        :param database: Database instance
+        :param table: A Table instance
+        :param partition: A Table partition info
+        :return: Basic table metadata
+        """
+        return cls.get_table_metadata(database, table, partition)
+
+
+class OdpsEngineSpec(BasicParametersMixin, OdpsBaseEngineSpec):
+    engine = "odps"
+    engine_name = "ODPS (MaxCompute)"
+    default_driver = "odps"
+
+    @classmethod
+    def get_table_metadata(
+        cls, database: Any, table: Table, partition: Optional[Partition] = None
+    ) -> TableMetadataResponse:
+        """
+        Get table metadata information, including type, pk, fks.
+        This function raises SQLAlchemyError when a schema is not found.
+
+        :param partition: The table's partition info
+        :param database: The database model
+        :param table: Table instance
+        :return: Dict table metadata ready for API response
+        """
+        keys = []
+        columns = database.get_columns(table)
+        primary_key = database.get_pk_constraint(table)
+        if primary_key and primary_key.get("constrained_columns"):
+            primary_key["column_names"] = primary_key.pop("constrained_columns")
+            primary_key["type"] = "pk"
+            keys += [primary_key]
+        foreign_keys = get_foreign_keys_metadata(database, table)
+        indexes = get_indexes_metadata(database, table)
+        keys += foreign_keys + indexes
+        payload_columns: list[TableMetadataColumnsResponse] = []
+        table_comment = database.get_table_comment(table)
+        for col in columns:
+            dtype = get_col_type(col)
+            payload_columns.append(
+                {
+                    "name": col["column_name"],
+                    "type": dtype.split("(")[0] if "(" in dtype else dtype,
+                    "longType": dtype,
+                    "keys": [
+                        k for k in keys if col["column_name"] in k["column_names"]
+                    ],
+                    "comment": col.get("comment"),
+                }
+            )
+
+        with database.get_sqla_engine(
+            catalog=table.catalog, schema=table.schema
+        ) as engine:
+            return {
+                "name": table.table,
+                "columns": payload_columns,
+                "selectStar": cls.select_star(
+                    database=database,
+                    table=table,
+                    engine=engine,
+                    limit=100,
+                    show_cols=False,
+                    indent=True,
+                    latest_partition=True,
+                    cols=columns,
+                    partition=partition,
+                ),
+                "primaryKey": primary_key,
+                "foreignKeys": foreign_keys,
+                "indexes": keys,
+                "comment": table_comment,
+            }
+
+    @classmethod
+    def select_star(  # pylint: disable=too-many-arguments
+        cls,
+        database: Database,
+        table: Table,
+        engine: Engine,
+        limit: int = 100,
+        show_cols: bool = False,
+        indent: bool = True,
+        latest_partition: bool = True,
+        cols: list[ResultSetColumnType] | None = None,
+        partition: Optional[Partition] = None,
+    ) -> str:
+        """
+        Generate a "SELECT * from [schema.]table_name" query with appropriate limit.
+
+        WARNING: expects only unquoted table and schema names.
+
+        :param partition: The table's partition info
+        :param database: Database instance
+        :param table: Table instance
+        :param engine: SqlAlchemy Engine instance
+        :param limit: limit to impose on query
+        :param show_cols: Show columns in query; otherwise use "*"
+        :param indent: Add indentation to query
+        :param latest_partition: Only query the latest partition
+        :param cols: Columns to include in query
+        :return: SQL query
+        """
+        # pylint: disable=redefined-outer-name
+        fields: str | list[Any] = "*"
+        cols = cols or []
+        if (show_cols or latest_partition) and not cols:
+            cols = database.get_columns(table)
+
+        if show_cols:
+            fields = cls._get_fields(cols)
+        full_table_name = cls.quote_table(table, engine.dialect)
+        qry = select(fields).select_from(text(full_table_name))
+        if database.backend == "odps":
+            if (
+                partition is not None
+                and partition.is_partitioned_table
+                and partition.partition_column is not None
+                and len(partition.partition_column) > 0
+            ):
+                partition_str = partition.partition_column[0]
+                partition_str_where = f"CAST({partition_str} AS STRING) LIKE '%'"
+                qry = qry.where(text(partition_str_where))

Review Comment:
   The partition WHERE clause `CAST({partition_str} AS STRING) LIKE '%'` is 
always true (since the `LIKE '%'` pattern matches any string, including empty 
string), so this condition never actually filters data. It appears to be added 
just to satisfy ODPS's requirement that a partition filter be present, but ODPS 
requires specifying a concrete partition value (e.g., `partition_col = 
'some_value'`), not an always-true predicate. It's unclear whether ODPS will 
accept this pattern—and if it does, it would scan all partitions rather than 
the latest one, potentially resulting in very large and expensive queries that 
could time out or return excessive data.
   ```suggestion
                # Intentionally avoid adding a dummy always-true partition predicate
                # here. Real partition pruning is handled via where_latest_partition
                # when latest_partition is True.
                pass
   ```



##########
superset/databases/api.py:
##########
@@ -1079,15 +1079,23 @@ def table_metadata(self, pk: int) -> FlaskResponse:
             parameters = QualifiedTableSchema().load(request.args)
         except ValidationError as ex:
             raise InvalidPayloadSchemaError(ex) from ex
-
-        table = Table(parameters["name"], parameters["schema"], parameters["catalog"])
+        table_name = str(parameters["name"])
+        table = Table(table_name, parameters["schema"], parameters["catalog"])
+        is_partitioned_table, partition_fields = DatabaseDAO.is_odps_partitioned_table(
+            database, table_name
+        )
         try:
             security_manager.raise_for_access(database=database, table=table)
         except SupersetSecurityException as ex:
             # instead of raising 403, raise 404 to hide table existence
             raise TableNotFoundException("No such table") from ex

Review Comment:
   The call to `DatabaseDAO.is_odps_partitioned_table` at line 1084 occurs 
before the authorization check `security_manager.raise_for_access` at line 
1088. This means the ODPS API is contacted to check partition status even for 
users who don't have access to the table. The security check should happen 
before any backend calls. The `is_odps_partitioned_table` call should be moved 
to after the security check at line 1091.
   ```suggestion
           try:
               security_manager.raise_for_access(database=database, table=table)
           except SupersetSecurityException as ex:
               # instead of raising 403, raise 404 to hide table existence
               raise TableNotFoundException("No such table") from ex
        is_partitioned_table, partition_fields = DatabaseDAO.is_odps_partitioned_table(
               database, table_name
           )
   ```



##########
superset/daos/database.py:
##########
@@ -239,6 +241,49 @@ def get_datasets(
             .all()
         )
 
+    @classmethod
+    def is_odps_partitioned_table(
+        cls, database: Database, table_name: str
+    ) -> tuple[bool, list[str]]:
+        """
+        This function is used to determine and retrieve
+        partition information of the ODPS table.
+        The return values are whether the partition
+        table is partitioned and the names of all partition fields.
+        """
+        if not database:
+            raise ValueError("Database not found")
+        if database.backend != "odps":
+            return False, []
+        try:
+            from odps import ODPS
+        except ImportError:
+            logger.warning("pyodps is not installed, cannot check ODPS partition info")
+            return False, []
+        uri = database.sqlalchemy_uri
+        access_key = database.password
+        pattern = re.compile(
+            r"odps://(?P<username>[^:]+):(?P<password>[^@]+)@(?P<project>[^/]+)/(?:\?"
+            r"endpoint=(?P<endpoint>[^&]+))"
+        )
+        if not uri or not isinstance(uri, str):
+            logger.warning(
+                "Invalid or missing sqlalchemy URI, please provide a correct URI"
+            )
+            return False, []
+        if match := pattern.match(unquote(uri)):
+            access_id = match.group("username")
+            project = match.group("project")
+            endpoint = match.group("endpoint")
+            odps_client = ODPS(access_id, access_key, project, endpoint=endpoint)
+            table = odps_client.get_table(table_name)
+            if table.exist_partition:
+                partition_spec = table.table_schema.partitions
+                partition_fields = [partition.name for partition in partition_spec]
+                return True, partition_fields
+            return False, []
+        return False, []

Review Comment:
   The `is_odps_partitioned_table` method parses `database.sqlalchemy_uri` 
(which stores the masked URI with the password replaced by "XXXXXXXXXX") using 
a regex that includes a `password` group. The masked password is silently 
matched and ignored, while the real password is read from `database.password`. 
This works, but it's fragile: if the ODPS URI format changes, or the password 
mask changes, or the URI doesn't match the regex for any reason (e.g., if the 
URI has additional query parameters), the function silently returns `(False, 
[])` without warning that an ODPS table's partition status couldn't be 
determined. Silent failure can mislead users into thinking a partitioned table 
is not partitioned, hiding potential errors. At minimum, a warning should be 
logged when the URI pattern does not match on an ODPS database.
   ```suggestion
           else:
               logger.warning(
                   "ODPS sqlalchemy URI does not match expected pattern; "
                   "unable to determine partition info for table %r",
                   table_name,
               )
               return False, []
   ```



##########
superset/databases/api.py:
##########
@@ -1079,15 +1079,23 @@ def table_metadata(self, pk: int) -> FlaskResponse:
             parameters = QualifiedTableSchema().load(request.args)
         except ValidationError as ex:
             raise InvalidPayloadSchemaError(ex) from ex
-
-        table = Table(parameters["name"], parameters["schema"], parameters["catalog"])
+        table_name = str(parameters["name"])
+        table = Table(table_name, parameters["schema"], parameters["catalog"])
+        is_partitioned_table, partition_fields = DatabaseDAO.is_odps_partitioned_table(
+            database, table_name
+        )

Review Comment:
   The `is_odps_partitioned_table` method is called unconditionally for every 
`table_metadata` request in `api.py`, regardless of the database backend. 
Although the method returns early for non-ODPS backends (`database.backend != 
"odps"`), for ODPS databases this method creates an entirely new ODPS client 
connection (separate from the SQLAlchemy connection pool) and makes a remote 
API call on every table metadata request. This adds latency to every table 
preview in SQLLab for ODPS databases, and bypasses Superset's normal connection 
pooling and management. Consider caching the partition info or using the 
existing SQLAlchemy inspector to retrieve this information instead.
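
   A minimal sketch of the caching idea (the module-level dict, TTL value, and function names are assumptions for illustration; Superset's own cache_manager would be the natural home for this), keyed by database id and table name so the remote ODPS call runs at most once per TTL window:

```python
import time
from typing import Callable, Dict, List, Tuple

# (database_id, table_name) -> (insert time, (is_partitioned, partition_fields))
_PARTITION_CACHE: Dict[Tuple[int, str], Tuple[float, Tuple[bool, List[str]]]] = {}
_TTL_SECONDS = 300.0


def cached_partition_info(
    database_id: int,
    table_name: str,
    fetch: Callable[[], Tuple[bool, List[str]]],
) -> Tuple[bool, List[str]]:
    """Return cached (is_partitioned, fields); call `fetch` only on miss or expiry."""
    key = (database_id, table_name)
    now = time.monotonic()
    hit = _PARTITION_CACHE.get(key)
    if hit is not None and now - hit[0] < _TTL_SECONDS:
        return hit[1]
    value = fetch()  # the expensive remote ODPS call happens only here
    _PARTITION_CACHE[key] = (now, value)
    return value
```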



##########
superset/databases/api.py:
##########
@@ -1079,15 +1079,23 @@ def table_metadata(self, pk: int) -> FlaskResponse:
             parameters = QualifiedTableSchema().load(request.args)
         except ValidationError as ex:
             raise InvalidPayloadSchemaError(ex) from ex
-
-        table = Table(parameters["name"], parameters["schema"], parameters["catalog"])
+        table_name = str(parameters["name"])
+        table = Table(table_name, parameters["schema"], parameters["catalog"])
+        is_partitioned_table, partition_fields = DatabaseDAO.is_odps_partitioned_table(
+            database, table_name
+        )
         try:

Review Comment:
   The call to `DatabaseDAO.is_odps_partitioned_table` at line 1084 is not 
wrapped in a try-except block. Inside this method, 
`odps_client.get_table(table_name)` makes a remote network call that can raise 
exceptions (e.g., network errors, authentication failures, or ODPS-specific 
errors). Any uncaught exception here will propagate as an unhandled 500 error. 
This call should be wrapped in error handling so that failures (e.g., unable to 
connect to ODPS) degrade gracefully—for example, by logging a warning and 
defaulting to the non-partitioned path.
   ```suggestion
           try:
            is_partitioned_table, partition_fields = DatabaseDAO.is_odps_partitioned_table(
                   database, table_name
               )
           except Exception as ex:  # pylint: disable=broad-except
               logger.warning(
                "Error determining ODPS partition information for table %s: %s; "
                   "falling back to non-partitioned metadata path",
                   table_name,
                   error_msg_from_exception(ex),
               )
               is_partitioned_table, partition_fields = False, []
           try:
   ```



##########
superset/db_engine_specs/odps.py:
##########
@@ -0,0 +1,192 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import logging
+from typing import Any, Optional, TYPE_CHECKING
+
+from sqlalchemy import select, text
+from sqlalchemy.engine.base import Engine
+
+from superset.databases.schemas import (
+    TableMetadataColumnsResponse,
+    TableMetadataResponse,
+)
+from superset.databases.utils import (
+    get_col_type,
+    get_foreign_keys_metadata,
+    get_indexes_metadata,
+)
+from superset.db_engine_specs.base import BaseEngineSpec, BasicParametersMixin
+from superset.sql.parse import Partition, SQLScript, Table
+from superset.superset_typing import ResultSetColumnType
+
+if TYPE_CHECKING:
+    from superset.models.core import Database
+
+logger = logging.getLogger(__name__)
+
+
+class OdpsBaseEngineSpec(BaseEngineSpec):
+    @classmethod
+    def get_table_metadata(
+        cls,
+        database: Database,
+        table: Table,
+        partition: Optional[Partition] = None,
+    ) -> TableMetadataResponse:
+        """
+        Returns basic table metadata
+        :param database: Database instance
+        :param table: A Table instance
+        :param partition: A Table partition info
+        :return: Basic table metadata
+        """
+        return cls.get_table_metadata(database, table, partition)
+
+
+class OdpsEngineSpec(BasicParametersMixin, OdpsBaseEngineSpec):
+    engine = "odps"
+    engine_name = "ODPS (MaxCompute)"
+    default_driver = "odps"
+
+    @classmethod
+    def get_table_metadata(
+        cls, database: Any, table: Table, partition: Optional[Partition] = None
+    ) -> TableMetadataResponse:
+        """
+        Get table metadata information, including type, pk, fks.
+        This function raises SQLAlchemyError when a schema is not found.
+
+        :param partition: The table's partition info
+        :param database: The database model
+        :param table: Table instance
+        :return: Dict table metadata ready for API response
+        """
+        keys = []
+        columns = database.get_columns(table)
+        primary_key = database.get_pk_constraint(table)
+        if primary_key and primary_key.get("constrained_columns"):
+            primary_key["column_names"] = primary_key.pop("constrained_columns")
+            primary_key["type"] = "pk"
+            keys += [primary_key]
+        foreign_keys = get_foreign_keys_metadata(database, table)
+        indexes = get_indexes_metadata(database, table)
+        keys += foreign_keys + indexes
+        payload_columns: list[TableMetadataColumnsResponse] = []
+        table_comment = database.get_table_comment(table)
+        for col in columns:
+            dtype = get_col_type(col)
+            payload_columns.append(
+                {
+                    "name": col["column_name"],
+                    "type": dtype.split("(")[0] if "(" in dtype else dtype,
+                    "longType": dtype,
+                    "keys": [
+                        k for k in keys if col["column_name"] in k["column_names"]
+                    ],
+                    "comment": col.get("comment"),
+                }
+            )
+
+        with database.get_sqla_engine(
+            catalog=table.catalog, schema=table.schema
+        ) as engine:
+            return {
+                "name": table.table,
+                "columns": payload_columns,
+                "selectStar": cls.select_star(
+                    database=database,
+                    table=table,
+                    engine=engine,
+                    limit=100,
+                    show_cols=False,
+                    indent=True,
+                    latest_partition=True,
+                    cols=columns,
+                    partition=partition,
+                ),
+                "primaryKey": primary_key,
+                "foreignKeys": foreign_keys,
+                "indexes": keys,
+                "comment": table_comment,
+            }
+
+    @classmethod
+    def select_star(  # pylint: disable=too-many-arguments
+        cls,
+        database: Database,
+        table: Table,
+        engine: Engine,
+        limit: int = 100,
+        show_cols: bool = False,
+        indent: bool = True,
+        latest_partition: bool = True,
+        cols: list[ResultSetColumnType] | None = None,
+        partition: Optional[Partition] = None,
+    ) -> str:
+        """
+        Generate a "SELECT * from [schema.]table_name" query with appropriate limit.
+
+        WARNING: expects only unquoted table and schema names.
+
+        :param partition: The table's partition info
+        :param database: Database instance
+        :param table: Table instance
+        :param engine: SqlAlchemy Engine instance
+        :param limit: limit to impose on query
+        :param show_cols: Show columns in query; otherwise use "*"
+        :param indent: Add indentation to query
+        :param latest_partition: Only query the latest partition
+        :param cols: Columns to include in query
+        :return: SQL query
+        """
+        # pylint: disable=redefined-outer-name
+        fields: str | list[Any] = "*"
+        cols = cols or []
+        if (show_cols or latest_partition) and not cols:
+            cols = database.get_columns(table)
+
+        if show_cols:
+            fields = cls._get_fields(cols)
+        full_table_name = cls.quote_table(table, engine.dialect)
+        qry = select(fields).select_from(text(full_table_name))
+        if database.backend == "odps":
+            if (
+                partition is not None
+                and partition.is_partitioned_table
+                and partition.partition_column is not None
+                and len(partition.partition_column) > 0
+            ):
+                partition_str = partition.partition_column[0]
+                partition_str_where = f"CAST({partition_str} AS STRING) LIKE '%'"
+                qry = qry.where(text(partition_str_where))
+        if limit:
+            qry = qry.limit(limit)
+        if latest_partition:
+            partition_query = cls.where_latest_partition(
+                database,
+                table,
+                qry,
+                columns=cols,
+            )
+            if partition_query is not None:
+                qry = partition_query
+        sql = database.compile_sqla_query(qry, table.catalog, table.schema)
+        if indent:
+            sql = SQLScript(sql, engine=cls.engine).format()
+        return sql

Review Comment:
   There is no unit test file for `OdpsEngineSpec` or `OdpsBaseEngineSpec`. 
Every other engine spec in the codebase (e.g., `test_hive.py`, 
`test_presto.py`, `test_bigquery.py`, etc.) has a dedicated test file under 
`tests/unit_tests/db_engine_specs/`. A new 
`tests/unit_tests/db_engine_specs/test_odps.py` file should be added to cover 
`OdpsEngineSpec.select_star`, `OdpsEngineSpec.get_table_metadata`, and 
`DatabaseDAO.is_odps_partitioned_table`.
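
   A self-contained sketch of the kind of assertion such a test file could make, exercising the partition-predicate logic from `select_star`. The `Partition` class and `build_partition_predicate` helper below are local stand-ins for illustration only (the real `Partition` lives in `superset.sql.parse`, and the predicate is built inline in `OdpsEngineSpec.select_star`):
   ```python
   from __future__ import annotations

   from dataclasses import dataclass


   @dataclass(eq=True, frozen=True)
   class Partition:
       """Local stand-in mirroring superset.sql.parse.Partition."""

       is_partitioned_table: bool
       partition_column: list[str] | None = None


   def build_partition_predicate(partition: Partition | None) -> str | None:
       """Mirror of the WHERE-clause logic in OdpsEngineSpec.select_star."""
       if (
           partition is not None
           and partition.is_partitioned_table
           and partition.partition_column
       ):
           # Only the first partition column is used, matching the spec code.
           col = partition.partition_column[0]
           return f"CAST({col} AS STRING) LIKE '%'"
       return None


   def test_partitioned_table_gets_predicate() -> None:
       partition = Partition(True, ["month", "day"])
       assert build_partition_predicate(partition) == "CAST(month AS STRING) LIKE '%'"


   def test_unpartitioned_table_gets_no_predicate() -> None:
       assert build_partition_predicate(Partition(False)) is None
       assert build_partition_predicate(None) is None
   ```
   A real test would instead import `OdpsEngineSpec` and assert on the SQL emitted by `select_star` against a mocked `Database`, but the predicate shape above is the behavior worth pinning down first.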



##########
superset/db_engine_specs/odps.py:
##########
@@ -0,0 +1,192 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import logging
+from typing import Any, Optional, TYPE_CHECKING
+
+from sqlalchemy import select, text
+from sqlalchemy.engine.base import Engine
+
+from superset.databases.schemas import (
+    TableMetadataColumnsResponse,
+    TableMetadataResponse,
+)
+from superset.databases.utils import (
+    get_col_type,
+    get_foreign_keys_metadata,
+    get_indexes_metadata,
+)
+from superset.db_engine_specs.base import BaseEngineSpec, BasicParametersMixin
+from superset.sql.parse import Partition, SQLScript, Table
+from superset.superset_typing import ResultSetColumnType
+
+if TYPE_CHECKING:
+    from superset.models.core import Database
+
+logger = logging.getLogger(__name__)
+
+
+class OdpsBaseEngineSpec(BaseEngineSpec):
+    @classmethod
+    def get_table_metadata(
+        cls,
+        database: Database,
+        table: Table,
+        partition: Optional[Partition] = None,
+    ) -> TableMetadataResponse:
+        """
+        Returns basic table metadata
+        :param database: Database instance
+        :param table: A Table instance
+        :param partition: The table's partition info
+        :return: Basic table metadata
+        """
+        return super().get_table_metadata(database, table)
+
+
+class OdpsEngineSpec(BasicParametersMixin, OdpsBaseEngineSpec):
+    engine = "odps"
+    engine_name = "ODPS (MaxCompute)"
+    default_driver = "odps"
+
+    @classmethod
+    def get_table_metadata(
+        cls, database: Any, table: Table, partition: Optional[Partition] = None
+    ) -> TableMetadataResponse:
+        """
+        Get table metadata information, including type, pk, fks.
+        This function raises SQLAlchemyError when a schema is not found.
+
+        :param partition: The table's partition info
+        :param database: The database model
+        :param table: Table instance
+        :return: Dict table metadata ready for API response
+        """
+        keys = []
+        columns = database.get_columns(table)
+        primary_key = database.get_pk_constraint(table)
+        if primary_key and primary_key.get("constrained_columns"):
+        primary_key["column_names"] = primary_key.pop("constrained_columns")
+            primary_key["type"] = "pk"
+            keys += [primary_key]
+        foreign_keys = get_foreign_keys_metadata(database, table)
+        indexes = get_indexes_metadata(database, table)
+        keys += foreign_keys + indexes
+        payload_columns: list[TableMetadataColumnsResponse] = []
+        table_comment = database.get_table_comment(table)
+        for col in columns:
+            dtype = get_col_type(col)
+            payload_columns.append(
+                {
+                    "name": col["column_name"],
+                    "type": dtype.split("(")[0] if "(" in dtype else dtype,
+                    "longType": dtype,
+                    "keys": [
+                        k for k in keys if col["column_name"] in k["column_names"]
+                    ],
+                    "comment": col.get("comment"),
+                }
+            )
+
+        with database.get_sqla_engine(
+            catalog=table.catalog, schema=table.schema
+        ) as engine:
+            return {
+                "name": table.table,
+                "columns": payload_columns,
+                "selectStar": cls.select_star(
+                    database=database,
+                    table=table,
+                    engine=engine,
+                    limit=100,
+                    show_cols=False,
+                    indent=True,
+                    latest_partition=True,
+                    cols=columns,
+                    partition=partition,
+                ),
+                "primaryKey": primary_key,
+                "foreignKeys": foreign_keys,
+                "indexes": keys,
+                "comment": table_comment,
+            }
+
+    @classmethod
+    def select_star(  # pylint: disable=too-many-arguments
+        cls,
+        database: Database,
+        table: Table,
+        engine: Engine,
+        limit: int = 100,
+        show_cols: bool = False,
+        indent: bool = True,
+        latest_partition: bool = True,
+        cols: list[ResultSetColumnType] | None = None,
+        partition: Optional[Partition] = None,

Review Comment:
   The PR description states that one of its improvements is "Updated typing to 
modern Python style (`list[str] | None` instead of `Optional[List[str]]`)", but 
`odps.py` still uses `Optional[Partition]` throughout (lines 50, 69, 140). For 
consistency with the stated goal and the modern style used in 
`Partition.partition_column: list[str] | None`, these should be changed to 
`Partition | None`.
   ```suggestion
           partition: Partition | None = None,
   ```



##########
superset/databases/api.py:
##########
@@ -1079,15 +1079,23 @@ def table_metadata(self, pk: int) -> FlaskResponse:
             parameters = QualifiedTableSchema().load(request.args)
         except ValidationError as ex:
             raise InvalidPayloadSchemaError(ex) from ex
-
-        table = Table(parameters["name"], parameters["schema"], parameters["catalog"])
+        table_name = str(parameters["name"])
+        table = Table(table_name, parameters["schema"], parameters["catalog"])
+        is_partitioned_table, partition_fields = DatabaseDAO.is_odps_partitioned_table(
+            database, table_name
+        )
         try:
             security_manager.raise_for_access(database=database, table=table)
         except SupersetSecurityException as ex:
             # instead of raising 403, raise 404 to hide table existence
             raise TableNotFoundException("No such table") from ex
+        partition = Partition(is_partitioned_table, partition_fields)
+        if is_partitioned_table:
+            from superset.db_engine_specs.odps import OdpsEngineSpec
 
-        payload = database.db_engine_spec.get_table_metadata(database, table)
+            payload = OdpsEngineSpec.get_table_metadata(database, table, partition)

Review Comment:
   The `table_metadata` endpoint now hardcodes 
`OdpsEngineSpec.get_table_metadata` for any ODPS partitioned table, regardless 
of the actual engine spec configured for the database. This means if a user 
connects to ODPS through a custom subclass of `OdpsEngineSpec` (e.g., a 
company-internal extension), the custom metadata logic will be bypassed, and 
the generic `OdpsEngineSpec.get_table_metadata` will be used instead. The 
`database.db_engine_spec.get_table_metadata` should be called consistently—the 
partition-aware behavior should be implemented by the engine spec itself, not 
by bypassing the spec in the API layer.
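
   A minimal sketch of the dispatch pattern this comment is asking for, using toy stand-in classes (none of these names exist in Superset). Calling through the database's configured spec lets a subclass override partition-aware behavior without the API layer special-casing ODPS:
   ```python
   class BaseSpec:
       @classmethod
       def get_table_metadata(cls, table: str, partition=None) -> dict:
           # Generic metadata; subclasses extend this rather than being bypassed.
           return {"name": table, "spec": cls.__name__}


   class OdpsSpec(BaseSpec):
       @classmethod
       def get_table_metadata(cls, table: str, partition=None) -> dict:
           payload = super().get_table_metadata(table)
           if partition is not None:
               payload["partition"] = partition
           return payload


   class CustomOdpsSpec(OdpsSpec):
       """Stand-in for a company-internal extension of the ODPS spec."""


   class FakeDatabase:
       """Stand-in for the Database model, which carries its engine spec."""

       def __init__(self, spec: type[BaseSpec]) -> None:
           self.db_engine_spec = spec


   # Dispatching through the instance's configured spec keeps subclass
   # behavior intact; hardcoding OdpsSpec here would silently drop
   # everything CustomOdpsSpec adds.
   db = FakeDatabase(CustomOdpsSpec)
   payload = db.db_engine_spec.get_table_metadata("sales", partition=["ds"])
   ```
   With this shape, the API layer never needs to import a concrete spec: `database.db_engine_spec.get_table_metadata(...)` resolves to whatever class the connection is configured with.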



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
