peter-toth commented on code in PR #55885:
URL: https://github.com/apache/spark/pull/55885#discussion_r3294396721


##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/ReducibleParameters.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connector.catalog.functions;
+
+import org.apache.spark.annotation.Evolving;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Container for reducible function literal parameters.
+ * Provides type-safe access to parameters of various types.
+ *
+ * Examples:
+ * <ul>
+ *   <li>bucket(4, col) → ReducibleParameters([4])</li>
+ *   <li>truncate(col, 3) → ReducibleParameters([3])</li>
+ *   <li>range_bucket(col, 0L, 100L, 10) → ReducibleParameters([0L, 100L, 
10])</li>
+ *   <li>custom_transform(col, "param") → ReducibleParameters(["param"])</li>
+ * </ul>
+ *
+ * @since 5.0.0
+ */
+@Evolving
+public class ReducibleParameters {
+    public static final ReducibleParameters EMPTY = new ReducibleParameters();
+
+    private final List<Object> values;
+
+    private ReducibleParameters() {
+        this.values = new ArrayList<>();
+    }
+
+    public ReducibleParameters(List<Object> values) {
+        this.values = values;
+    }
+
+    public ReducibleParameters(Object... values) {
+        this.values = Arrays.asList(values);
+    }
+
+    /**
+     * Get the number of parameters.
+     */
+    public int count() {
+        return values.size();
+    }
+
+    /**
+     * Check if this container has parameters.
+     */
+    public boolean isEmpty() {
+        return values.isEmpty();
+    }
+
+    /**
+     * Get parameter at index as Integer.
+     * @throws ClassCastException if parameter is not an Integer
+     * @throws IndexOutOfBoundsException if index is invalid
+     */
+    public int getInt(int index) {
+        return (Integer) values.get(index);
+    }
+
+    /**
+     * Get parameter at index as Long.
+     * @throws ClassCastException if parameter is not a Long
+     * @throws IndexOutOfBoundsException if index is invalid
+     */
+    public long getLong(int index) {
+        return (Long) values.get(index);
+    }
+
+    /**
+     * Get parameter at index as String.
+     * @throws ClassCastException if parameter is not a String
+     * @throws IndexOutOfBoundsException if index is invalid
+     */
+    public String getString(int index) {
+        return (String) values.get(index);
+    }
+
+    /**
+     * Get parameter at index as Double.
+     * @throws ClassCastException if parameter is not a Double
+     * @throws IndexOutOfBoundsException if index is invalid
+     */
+    public double getDouble(int index) {
+        return (Double) values.get(index);
+    }
+
+    /**
+     * Get parameter at index as Float.
+     * @throws ClassCastException if parameter is not a Float
+     * @throws IndexOutOfBoundsException if index is invalid
+     */
+    public float getFloat(int index) {
+        return (Float) values.get(index);
+    }

Review Comment:
   `extractParameters` in `TransformExpression.scala:171` converts `Decimal → 
java.math.BigDecimal` on the way in, but this class exposes no `getBigDecimal` 
accessor. A connector with a decimal parameter therefore has to fall back to 
the untyped `Object get(int index)` and cast by hand — exactly the type-safety 
the wrapper was introduced to provide. Either add a `getBigDecimal` getter 
alongside the others, or remove the Decimal special-case in `extractParameters` 
and document the supported types explicitly. Concretely:
   
   ```java
   /**
    * Get parameter at index as BigDecimal.
    * @throws ClassCastException if parameter is not a BigDecimal
    * @throws IndexOutOfBoundsException if index is invalid
    */
   public java.math.BigDecimal getBigDecimal(int index) {
       return (java.math.BigDecimal) values.get(index);
   }
   ```
   
   Same gap exists for any other type the Spark side might convert (binary, 
interval, etc.) — see the `## General` note about driving `extractParameters` 
off `DataType`. Inline #6 proposes a structural alternative that would make 
this whole class of issue moot.
   



##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/ReducibleParameters.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connector.catalog.functions;
+
+import org.apache.spark.annotation.Evolving;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Container for reducible function literal parameters.
+ * Provides type-safe access to parameters of various types.
+ *
+ * Examples:
+ * <ul>
+ *   <li>bucket(4, col) → ReducibleParameters([4])</li>
+ *   <li>truncate(col, 3) → ReducibleParameters([3])</li>
+ *   <li>range_bucket(col, 0L, 100L, 10) → ReducibleParameters([0L, 100L, 
10])</li>
+ *   <li>custom_transform(col, "param") → ReducibleParameters(["param"])</li>
+ * </ul>
+ *
+ * @since 5.0.0
+ */
+@Evolving
+public class ReducibleParameters {

Review Comment:
   **Structural alternative — reuse `V2Literal` instead of introducing 
`ReducibleParameters`.** Surfacing this even though it's late in the cycle, 
because it's the kind of design choice worth weighing before locking in a new 
public class.
   
   The proposal: drop `ReducibleParameters` entirely and have the generalized 
reducer take `org.apache.spark.sql.connector.expressions.Literal` (the V2 
literal type already used everywhere else in the connector API):
   
   ```java
   default Reducer<I, O> reducer(
       org.apache.spark.sql.connector.expressions.Literal<?>[] thisParams,
       ReducibleFunction<?, ?> otherFunction,
       org.apache.spark.sql.connector.expressions.Literal<?>[] otherParams) {
     throw new UnsupportedOperationException();
   }
   ```
   
   Connector use:
   ```java
   @Override
   public Reducer<UTF8String, UTF8String> reducer(
       Literal<?>[] thisParams,
       ReducibleFunction<?, ?> otherFunc,
       Literal<?>[] otherParams) {
     if (otherFunc != TruncateFunction) return null;
     int thisWidth = (Integer) thisParams[0].value();
     int otherWidth = (Integer) otherParams[0].value();
     ...
   }
   ```
   
   **What this fixes simultaneously:**
   
   1. **Inline #1's gap evaporates.** No typed getters to maintain — connectors 
call `.value()` and dispatch on `.dataType()`. No more "we forgot 
`getBigDecimal`" / "we'll need `getCalendarInterval`" / etc.
   2. **The `extractParameters` partial-conversion concern (General-section 
bullet #2) goes away.** `V2Literal` carries `dataType()` alongside the value, 
so the connector interprets it correctly regardless of whether it's `String`, 
`BigDecimal`, `byte[]`, `CalendarInterval`, etc. No Catalyst-internal types 
leaking, no type-by-type special-casing — `extractParameters` shrinks to a 
Catalyst-Literal → V2-Literal conversion (essentially the inverse of what 
`V2ExpressionUtils.toCatalyst` already does for V2 literals).
   3. **One fewer public class to learn / maintain / stabilize.** `V2Literal` 
is already public, already `@Evolving` → stable, already used by every 
connector that authors V2 transforms (`Expressions.literal(N)`, 
`Expressions.bucket(N, col)`). Receiving them back through the reducer API is 
symmetric round-trip with how transforms are constructed — V2 connectors never 
have to cross the Catalyst boundary in this public surface.
   
   **Trade-offs being honest about:**
   
   - Slightly more verbose at the connector use site (`(Integer) 
params[0].value()` vs `params.getInt(0)`). Real but small.
   - Java generics + arrays awkwardness; `Literal<?>[]` produces 
unchecked-warning ceremony. `List<Literal<?>>` or `Literal<?>...` varargs are 
cleaner alternatives.
   - The deprecated int-API → new-API dispatch shim still needed (single int → 
`Literal<Integer>`).
   
   cc @sunchao @szehon-ho — would value your read on this trade-off, given your 
existing reviews of `ReducibleParameters`. Should the API rebase on 
`V2Literal`, or stay with the new class as drafted?
   



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TransformExpression.scala:
##########
@@ -17,46 +17,103 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
+import scala.util.Try
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.LogKeys.FUNCTION_NAME
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, 
ExprCode}
-import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, 
Reducer, ReducibleFunction, ScalarFunction}
+import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, 
Reducer, ReducibleFunction, ReducibleParameters, ScalarFunction}
 import org.apache.spark.sql.errors.QueryExecutionErrors
-import org.apache.spark.sql.types.DataType
+import org.apache.spark.sql.types.{DataType, Decimal, DecimalType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * Represents a partition transform expression, for instance, `bucket`, 
`days`, `years`, etc.
  *
  * @param function the transform function itself. Spark will use it to decide 
whether two
  *                 partition transform expressions are compatible.
- * @param numBucketsOpt the number of buckets if the transform is `bucket`. 
Unset otherwise.
  */
-case class TransformExpression(
-    function: BoundFunction,
-    children: Seq[Expression],
-    numBucketsOpt: Option[Int] = None) extends Expression {
+case class TransformExpression(function: BoundFunction, children: 
Seq[Expression])
+    extends Expression with Logging {
 
   override def nullable: Boolean = true
 
   /**
-   * Whether this [[TransformExpression]] has the same semantics as `other`.
-   * For instance, `bucket(32, c)` is equal to `bucket(32, d)`, but not to 
`bucket(16, d)` or
-   * `year(c)`.
+   * Extract literal children (constant parameters) from this transform. These 
are constant
+   * arguments like width in truncate(col, width). Literals are compared when 
checking if two
+   * transforms are the same.
+   */
+  private lazy val literalChildren: Seq[Literal] =
+    children.collect { case l: Literal => l }
+
+  /**
+   * Whether this [[TransformExpression]] has the same semantics as `other`. 
For instance,
+   * `bucket(32, c)` is equal to `bucket(32, d)`, but not to `bucket(16, d)` 
or `year(c)`.
+   * Similarly, `truncate(c, 2)` is equal to `truncate(d, 2)`, but may not to 
`truncate(c, 4)`.
    *
    * This will be used, for instance, by Spark to determine whether 
storage-partitioned join can
    * be triggered, by comparing partition transforms from both sides of the 
join and checking
    * whether they are compatible.
    *
-   * @param other the transform expression to compare to
-   * @return true if this and `other` has the same semantics w.r.t to 
transform, false otherwise.
+   * Two transforms are considered the same if:
+   *   1. They have the same function name
+   *   2. They have the same literal arguments (e.g., numBuckets for bucket, 
width for truncate)
+   *
+   * @param other
+   *   the transform expression to compare to
+   * @return
+   *   true if this and `other` has the same semantics w.r.t to transform, 
false otherwise.
    */
   def isSameFunction(other: TransformExpression): Boolean = other match {
-    case TransformExpression(otherFunction, _, otherNumBucketsOpt) =>
-      function.canonicalName() == otherFunction.canonicalName() &&
-        numBucketsOpt == otherNumBucketsOpt
+    case TransformExpression(otherFunction, _) =>
+      val sameFunctionName = function.canonicalName() == 
otherFunction.canonicalName()
+
+      // Compare literal arguments to ensure transforms with different 
parameters
+      // (e.g., bucket(32, col) vs bucket(16, col), truncate(col, 2) vs 
truncate(col, 4))
+      // are not considered the same
+      val otherLiterals = other.literalChildren
+      val sameLiterals = literalChildren.length == otherLiterals.length &&
+        literalChildren.zip(otherLiterals).forall { case (l1, l2) =>
+          l1.equals(l2)
+        }
+
+      sameFunctionName && sameLiterals
     case _ =>
       false
   }
 
+  /**
+   * Override canonicalized to ensure transforms with the same function and 
literals are
+   * considered semantically equal, regardless of which specific column 
references they use.
+   *
+   * This is crucial for Storage Partitioned Joins - we need bucket(4, 
tableA.id) and bucket(4,
+   * tableB.id) to be semantically equal so SPJ can be triggered.
+   */
+  override lazy val canonicalized: Expression = {

Review Comment:
   The doc claim that motivates this override doesn't hold, and the override 
doesn't add anything over the inherited default. Two pieces:
   
   **The motivation doesn't hold.** The doc says "we need bucket(4, tableA.id) 
and bucket(4, tableB.id) to be semantically equal so SPJ can be triggered". SPJ 
doesn't use `canonicalized`/`semanticEquals` on `TransformExpression` though — 
it dispatches through `isSameFunction`/`isCompatible` (see 
`KeyedShuffleSpec.isExpressionCompatible` at `partitioning.scala:1082`). And 
even if it did, this override wouldn't deliver the claim: 
`AttributeReference.canonicalized` (`namedExpressions.scala:324`) normalizes 
the *name* to `"none"` but **keeps the original `exprId`**, so two different 
attributes with the same name still canonicalize to different forms. `bucket(4, 
tableA.id)` and `bucket(4, tableB.id)` aren't made equal by either the override 
or the default.
   
   **The body is equivalent to the default.** `Literal` is a `LeafExpression`, 
so `Literal.canonicalized` returns the literal unchanged — the per-child `case 
l: Literal => l` branch produces the same result as `l.canonicalized`. The 
override therefore matches `withCanonicalizedChildren` exactly, except it skips 
the `Canonicalize.execute` post-pass that the default routes through (a no-op 
for `TransformExpression` today, but a future change to `Canonicalize` would 
silently fail to apply here).
   
   Recommended: remove the override entirely.
   



##########
sql/core/src/test/scala/org/apache/spark/sql/connector/catalog/functions/transformFunctions.scala:
##########
@@ -253,20 +287,88 @@ object StringSelfFunction extends 
ScalarFunction[UTF8String] {
 }
 
 object UnboundTruncateFunction extends UnboundFunction {
-  override def bind(inputType: StructType): BoundFunction = TruncateFunction
+  override def bind(inputType: StructType): BoundFunction = {
+    if (inputType.size == 2) {
+      inputType.head.dataType match {
+        case StringType | BinaryType => TruncateFunction

Review Comment:
   This dispatch claims `BinaryType` and `LongType` support, but neither branch 
actually works:
   
   - **`StringType | BinaryType => TruncateFunction`** — 
`TruncateFunction.inputTypes()` declares `(StringType, IntegerType)` and 
`produceResult` calls `input.getUTF8String(0)`. A `BinaryType` row (raw 
`byte[]` underneath) mismatches `inputTypes()` at bind-time, or — if the 
framework lets it through — fails at runtime on the `getUTF8String` read.
   - **`IntegerType | LongType => IntegerTruncateFunction`** — 
`IntegerTruncateFunction.inputTypes()` declares `(IntegerType, IntegerType)` 
and `produceResult` calls `input.getInt(0)`. Same kind of mismatch on a 
`LongType` row, plus its `resultType()` is `IntegerType` so even if you cast 
input down, the column type is wrong.
   
   None of the phantom branches are exercised by tests in this PR — every 
truncate test uses `StringType` and dispatches to `TruncateFunction` correctly. 
So the broken branches add API surface that connector authors might trust 
without coverage to defend it.
   
   Adjacent issue: the doc on `IntegerTruncateFunction` ("different integer 
truncate widths produce incompatible partition structures") is wrong: 
`truncate(v, W1)` is reducible to `truncate(v, W2)` whenever `W2 % W1 == 0` 
(snap to a coarser width), and when neither divides the other both reduce to 
`truncate(_, lcm(W1, W2))`. Same structural argument as bucket's GCD case.
   
   Three options, in order of cleanliness:
   1. Drop the unused branches and `IntegerTruncateFunction` from this PR — 
only `StringType` truncate is part of the SPJ story being delivered.
   2. Split per type (`IntegerTruncate` / `LongTruncate` / `BinaryTruncate`) 
with the right `inputTypes()`/`resultType()`/`produceResult` and add at least 
one test per branch.
   3. Make each existing object polymorphic on the input type (declare wider 
`inputTypes()`, dispatch in `produceResult`) and add tests.
   



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TransformExpression.scala:
##########
@@ -17,46 +17,103 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
+import scala.util.Try
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.LogKeys.FUNCTION_NAME
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, 
ExprCode}
-import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, 
Reducer, ReducibleFunction, ScalarFunction}
+import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, 
Reducer, ReducibleFunction, ReducibleParameters, ScalarFunction}
 import org.apache.spark.sql.errors.QueryExecutionErrors
-import org.apache.spark.sql.types.DataType
+import org.apache.spark.sql.types.{DataType, Decimal, DecimalType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * Represents a partition transform expression, for instance, `bucket`, 
`days`, `years`, etc.
  *
  * @param function the transform function itself. Spark will use it to decide 
whether two
  *                 partition transform expressions are compatible.
- * @param numBucketsOpt the number of buckets if the transform is `bucket`. 
Unset otherwise.
  */
-case class TransformExpression(
-    function: BoundFunction,
-    children: Seq[Expression],
-    numBucketsOpt: Option[Int] = None) extends Expression {
+case class TransformExpression(function: BoundFunction, children: 
Seq[Expression])
+    extends Expression with Logging {
 
   override def nullable: Boolean = true
 
   /**
-   * Whether this [[TransformExpression]] has the same semantics as `other`.
-   * For instance, `bucket(32, c)` is equal to `bucket(32, d)`, but not to 
`bucket(16, d)` or
-   * `year(c)`.
+   * Extract literal children (constant parameters) from this transform. These 
are constant
+   * arguments like width in truncate(col, width). Literals are compared when 
checking if two
+   * transforms are the same.
+   */
+  private lazy val literalChildren: Seq[Literal] =
+    children.collect { case l: Literal => l }
+
+  /**
+   * Whether this [[TransformExpression]] has the same semantics as `other`. 
For instance,
+   * `bucket(32, c)` is equal to `bucket(32, d)`, but not to `bucket(16, d)` 
or `year(c)`.
+   * Similarly, `truncate(c, 2)` is equal to `truncate(d, 2)`, but may not to 
`truncate(c, 4)`.
    *
    * This will be used, for instance, by Spark to determine whether 
storage-partitioned join can
    * be triggered, by comparing partition transforms from both sides of the 
join and checking
    * whether they are compatible.
    *
-   * @param other the transform expression to compare to
-   * @return true if this and `other` has the same semantics w.r.t to 
transform, false otherwise.
+   * Two transforms are considered the same if:
+   *   1. They have the same function name
+   *   2. They have the same literal arguments (e.g., numBuckets for bucket, 
width for truncate)
+   *
+   * @param other
+   *   the transform expression to compare to
+   * @return
+   *   true if this and `other` has the same semantics w.r.t to transform, 
false otherwise.
    */
   def isSameFunction(other: TransformExpression): Boolean = other match {
-    case TransformExpression(otherFunction, _, otherNumBucketsOpt) =>
-      function.canonicalName() == otherFunction.canonicalName() &&
-        numBucketsOpt == otherNumBucketsOpt
+    case TransformExpression(otherFunction, _) =>
+      val sameFunctionName = function.canonicalName() == 
otherFunction.canonicalName()
+
+      // Compare literal arguments to ensure transforms with different 
parameters
+      // (e.g., bucket(32, col) vs bucket(16, col), truncate(col, 2) vs 
truncate(col, 4))
+      // are not considered the same
+      val otherLiterals = other.literalChildren
+      val sameLiterals = literalChildren.length == otherLiterals.length &&
+        literalChildren.zip(otherLiterals).forall { case (l1, l2) =>
+          l1.equals(l2)
+        }
+
+      sameFunctionName && sameLiterals
     case _ =>
       false
   }
 
+  /**
+   * Override canonicalized to ensure transforms with the same function and 
literals are
+   * considered semantically equal, regardless of which specific column 
references they use.
+   *
+   * This is crucial for Storage Partitioned Joins - we need bucket(4, 
tableA.id) and bucket(4,
+   * tableB.id) to be semantically equal so SPJ can be triggered.
+   */
+  override lazy val canonicalized: Expression = {
+    // Canonicalize only the non-literal children (i.e., column references)
+    val canonicalizedReferenceChildren = children.map {
+      case l: Literal => l
+      case other => other.canonicalized
+    }
+    TransformExpression(function, canonicalizedReferenceChildren)
+  }
+
+  /**
+   * Override collectLeaves to only return reference children (columns), not 
literal parameters.
+   *
+   * For TransformExpression, literal children are metadata about the 
transform function (e.g.,
+   * numBuckets=4 in bucket(4, col), width=2 in truncate(col, 2)). All 
consumers of
+   * collectLeaves() expect only column references, not these metadata 
literals.
+   *
+   */
+  override def collectLeaves(): Seq[Expression] = {
+    children.flatMap {

Review Comment:
   This override breaks the universal `TreeNode.collectLeaves` contract — the 
base method (`TreeNode.scala:315`) returns "every node in the tree where 
`children.isEmpty`", but this returns column references only, hiding the 
literal leaves. Functionally correct for current SPJ call sites 
(`partitioning.scala:492/498/982/1037`, 
`EnsureRequirements.scala:89/412/417/780`), but a quiet semantic divergence — 
any future caller of `collectLeaves` on a `TransformExpression` will silently 
get wrong answers.
   
   The cleaner shape is to migrate the SPJ call sites off `_.collectLeaves()` 
and use the right helper at each:
   - `Expression.references` where set semantics suffice (existence/size 
checks, single-attribute lookup).
   - A new `Expression.collectAttributes(): Seq[Attribute]` where a positional 
`Seq` is required (`RowOrdering.create`, `reorder`, 
`attributes.zip(clustering)`) — `AttributeSet` deduplicates and its ordering is 
`LinkedHashSet`-implementation-detail rather than contract.
   
   I've sent #56088 as the follow-up doing exactly that. Pure refactor against 
today's `master` (current `TransformExpression` has no literal children, so the 
migrated call sites produce identical results). Once it's merged, this override 
can be dropped on rebase, and 55885's representation change (literals as inline 
children) lands cleanly through the already-migrated call sites.
   



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TransformExpression.scala:
##########
@@ -17,46 +17,103 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
+import scala.util.Try
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.LogKeys.FUNCTION_NAME
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, 
ExprCode}
-import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, 
Reducer, ReducibleFunction, ScalarFunction}
+import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, 
Reducer, ReducibleFunction, ReducibleParameters, ScalarFunction}
 import org.apache.spark.sql.errors.QueryExecutionErrors
-import org.apache.spark.sql.types.DataType
+import org.apache.spark.sql.types.{DataType, Decimal, DecimalType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * Represents a partition transform expression, for instance, `bucket`, 
`days`, `years`, etc.
  *
  * @param function the transform function itself. Spark will use it to decide 
whether two
  *                 partition transform expressions are compatible.
- * @param numBucketsOpt the number of buckets if the transform is `bucket`. 
Unset otherwise.
  */
-case class TransformExpression(
-    function: BoundFunction,
-    children: Seq[Expression],
-    numBucketsOpt: Option[Int] = None) extends Expression {
+case class TransformExpression(function: BoundFunction, children: 
Seq[Expression])
+    extends Expression with Logging {
 
   override def nullable: Boolean = true
 
   /**
-   * Whether this [[TransformExpression]] has the same semantics as `other`.
-   * For instance, `bucket(32, c)` is equal to `bucket(32, d)`, but not to 
`bucket(16, d)` or
-   * `year(c)`.
+   * Extract literal children (constant parameters) from this transform. These 
are constant
+   * arguments like width in truncate(col, width). Literals are compared when 
checking if two
+   * transforms are the same.
+   */
+  private lazy val literalChildren: Seq[Literal] =
+    children.collect { case l: Literal => l }
+
+  /**
+   * Whether this [[TransformExpression]] has the same semantics as `other`. 
For instance,
+   * `bucket(32, c)` is equal to `bucket(32, d)`, but not to `bucket(16, d)` 
or `year(c)`.
+   * Similarly, `truncate(c, 2)` is equal to `truncate(d, 2)`, but may not to 
`truncate(c, 4)`.
    *
    * This will be used, for instance, by Spark to determine whether 
storage-partitioned join can
    * be triggered, by comparing partition transforms from both sides of the 
join and checking
    * whether they are compatible.
    *
-   * @param other the transform expression to compare to
-   * @return true if this and `other` has the same semantics w.r.t to 
transform, false otherwise.
+   * Two transforms are considered the same if:
+   *   1. They have the same function name
+   *   2. They have the same literal arguments (e.g., numBuckets for bucket, 
width for truncate)
+   *
+   * @param other
+   *   the transform expression to compare to
+   * @return
+   *   true if this and `other` has the same semantics w.r.t to transform, 
false otherwise.
    */
   def isSameFunction(other: TransformExpression): Boolean = other match {
-    case TransformExpression(otherFunction, _, otherNumBucketsOpt) =>
-      function.canonicalName() == otherFunction.canonicalName() &&
-        numBucketsOpt == otherNumBucketsOpt
+    case TransformExpression(otherFunction, _) =>
+      val sameFunctionName = function.canonicalName() == 
otherFunction.canonicalName()
+
+      // Compare literal arguments to ensure transforms with different 
parameters
+      // (e.g., bucket(32, col) vs bucket(16, col), truncate(col, 2) vs 
truncate(col, 4))
+      // are not considered the same
+      val otherLiterals = other.literalChildren

Review Comment:
   This compares literals by value but ignores positional structure of 
`children`. Two consequences:
   
   1. **Position-agnostic.** Two transforms with the same function and the same 
literal values but different interleavings of literals/non-literals are 
reported as the same. In practice the V2 function catalog pins down argument 
order per function name, so this is unlikely to manifest, but the doc claims 
"same semantics as `other`" — the check is weaker than that without the catalog 
assumption.
   
   2. **Nested-transform children.** `V2ExpressionUtils.toCatalyst` recurses 
through `toCatalystTransformOpt`, so children can include nested 
`TransformExpression`s (sunchao's earlier P1 raised this for 
`supportsExpressions`). `bucket(4, days(col))` and `bucket(4, hours(col))` both 
have outer function `bucket` and `literalChildren = [Literal(4)]`, so this 
returns `true` even though the inner transforms differ. 
`KeyedPartitioning.supportsExpressions` rejects nested-transform shapes from 
SPJ planning, so this doesn't bite SPJ today, but `isSameFunction` is used 
elsewhere (`isCompatible`, suite assertions) where the false positive would 
leak.
   
   A structural per-position walk closes both gaps and lets `literalChildren` 
shrink to just the `extractParameters` helper:
   
   ```scala
   def isSameFunction(other: TransformExpression): Boolean = {
     function.canonicalName() == other.function.canonicalName() &&
       children.length == other.children.length &&
       children.zip(other.children).forall {
         case (a: Literal, b: Literal) => a == b
         case (a: TransformExpression, b: TransformExpression) => 
a.isSameFunction(b)
         case (_: Literal, _) | (_, _: Literal) => false
         case (_: TransformExpression, _) | (_, _: TransformExpression) => false
         case _ => true   // both refs/attrs — column identity intentionally 
ignored
       }
   }
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to