cloud-fan commented on code in PR #55327:
URL: https://github.com/apache/spark/pull/55327#discussion_r3245959707


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala:
##########
@@ -291,7 +320,104 @@ object ClusterBySpec {
    * Converts the clustering column property to a ClusterBySpec.
    */
   def fromProperty(columns: String): ClusterBySpec = {
-    ClusterBySpec(mapper.readValue[Seq[Seq[String]]](columns).map(FieldReference(_)))
+    ClusterBySpec.fromColumnEntries(mapper.readValue[Seq[Seq[String]]](columns))
+  }
+
+  /**
+   * Constructs a [[ClusterBySpec]] from the stored column entries (each a Seq[String]).
+   * An entry is either a multi-part column name or a single-element Seq containing an
+   * expression string like "variant_get(col,'$.foo','STRING')".
+   */
+  def fromColumnEntries(entries: Seq[Seq[String]]): ClusterBySpec = {
+    val parsedCols: Seq[(NamedReference, Option[Transform])] = entries.map {
+      case names if names.length == 1 =>
+        // Could be an expression of form "funcName(col, arg1, arg2, ...)"
+        try {
+          CatalystSqlParser.parseExpression(names.head) match {
+            case u: UnresolvedFunction =>
+              val transform: Transform = new ClusteringColumnTransform(
+                QuotingUtils.quoted(u.nameParts.toArray),
+                u.children.map {
+                  case a: UnresolvedAttribute =>
+                    FieldReference(QuotingUtils.quoted(a.nameParts.toArray))
+                  case l: Literal => LiteralValue(l.value, l.dataType)
+                  case other => throw new IllegalStateException(
+                    s"Unexpected argument type in CLUSTER BY expression: ${other.getClass}")
+                }.toArray)

Review Comment:
   Two SQL-string round-trips into the v2 API on these lines (and the parallel 
block at :379-387):
   
   1. `name = QuotingUtils.quoted(u.nameParts.toArray)` puts a backtick-quoted 
SQL identifier string into `Transform.name()` for multi-part function names. 
Every existing v2 transform's `name` is an unqualified plain string 
(`"bucket"`, `"years"`, `ApplyTransform.name = applyCtx.identifier.getText` at 
`AstBuilder.scala:5091`). Connectors comparing `transform.name()` against a 
plain function name will silently miss namespaced cases.
   2. `FieldReference(QuotingUtils.quoted(a.nameParts.toArray))` has structured 
`Seq[String]` parts in hand, stringifies them with backtick quoting, then 
`FieldReference(String)` re-parses via 
`CatalystSqlParser.parseMultipartIdentifier`. Should be 
`FieldReference(a.nameParts)` (the `Seq[String]` apply that exists already).
   
   See body summary, pillar 2.
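   
   A minimal sketch of point 1, using simplified stand-ins rather than Spark's own classes. `quoteIfNeeded`, `quoted`, `isUpper` and `isUpperStructured` here are hypothetical helpers written for illustration, and the quoting rule is an assumption, not Spark's implementation. It shows how a connector that compares the transform name against a plain function name could miss a namespaced name once the parts have been stringified:

```scala
object TransformNameSketch {
  // Simplified stand-in for identifier quoting (assumption, not Spark's code):
  // a part is quoted only when it is not a plain [A-Za-z0-9_]+ token.
  def quoteIfNeeded(part: String): String =
    if (part.matches("[A-Za-z0-9_]+")) part
    else "`" + part.replace("`", "``") + "`"

  // Joins the parts into one SQL-style identifier string.
  def quoted(parts: Seq[String]): String = parts.map(quoteIfNeeded).mkString(".")

  // Hypothetical connector-side check against a plain function name.
  def isUpper(transformName: String): Boolean = transformName == "upper"

  // Alternative: keep the structured parts and compare only the last one.
  def isUpperStructured(nameParts: Seq[String]): Boolean =
    nameParts.lastOption.contains("upper")
}
```

   With parts `Seq("my-catalog", "upper")`, the string form no longer equals `"upper"`, while the structured comparison still matches.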



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala:
##########
@@ -271,11 +273,38 @@ case class CatalogTablePartition(
  * A container for clustering information.
  *
  * @param columnNames the names of the columns used for clustering.
+ * @param clusteringColumnTransforms per-column transforms for expression-based clustering.
+ *                                   When non-empty, each element corresponds to a column in
+ *                                   columnNames: None means a plain column reference,
+ *                                   Some(transform) means an expression like UPPER(col).
+ *                                   An empty Seq means no transforms on any columns.
  */
-case class ClusterBySpec(columnNames: Seq[NamedReference]) {
+case class ClusterBySpec(
+    columnNames: Seq[NamedReference],
+    clusteringColumnTransforms: Seq[Option[Transform]] = Seq.empty) {
   override def toString: String = toJson
 
-  def toJson: String = ClusterBySpec.mapper.writeValueAsString(columnNames.map(_.fieldNames))
+  def toJson: String = toColumnNames
+
+  def toColumnNames: String = {
+    val entries: Seq[Seq[String]] = if (clusteringColumnTransforms.isEmpty) {
+      columnNames.map(_.fieldNames().toSeq)
+    } else {
+      columnNames.zip(clusteringColumnTransforms).map {
+        case (colName, None) => colName.fieldNames().toSeq
+        case (colName, Some(transform)) =>
+          val args = transform.arguments().map {
+            case n: NamedReference => n.fieldNames().map(QuotingUtils.quoteIfNeeded).mkString(".")
+            case LiteralValue(value, dataType) =>
+              Literal(value, dataType).sql
+            case other => throw new IllegalStateException(
+              s"Unexpected argument type in CLUSTER BY expression: ${other.getClass}")
+          }
+          Seq(s"${QuotingUtils.quoteIfNeeded(transform.name())}(${args.mkString(",")})")
+      }
+    }
+    ClusterBySpec.mapper.writeValueAsString(entries)
+  }

Review Comment:
   This stringifies a v2 Transform into a SQL function-call expression and 
embeds it in the existing `Seq[Seq[String]]` JSON shape; `fromColumnEntries` at 
:331 then calls `CatalystSqlParser.parseExpression` on each single-element 
entry to recover the transform. Two concrete consequences:
   
   - A column whose name parses as a function call — e.g. backtick-quoted 
``CLUSTER BY (`lower(col)`)`` — is stored as `[["lower(col)"]]` 
(indistinguishable from a transform entry) and read back as a 
`ClusteringColumnTransform`. Silent regression on existing tables with such 
names.
   - Round-trip stability depends on `Literal(...).sql` ↔ the SQL parser. 
`TimestampType.sql` emits `TIMESTAMP 'literal'`, which the parser turns into a 
`Cast` of a string, not a `Literal` — the `ClusterBySpecSuite` timestamp tests 
pass only because they pin the session timezone.
   
   Please use a structured format (e.g. `{"col": [...], "transform": {"name": 
..., "args": [...]}}`) using the same Jackson mapper. See body summary, pillar 
3.
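   
   To make the first consequence concrete, here is a rough sketch with hypothetical stand-in types (`Entry`, `PlainColumn`, `TransformEntry`, `toLegacy` are not part of the PR). It only models the two shapes. The actual serialization would presumably still go through the Jackson mapper, which is left out so the example stays self-contained:

```scala
object ClusterByEntrySketch {
  // Tagged representation: the kind of entry is explicit in the data.
  sealed trait Entry
  final case class PlainColumn(parts: Seq[String]) extends Entry
  final case class TransformEntry(
      column: Seq[String],
      functionName: String,
      literalArgs: Seq[String]) extends Entry

  // String-based shape: both kinds collapse into a Seq[String], with a
  // transform rendered as a single function-call string.
  def toLegacy(e: Entry): Seq[String] = e match {
    case PlainColumn(parts) => parts
    case TransformEntry(col, fn, args) =>
      Seq(s"$fn(${(col.mkString(".") +: args).mkString(",")})")
  }
}
```

   A column literally named `lower(col)` and the transform `lower` applied to `col` produce the same legacy entry, so the reader cannot tell them apart. The tagged values remain distinct.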



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala:
##########
@@ -291,7 +320,104 @@ object ClusterBySpec {
    * Converts the clustering column property to a ClusterBySpec.
    */
   def fromProperty(columns: String): ClusterBySpec = {
-    ClusterBySpec(mapper.readValue[Seq[Seq[String]]](columns).map(FieldReference(_)))
+    ClusterBySpec.fromColumnEntries(mapper.readValue[Seq[Seq[String]]](columns))
+  }
+
+  /**
+   * Constructs a [[ClusterBySpec]] from the stored column entries (each a Seq[String]).
+   * An entry is either a multi-part column name or a single-element Seq containing an
+   * expression string like "variant_get(col,'$.foo','STRING')".
+   */
+  def fromColumnEntries(entries: Seq[Seq[String]]): ClusterBySpec = {
+    val parsedCols: Seq[(NamedReference, Option[Transform])] = entries.map {
+      case names if names.length == 1 =>
+        // Could be an expression of form "funcName(col, arg1, arg2, ...)"
+        try {
+          CatalystSqlParser.parseExpression(names.head) match {
+            case u: UnresolvedFunction =>
+              val transform: Transform = new ClusteringColumnTransform(
+                QuotingUtils.quoted(u.nameParts.toArray),
+                u.children.map {
+                  case a: UnresolvedAttribute =>
+                    FieldReference(QuotingUtils.quoted(a.nameParts.toArray))
+                  case l: Literal => LiteralValue(l.value, l.dataType)
+                  case other => throw new IllegalStateException(
+                    s"Unexpected argument type in CLUSTER BY expression: ${other.getClass}")
+                }.toArray)
+              val colRef = transform.arguments().collectFirst {
+                case f: FieldReference => f
+              }.getOrElse(throw new IllegalStateException(
+                "CLUSTER BY expression must contain exactly one column reference"))
+              (colRef, Some(transform))
+            case _ => (FieldReference(names), None)
+          }
+        } catch {
+          // Sometimes, we can get a parse exception if the column name contains invalid
+          // characters by itself. Quote the column name and see if parsing it as a multipart
+          // identifier works, and if so, use that as a direct FieldReference to a column.
+          case _: ParseException =>
+            val identifier = CatalystSqlParser.parseMultipartIdentifier(
+              QuotingUtils.quoteIfNeeded(names.head))
+            (FieldReference(identifier.map(_.stripPrefix("`").stripSuffix("`"))), None)
+        }
+      case names => (FieldReference(names), None)
+    }
+    val (colNames, transforms) = parsedCols.unzip
+    val transformsSeq = if (transforms.forall(_.isEmpty)) Seq.empty else transforms
+    ClusterBySpec(colNames, transformsSeq)
+  }
+
+  def fromExpressions(
+      parsedCols: Seq[Either[Expression, Seq[String]]]): ClusterBySpec = {
+    val (clusteringColumnNames, clusteringColumnExpressions) = parsedCols.map {
+      case Left(e) =>
+        e match {
+          // A bare column reference parsed as an expression - treat as plain column.
+          case a: UnresolvedAttribute =>
+            (FieldReference(a.nameParts), None)
+          case u: UnresolvedFunction =>
+            val transform = new ClusteringColumnTransform(
+              QuotingUtils.quoted(u.nameParts.toArray),
+              u.children.map {
+                case a: UnresolvedAttribute =>
+                  FieldReference(QuotingUtils.quoted(a.nameParts.toArray))
+                case l: Literal => LiteralValue(l.value, l.dataType)
+                case _ => throw new IllegalStateException(
+                  "Unsupported expression argument in CLUSTER BY transform")
+              }.toArray)
+            val transformName = QuotingUtils.quoted(u.nameParts.toArray)
+            val refs = transform.arguments().collect {
+              case f: FieldReference => f
+            }
+            if (refs.isEmpty) {
+              throw new AnalysisException(
+                errorClass = "CLUSTER_BY_EXPRESSION_INCORRECT_COLUMN_REFERENCE",
+                messageParameters = Map("expressionType" -> transformName))
+            }
+            if (refs.length != 1) {
+              throw new AnalysisException(
+                errorClass = "CLUSTER_BY_EXPRESSION_INCORRECT_COLUMN_REFERENCE",
+                messageParameters = Map("expressionType" -> transformName))
+            }
+            if (!transform.arguments().head.isInstanceOf[FieldReference]) {

Review Comment:
   Why must the column reference be at `arguments().head`? `PARTITIONED BY 
(bucket(4, c1, c2))` already allows columns at any position; the equivalent 
restriction here is undocumented and the error message at :403-406 doesn't 
mention it. If the constraint is intentional, please document it and surface it 
in the error; if not, drop this check.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/connector/expressions/expressions.scala:
##########
@@ -157,12 +157,30 @@ private[sql] object BucketTransform {
   }
 }
 
+/**
+ * Minimal description of a per-column transform applied within a CLUSTER BY expression.
+ *
+ * @param columnIndex index into [[ClusterByTransform.columnNames]] identifying the column
+ *                    being transformed.
+ * @param argumentIndex the index in the argument list where the bound clustering column
+ *                      should be substituted. Zero-indexed, and any arguments at or after
+ *                      this index in `arguments` should be shifted to the right by one.
+ * @param function    canonical SQL function name (e.g. "variant_get").
+ * @param arguments   the non-column literal arguments to the function.
+ */
+case class ClusterByColumnTransform(
+    columnIndex: Int,
+    argumentIndex: Int,
+    function: String,
+    arguments: Seq[LiteralValue[_]])

Review Comment:
   Both `ClusterByColumnTransform` (this abstract descriptor) and 
`ClusteringColumnTransform` (the concrete Transform added further below) can be 
removed if you reuse `ApplyTransform` and store per-column transforms as the 
`arguments` of `ClusterByTransform` itself — `cluster_by(c1, upper(c2), 
bucket(4, c3))`. That gives correct `describe()` / `toString` / `arguments()` 
for free, fixes the DESCRIBE / SHOW CREATE TABLE round-trip, and matches 
`PARTITIONED BY`'s wire shape. The `argumentIndex` field here is also dead 
state today because `fromExpressions` forces the column reference to position 
0. See body summary, pillar 1.
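   
   As a rough illustration of the nested shape suggested here, the sketch below uses hypothetical stand-in types (`Expr`, `Ref`, `Lit`, `Apply`), not Spark's `ApplyTransform` or `ClusterByTransform`. It shows why a generic name-plus-arguments node gives a readable description and reference collection without a separate per-column descriptor:

```scala
object NestedTransformSketch {
  sealed trait Expr { def describe: String }

  // Stand-in for a column reference.
  final case class Ref(parts: Seq[String]) extends Expr {
    def describe: String = parts.mkString(".")
  }

  // Stand-in for a literal argument (integers only, to keep the sketch small).
  final case class Lit(value: Int) extends Expr {
    def describe: String = value.toString
  }

  // Stand-in for a generic "apply" transform: a name plus arguments,
  // where an argument may itself be another Apply.
  final case class Apply(name: String, args: Seq[Expr]) extends Expr {
    def describe: String = s"$name(${args.map(_.describe).mkString(", ")})"
  }

  // Collects column references at any depth and any argument position.
  def references(e: Expr): Seq[Ref] = e match {
    case r: Ref => Seq(r)
    case _: Lit => Seq.empty
    case Apply(_, args) => args.flatMap(references)
  }
}
```

   In this model no `argumentIndex` is needed, because each column reference simply sits where it appears in the argument list.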



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableClusterBySuiteBase.scala:
##########
@@ -43,6 +44,17 @@ trait AlterTableClusterBySuiteBase extends QueryTest with DDLCommandTestUtils {
 
   def validateClusterBy(tableName: String, clusteringColumns: Seq[String]): Unit
 
+  /**
+   * Validates clustering columns and their associated transforms.
+   * @param expectedTransforms per-column transforms, where None means a plain column reference
+   *                           and Some(transform) means an expression-based clustering column.
+   *                           Must have the same length as clusteringColumns.
+   */
+  def validateClusterBy(
+      tableName: String,
+      clusteringColumns: Seq[String],
+      expectedTransforms: Seq[Option[Transform]]): Unit

Review Comment:
   This new abstract `validateClusterBy(... expectedTransforms)` is implemented 
in all four v1/v2 Create/Alter suites but never called from any test — no test 
issues `CREATE TABLE … CLUSTER BY (func(col))` or the ALTER equivalent and 
verifies the transform survives the parser → spec → catalog → read-back 
round-trip. The only coverage of the new feature is `ClusterBySpecSuite` 
exercising helpers directly. Please add at least one happy-path test per v1/v2 
catalog (and the corresponding ones in `CreateTableClusterBySuiteBase` at 
:47-56).



##########
common/utils/src/main/resources/error/error-conditions.json:
##########
@@ -725,6 +725,12 @@
     ],
     "sqlState" : "0A000"
   },
+  "CLUSTER_BY_EXPRESSION_INCORRECT_COLUMN_REFERENCE": {
+    "message" : [
+      "CLUSTER BY expression <expressionType> has either no column reference, or a column reference in an unsupported argument position."

Review Comment:
   This error is thrown for three distinct conditions in 
`ClusterBySpec.fromExpressions` (`interface.scala:392-407`): (a) no column 
reference, (b) more than one column reference, and (c) the column is not at 
argument position 0. The message describes only (a) and (c) — a `CLUSTER BY 
(concat(c1, c2))` failure surfaces as if it were about argument position. 
Either split into separate error classes or rewrite the message to cover all 
three, and document the position-0 constraint somewhere user-facing.
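   
   One possible way to keep the three conditions apart, sketched with hypothetical stand-in types (`Arg`, `ColumnRef`, `LiteralArg`, `Problem` and its cases are illustrative names, not proposed error classes). Each outcome could then map to its own error class or to a distinct message parameter:

```scala
object ColumnRefValidationSketch {
  sealed trait Arg
  final case class ColumnRef(name: String) extends Arg
  final case class LiteralArg(sql: String) extends Arg

  // One case per distinct failure condition.
  sealed trait Problem
  case object NoColumnReference extends Problem
  case object MultipleColumnReferences extends Problem
  case object ColumnNotFirstArgument extends Problem

  // Returns the single column reference, or the specific reason it is rejected.
  def validate(args: Seq[Arg]): Either[Problem, ColumnRef] = {
    val refs = args.collect { case c: ColumnRef => c }
    refs match {
      case Seq() => Left(NoColumnReference)
      case Seq(only) =>
        if (args.headOption.contains(only)) Right(only)
        else Left(ColumnNotFirstArgument)
      case _ => Left(MultipleColumnReferences)
    }
  }
}
```

   Under this sketch, the `concat(c1, c2)` case reports `MultipleColumnReferences` rather than a message about argument position.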



##########
sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4:
##########
@@ -546,8 +546,17 @@ replaceTableHeader
     : (CREATE OR)? REPLACE TABLE identifierReference
     ;
 
+expressionOrMultipartIdentifier
+    : expression
+    | multipartIdentifier
+    ;

Review Comment:
   Consider reusing the existing `transform` rule from line 1251 (used by 
`PARTITIONED BY`) instead of inventing `expressionOrMultipartIdentifier`. The 
general `expression` rule accepts arbitrary inputs (e.g. `1+1`, `CASE … END`) 
that then fail post-parse with a generic `IllegalStateException` from 
`fromExpressions`; the dedicated `applyTransform: identifier '(' 
transformArgument,... ')'` rule rejects those at parse time with a targeted 
error. See the body summary, pillar 1.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala:
##########
@@ -340,12 +466,17 @@ object ClusterBySpec {
       normalizedColumns.map(_.toString),
       resolver)
 
-    ClusterBySpec(normalizedColumns)
+    ClusterBySpec(normalizedColumns, clusterBySpec.clusteringColumnTransforms)

Review Comment:
   `clusterBySpec.clusteringColumnTransforms` is passed through unchanged — the 
`FieldReference` inside each transform's `arguments` is not normalized against 
the schema, so after normalization the column case/path in `transforms` can 
disagree with the case/path in `normalizedColumns`.
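   
   A minimal sketch of what normalizing both sides could look like, using hypothetical stand-ins (`Ref`, `Transform`, `normalize`, `normalizeSpec`) and a plain case-insensitive match in place of the real resolver:

```scala
object NormalizeSketch {
  final case class Ref(name: String)
  final case class Transform(function: String, column: Ref, literalArgs: Seq[String])

  // Case-insensitive lookup against the schema; returns the schema's spelling.
  def normalize(ref: Ref, schema: Seq[String]): Ref =
    schema.find(_.equalsIgnoreCase(ref.name)).map(Ref(_)).getOrElse(
      throw new IllegalArgumentException(s"Column not found: ${ref.name}"))

  // Normalizes the column list and the reference inside each transform with
  // the same rule, so the two cannot drift apart.
  def normalizeSpec(
      columns: Seq[Ref],
      transforms: Seq[Option[Transform]],
      schema: Seq[String]): (Seq[Ref], Seq[Option[Transform]]) = {
    val cols = columns.map(normalize(_, schema))
    val ts = transforms.map(_.map(t => t.copy(column = normalize(t.column, schema))))
    (cols, ts)
  }
}
```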



##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java:
##########
@@ -277,8 +279,10 @@ static TableChange deleteColumn(String[] fieldNames, Boolean ifExists) {
    *                          field names.
    * @return a TableChange for this assignment
    */
-  static TableChange clusterBy(NamedReference[] clusteringColumns) {
-    return new ClusterBy(clusteringColumns);
+  static TableChange clusterBy(
+          NamedReference[] clusteringColumns,
+          Optional<Transform>[] transforms) {
+    return new ClusterBy(clusteringColumns, transforms);
   }

Review Comment:
   This replaces the previous `clusterBy(NamedReference[])` factory with a new 
two-arg one — source-incompatible for any external connector that constructs 
`TableChange.ClusterBy` today. The PR description says the change is 
backward-compatible for connectors, but that's only true for receivers of the 
TableChange. Please add a backward-compatible single-arg overload (delegating 
with an empty `Optional<Transform>[]`).
   
   Also, the Javadoc above only documents `@param clusteringColumns`; please 
add `@param transforms`.
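   
   The delegating overload is small. The sketch below models it in Scala with hypothetical stand-in types (the real factory is the Java method in `TableChange`, and `ClusterBy` here is a simplified case class that holds strings instead of references and transforms):

```scala
object ClusterByFactorySketch {
  // Simplified stand-in for the TableChange.ClusterBy payload.
  final case class ClusterBy(columns: Seq[String], transforms: Seq[Option[String]])

  // New two-argument factory.
  def clusterBy(columns: Seq[String], transforms: Seq[Option[String]]): ClusterBy =
    ClusterBy(columns, transforms)

  // Retained single-argument factory: existing callers keep compiling and
  // get an empty transform list, meaning no transforms on any column.
  def clusterBy(columns: Seq[String]): ClusterBy =
    clusterBy(columns, Seq.empty)
}
```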



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


