cloud-fan commented on code in PR #55498:
URL: https://github.com/apache/spark/pull/55498#discussion_r3128452749


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeExtractors.scala:
##########
@@ -444,263 +444,199 @@ trait GetArrayItemUtil {
 
 /**
  * Common trait for [[GetMapValue]] and [[ElementAt]].
+ *
+ * The lookup strategy is chosen once per expression instance via [[executor]]:
+ *   - [[PrebuiltHashExecutor]] when the map child is foldable, its key type 
supports
+ *     hashing, and its size meets [[SQLConf.MAP_LOOKUP_HASH_THRESHOLD]]. The 
hash index
+ *     is built once from the constant map and reused for every row.
+ *   - [[LinearExecutor]] otherwise (non-foldable maps, unsupported key types, 
or maps
+ *     below the threshold). Hash lookup is not used here because its per-row 
build cost
+ *     is higher than a linear scan when the index cannot be reused across 
rows.
+ *
+ * Each strategy owns both its interpreted eval and its codegen, grouped 
together, and
+ * [[getValueEval]] / [[doGetValueGenCode]] are thin delegators.
  */
 trait GetMapValueUtil extends BinaryExpression with ImplicitCastInputTypes {
 
-  @transient private var lastMap: MapData = _
-  @transient private var lastIndex: java.util.HashMap[Any, Int] = _
-
-  /**
-   * The threshold to determine whether to use hash lookup for map lookup 
expressions.
-   * If the map size is small, the cost of building hash map exceeds the cost 
of a linear scan.
-   * This is configured by `spark.sql.optimizer.mapLookupHashThreshold`.
-   */
   @transient private lazy val hashLookupThreshold =
     SQLConf.get.getConf(SQLConf.MAP_LOOKUP_HASH_THRESHOLD)
 
-  private def getOrBuildIndex(map: MapData, keyType: DataType): 
java.util.HashMap[Any, Int] = {
-    if (lastMap ne map) {
-      val keys = map.keyArray()
-      val len = keys.numElements()
-      val hm = new java.util.HashMap[Any, Int]((len * 1.5).toInt)
-      var i = 0
-      while (i < len) {
-        val k = keys.get(i, keyType)
-        hm.putIfAbsent(k, i)
-        i += 1
-      }
-      lastIndex = hm
-      lastMap = map
-    }
-    lastIndex
-  }
-
-  def getValueEval(
-      value: Any,
-      ordinal: Any,
-      keyType: DataType,
-      ordering: Ordering[Any]): Any = {
-    val map = value.asInstanceOf[MapData]
-    val length = map.numElements()
+  @transient private lazy val executor: MapLookupExecutor = chooseExecutor()
 
-    if (length < hashLookupThreshold || 
!TypeUtils.typeWithProperEquals(keyType)) {
-      getValueEvalLinear(map, ordinal, keyType, ordering)
-    } else {
-      val idx = getOrBuildIndex(map, keyType).getOrDefault(ordinal, -1)
-      if (idx == -1 || map.valueArray().isNullAt(idx)) {
-        null
-      } else {
-        map.valueArray().get(idx, dataType)
+  private def chooseExecutor(): MapLookupExecutor = left.dataType match {
+    case MapType(keyType, _, _)
+        if left.foldable && TypeUtils.typeWithProperEquals(keyType) =>

Review Comment:
   Good catch, thanks. Keeping the predicate on `typeWithProperEquals` (rather 
than tightening) so the interpreted and codegen paths stay unified on one 
predicate - which is what the refactor is aiming for. Added a block comment on 
`chooseExecutor` in the latest push explaining why: `VariantType` is 
structurally unreachable (`checkForMapKeyType` rejects it), 
`GeographyType`/`GeometryType` have identity-inherited equals but that's 
already the state of SPARK-55959's interpreted hash path (same predicate), and 
non-binary-equality `StringType` / `BinaryType` still fall through to linear. 
Tightening would split the two paths' predicates again, which is the drift 
hazard we're resolving.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeExtractors.scala:
##########
@@ -444,263 +444,199 @@ trait GetArrayItemUtil {
 
 /**
  * Common trait for [[GetMapValue]] and [[ElementAt]].
+ *
+ * The lookup strategy is chosen once per expression instance via [[executor]]:
+ *   - [[PrebuiltHashExecutor]] when the map child is foldable, its key type 
supports
+ *     hashing, and its size meets [[SQLConf.MAP_LOOKUP_HASH_THRESHOLD]]. The 
hash index
+ *     is built once from the constant map and reused for every row.
+ *   - [[LinearExecutor]] otherwise (non-foldable maps, unsupported key types, 
or maps
+ *     below the threshold). Hash lookup is not used here because its per-row 
build cost
+ *     is higher than a linear scan when the index cannot be reused across 
rows.
+ *
+ * Each strategy owns both its interpreted eval and its codegen, grouped 
together, and
+ * [[getValueEval]] / [[doGetValueGenCode]] are thin delegators.
  */
 trait GetMapValueUtil extends BinaryExpression with ImplicitCastInputTypes {
 
-  @transient private var lastMap: MapData = _
-  @transient private var lastIndex: java.util.HashMap[Any, Int] = _
-
-  /**
-   * The threshold to determine whether to use hash lookup for map lookup 
expressions.
-   * If the map size is small, the cost of building hash map exceeds the cost 
of a linear scan.
-   * This is configured by `spark.sql.optimizer.mapLookupHashThreshold`.
-   */
   @transient private lazy val hashLookupThreshold =
     SQLConf.get.getConf(SQLConf.MAP_LOOKUP_HASH_THRESHOLD)
 
-  private def getOrBuildIndex(map: MapData, keyType: DataType): 
java.util.HashMap[Any, Int] = {
-    if (lastMap ne map) {
-      val keys = map.keyArray()
-      val len = keys.numElements()
-      val hm = new java.util.HashMap[Any, Int]((len * 1.5).toInt)
-      var i = 0
-      while (i < len) {
-        val k = keys.get(i, keyType)
-        hm.putIfAbsent(k, i)
-        i += 1
-      }
-      lastIndex = hm
-      lastMap = map
-    }
-    lastIndex
-  }
-
-  def getValueEval(
-      value: Any,
-      ordinal: Any,
-      keyType: DataType,
-      ordering: Ordering[Any]): Any = {
-    val map = value.asInstanceOf[MapData]
-    val length = map.numElements()
+  @transient private lazy val executor: MapLookupExecutor = chooseExecutor()
 
-    if (length < hashLookupThreshold || 
!TypeUtils.typeWithProperEquals(keyType)) {
-      getValueEvalLinear(map, ordinal, keyType, ordering)
-    } else {
-      val idx = getOrBuildIndex(map, keyType).getOrDefault(ordinal, -1)
-      if (idx == -1 || map.valueArray().isNullAt(idx)) {
-        null
-      } else {
-        map.valueArray().get(idx, dataType)
+  private def chooseExecutor(): MapLookupExecutor = left.dataType match {
+    case MapType(keyType, _, _)
+        if left.foldable && TypeUtils.typeWithProperEquals(keyType) =>
+      left.eval(null) match {
+        case map: MapData if map.numElements() >= hashLookupThreshold =>
+          new PrebuiltHashExecutor(buildHashIndex(map, keyType), 
map.valueArray())
+        case _ => LinearExecutor
       }
-    }
+    case _ => LinearExecutor
   }
 
-  private def getValueEvalLinear(
-      map: MapData,
-      ordinal: Any,
-      keyType: DataType,
-      ordering: Ordering[Any]): Any = {
-    val length = map.numElements()
+  private def buildHashIndex(map: MapData, keyType: DataType): 
java.util.HashMap[Any, Int] = {
     val keys = map.keyArray()
-    val values = map.valueArray()
-
+    val len = keys.numElements()
+    val hm = new java.util.HashMap[Any, Int]((len * 1.5).toInt)
     var i = 0
-    var found = false
-    while (i < length && !found) {
-      if (ordering.equiv(keys.get(i, keyType), ordinal)) {
-        found = true
-      } else {
-        i += 1
-      }
-    }
-
-    if (!found || values.isNullAt(i)) {
-      null
-    } else {
-      values.get(i, dataType)
+    while (i < len) {
+      // putIfAbsent preserves first-match semantics for maps with duplicate 
keys (allowed at
+      // the physical level by [[ArrayBasedMapData]]), matching the linear 
scan path.
+      hm.putIfAbsent(keys.get(i, keyType), i)
+      i += 1
     }
+    hm
   }
 
+  def getValueEval(
+      value: Any,
+      ordinal: Any,
+      keyType: DataType,
+      ordering: Ordering[Any]): Any =
+    executor.eval(value.asInstanceOf[MapData], ordinal, keyType, ordering)
+
   def doGetValueGenCode(
       ctx: CodegenContext,
       ev: ExprCode,
-      mapType: MapType): ExprCode = {
-    val keyType = mapType.keyType
-    if (supportsHashLookup(keyType)) {
-      doGetValueGenCodeWithHashOpt(ctx, ev, mapType)
-    } else {
-      doGetValueGenCodeLinear(ctx, ev, mapType)
-    }
-  }
+      mapType: MapType): ExprCode =
+    executor.genCode(ctx, ev, mapType)
 
-  private def supportsHashLookup(keyType: DataType): Boolean = keyType match {
-    case BooleanType | ByteType | ShortType | IntegerType | LongType |
-         FloatType | DoubleType | DateType | TimestampType |
-         TimestampNTZType | _: YearMonthIntervalType |
-         _: DayTimeIntervalType | _: DecimalType | _: TimeType => true
-    case st: StringType if st.supportsBinaryEquality => true
-    case _ => false
+  /**
+   * Execution strategy for a map key lookup. Inner to the trait so each 
concrete strategy
+   * has closure access to `dataType` and `nullSafeCodeGen` on the enclosing 
expression.
+   */
+  protected sealed trait MapLookupExecutor {
+    def eval(map: MapData, ordinal: Any, keyType: DataType, ordering: 
Ordering[Any]): Any
+    def genCode(ctx: CodegenContext, ev: ExprCode, mapType: MapType): ExprCode
   }
 
-  private def doGetValueGenCodeLinear(
-      ctx: CodegenContext,
-      ev: ExprCode,
-      mapType: MapType): ExprCode = {
-    val index = ctx.freshName("index")
-    val length = ctx.freshName("length")
-    val keys = ctx.freshName("keys")
-    val values = ctx.freshName("values")
-    val keyType = mapType.keyType
-
-    val keyJavaType = CodeGenerator.javaType(keyType)
-    val loopKey = ctx.freshName("loopKey")
-    val i = ctx.freshName("i")
-
-    val nullValueCheck = if (mapType.valueContainsNull) {
-      s"""
-         |else if ($values.isNullAt($index)) {
-         |  ${ev.isNull} = true;
-         |}
-       """.stripMargin
-    } else {
-      ""
-    }
+  /** Linear scan over the map's key array. Stateless; no pre-computation. */
+  protected object LinearExecutor extends MapLookupExecutor {

Review Comment:
   Fair, thanks - fixed in the updated PR description ("stateless 
path-dependent `object`" instead of "stateless singleton"). The in-code doc 
already says "Stateless; no pre-computation" so no change there.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeExtractors.scala:
##########
@@ -444,263 +444,199 @@ trait GetArrayItemUtil {
 
 /**
  * Common trait for [[GetMapValue]] and [[ElementAt]].
+ *
+ * The lookup strategy is chosen once per expression instance via [[executor]]:
+ *   - [[PrebuiltHashExecutor]] when the map child is foldable, its key type 
supports
+ *     hashing, and its size meets [[SQLConf.MAP_LOOKUP_HASH_THRESHOLD]]. The 
hash index
+ *     is built once from the constant map and reused for every row.
+ *   - [[LinearExecutor]] otherwise (non-foldable maps, unsupported key types, 
or maps
+ *     below the threshold). Hash lookup is not used here because its per-row 
build cost
+ *     is higher than a linear scan when the index cannot be reused across 
rows.
+ *
+ * Each strategy owns both its interpreted eval and its codegen, grouped 
together, and
+ * [[getValueEval]] / [[doGetValueGenCode]] are thin delegators.
  */
 trait GetMapValueUtil extends BinaryExpression with ImplicitCastInputTypes {
 
-  @transient private var lastMap: MapData = _
-  @transient private var lastIndex: java.util.HashMap[Any, Int] = _
-
-  /**
-   * The threshold to determine whether to use hash lookup for map lookup 
expressions.
-   * If the map size is small, the cost of building hash map exceeds the cost 
of a linear scan.
-   * This is configured by `spark.sql.optimizer.mapLookupHashThreshold`.
-   */
   @transient private lazy val hashLookupThreshold =
     SQLConf.get.getConf(SQLConf.MAP_LOOKUP_HASH_THRESHOLD)
 
-  private def getOrBuildIndex(map: MapData, keyType: DataType): 
java.util.HashMap[Any, Int] = {
-    if (lastMap ne map) {
-      val keys = map.keyArray()
-      val len = keys.numElements()
-      val hm = new java.util.HashMap[Any, Int]((len * 1.5).toInt)
-      var i = 0
-      while (i < len) {
-        val k = keys.get(i, keyType)
-        hm.putIfAbsent(k, i)
-        i += 1
-      }
-      lastIndex = hm
-      lastMap = map
-    }
-    lastIndex
-  }
-
-  def getValueEval(
-      value: Any,
-      ordinal: Any,
-      keyType: DataType,
-      ordering: Ordering[Any]): Any = {
-    val map = value.asInstanceOf[MapData]
-    val length = map.numElements()
+  @transient private lazy val executor: MapLookupExecutor = chooseExecutor()
 
-    if (length < hashLookupThreshold || 
!TypeUtils.typeWithProperEquals(keyType)) {
-      getValueEvalLinear(map, ordinal, keyType, ordering)
-    } else {
-      val idx = getOrBuildIndex(map, keyType).getOrDefault(ordinal, -1)
-      if (idx == -1 || map.valueArray().isNullAt(idx)) {
-        null
-      } else {
-        map.valueArray().get(idx, dataType)
+  private def chooseExecutor(): MapLookupExecutor = left.dataType match {
+    case MapType(keyType, _, _)
+        if left.foldable && TypeUtils.typeWithProperEquals(keyType) =>
+      left.eval(null) match {
+        case map: MapData if map.numElements() >= hashLookupThreshold =>
+          new PrebuiltHashExecutor(buildHashIndex(map, keyType), 
map.valueArray())
+        case _ => LinearExecutor
       }
-    }
+    case _ => LinearExecutor
   }
 
-  private def getValueEvalLinear(
-      map: MapData,
-      ordinal: Any,
-      keyType: DataType,
-      ordering: Ordering[Any]): Any = {
-    val length = map.numElements()
+  private def buildHashIndex(map: MapData, keyType: DataType): 
java.util.HashMap[Any, Int] = {
     val keys = map.keyArray()
-    val values = map.valueArray()
-
+    val len = keys.numElements()
+    val hm = new java.util.HashMap[Any, Int]((len * 1.5).toInt)
     var i = 0
-    var found = false
-    while (i < length && !found) {
-      if (ordering.equiv(keys.get(i, keyType), ordinal)) {
-        found = true
-      } else {
-        i += 1
-      }
-    }
-
-    if (!found || values.isNullAt(i)) {
-      null
-    } else {
-      values.get(i, dataType)
+    while (i < len) {
+      // putIfAbsent preserves first-match semantics for maps with duplicate 
keys (allowed at
+      // the physical level by [[ArrayBasedMapData]]), matching the linear 
scan path.
+      hm.putIfAbsent(keys.get(i, keyType), i)
+      i += 1
     }
+    hm
   }
 
+  def getValueEval(
+      value: Any,
+      ordinal: Any,
+      keyType: DataType,
+      ordering: Ordering[Any]): Any =
+    executor.eval(value.asInstanceOf[MapData], ordinal, keyType, ordering)
+
   def doGetValueGenCode(
       ctx: CodegenContext,
       ev: ExprCode,
-      mapType: MapType): ExprCode = {
-    val keyType = mapType.keyType
-    if (supportsHashLookup(keyType)) {
-      doGetValueGenCodeWithHashOpt(ctx, ev, mapType)
-    } else {
-      doGetValueGenCodeLinear(ctx, ev, mapType)
-    }
-  }
+      mapType: MapType): ExprCode =
+    executor.genCode(ctx, ev, mapType)
 
-  private def supportsHashLookup(keyType: DataType): Boolean = keyType match {
-    case BooleanType | ByteType | ShortType | IntegerType | LongType |
-         FloatType | DoubleType | DateType | TimestampType |
-         TimestampNTZType | _: YearMonthIntervalType |
-         _: DayTimeIntervalType | _: DecimalType | _: TimeType => true
-    case st: StringType if st.supportsBinaryEquality => true
-    case _ => false
+  /**
+   * Execution strategy for a map key lookup. Inner to the trait so each 
concrete strategy
+   * has closure access to `dataType` and `nullSafeCodeGen` on the enclosing 
expression.
+   */
+  protected sealed trait MapLookupExecutor {
+    def eval(map: MapData, ordinal: Any, keyType: DataType, ordering: 
Ordering[Any]): Any
+    def genCode(ctx: CodegenContext, ev: ExprCode, mapType: MapType): ExprCode
   }
 
-  private def doGetValueGenCodeLinear(
-      ctx: CodegenContext,
-      ev: ExprCode,
-      mapType: MapType): ExprCode = {
-    val index = ctx.freshName("index")
-    val length = ctx.freshName("length")
-    val keys = ctx.freshName("keys")
-    val values = ctx.freshName("values")
-    val keyType = mapType.keyType
-
-    val keyJavaType = CodeGenerator.javaType(keyType)
-    val loopKey = ctx.freshName("loopKey")
-    val i = ctx.freshName("i")
-
-    val nullValueCheck = if (mapType.valueContainsNull) {
-      s"""
-         |else if ($values.isNullAt($index)) {
-         |  ${ev.isNull} = true;
-         |}
-       """.stripMargin
-    } else {
-      ""
-    }
+  /** Linear scan over the map's key array. Stateless; no pre-computation. */
+  protected object LinearExecutor extends MapLookupExecutor {
+    override def eval(
+        map: MapData,
+        ordinal: Any,
+        keyType: DataType,
+        ordering: Ordering[Any]): Any = {
+      val length = map.numElements()
+      val keys = map.keyArray()
+      val values = map.valueArray()
 
-    nullSafeCodeGen(ctx, ev, (eval1, eval2) => {
-      s"""
-         |final int $length = $eval1.numElements();
-         |final ArrayData $keys = $eval1.keyArray();
-         |final ArrayData $values = $eval1.valueArray();
-         |int $index = -1;
-         |
-         |for (int $i = 0; $i < $length; $i++) {
-         |  $keyJavaType $loopKey = ${CodeGenerator.getValue(keys, keyType, 
i)};
-         |  if (${ctx.genEqual(keyType, loopKey, eval2)}) {
-         |    $index = $i;
-         |    break;
-         |  }
-         |}
-         |
-         |if ($index < 0) {
-         |  ${ev.isNull} = true;
-         |} $nullValueCheck else {
-         |  ${ev.value} = ${CodeGenerator.getValue(values, dataType, index)};
-         |}
-       """.stripMargin
-    })
-  }
+      var i = 0
+      var found = false
+      while (i < length && !found) {
+        if (ordering.equiv(keys.get(i, keyType), ordinal)) {
+          found = true
+        } else {
+          i += 1
+        }
+      }
 
-  /**
-   * Generates code for map lookups.
-   * If the map size is small (less than HASH_LOOKUP_THRESHOLD), it uses a 
linear scan.
-   * If the map size is large, it builds a hash index for O(1) lookup.
-   */
-  private def doGetValueGenCodeWithHashOpt(
-      ctx: CodegenContext,
-      ev: ExprCode,
-      mapType: MapType): ExprCode = {
-    val index = ctx.freshName("index")
-    val length = ctx.freshName("length")
-    val keys = ctx.freshName("keys")
-    val values = ctx.freshName("values")
-    val keyType = mapType.keyType
-
-    val nullValueCheck = if (mapType.valueContainsNull) {
-      s"""
-         |else if ($values.isNullAt($index)) {
-         |  ${ev.isNull} = true;
-         |}
-       """.stripMargin
-    } else {
-      ""
+      if (!found || values.isNullAt(i)) null else values.get(i, dataType)
     }
 
-    val keyJavaType = CodeGenerator.javaType(keyType)
-    val lastKeyArray = ctx.addMutableState("ArrayData", "lastKeyArray", v => 
s"$v = null;")
-    val hashBuckets = ctx.addMutableState("int[]", "hashBuckets", v => s"$v = 
null;")
-    val hashMask = ctx.addMutableState("int", "hashMask", v => s"$v = 0;")
-
-    def genHash(v: String): String = keyType match {
-      case BooleanType => s"($v ? 1 : 0)"
-      case ByteType | ShortType | IntegerType | DateType | _: 
YearMonthIntervalType => s"$v"
-      case LongType | TimestampType | TimestampNTZType | _: 
DayTimeIntervalType | _: TimeType =>
-        s"(int)($v ^ ($v >>> 32))"
-      case FloatType => s"Float.floatToIntBits($v)"
-      case DoubleType =>
-        s"(int)(Double.doubleToLongBits($v) ^ (Double.doubleToLongBits($v) >>> 
32))"
-      case _ => s"$v.hashCode()"
-    }
+    override def genCode(
+        ctx: CodegenContext,
+        ev: ExprCode,
+        mapType: MapType): ExprCode = {
+      val index = ctx.freshName("index")
+      val length = ctx.freshName("length")
+      val keys = ctx.freshName("keys")
+      val values = ctx.freshName("values")
+      val keyType = mapType.keyType
 
-    nullSafeCodeGen(ctx, ev, (eval1, eval2) => {
-      val i = ctx.freshName("i")
-      val h = ctx.freshName("h")
-      val cap = ctx.freshName("cap")
-      val idx = ctx.freshName("idx")
-      val candidate = ctx.freshName("candidate")
+      val keyJavaType = CodeGenerator.javaType(keyType)
       val loopKey = ctx.freshName("loopKey")
+      val i = ctx.freshName("i")
 
-      val buildIndex =
+      val nullValueCheck = if (mapType.valueContainsNull) {
         s"""
-           |int $cap = Math.max(Integer.highestOneBit(Math.max($length * 2 - 
1, 1)) << 1, 4);
-           |if ($hashBuckets == null || $hashBuckets.length < $cap) {
-           |  $hashBuckets = new int[$cap];
+           |else if ($values.isNullAt($index)) {
+           |  ${ev.isNull} = true;
            |}
-           |java.util.Arrays.fill($hashBuckets, 0, $cap, -1);
-           |$hashMask = $cap - 1;
-           |for (int $i = 0; $i < $length; $i++) {
-           |  $keyJavaType $loopKey = ${CodeGenerator.getValue(keys, keyType, 
i)};
-           |  int $h = (${genHash(loopKey)}) & $hashMask;
-           |  while ($hashBuckets[$h] != -1) {
-           |    $h = ($h + 1) & $hashMask;
-           |  }
-           |  $hashBuckets[$h] = $i;
-           |}
-           |$lastKeyArray = $keys;
          """.stripMargin
+      } else {
+        ""
+      }
 
-      val lookup =
+      nullSafeCodeGen(ctx, ev, (eval1, eval2) => {
         s"""
-           |int $h = (${genHash(eval2)}) & $hashMask;
-           |$index = -1;
-           |while ($hashBuckets[$h] != -1) {
-           |  int $idx = $hashBuckets[$h];
-           |  $keyJavaType $candidate = ${CodeGenerator.getValue(keys, 
keyType, idx)};
-           |  if (${ctx.genEqual(keyType, candidate, eval2)}) {
-           |    $index = $idx;
+           |final int $length = $eval1.numElements();
+           |final ArrayData $keys = $eval1.keyArray();
+           |final ArrayData $values = $eval1.valueArray();
+           |int $index = -1;
+           |
+           |for (int $i = 0; $i < $length; $i++) {
+           |  $keyJavaType $loopKey = ${CodeGenerator.getValue(keys, keyType, 
i)};
+           |  if (${ctx.genEqual(keyType, loopKey, eval2)}) {
+           |    $index = $i;
            |    break;
            |  }
-           |  $h = ($h + 1) & $hashMask;
+           |}
+           |
+           |if ($index < 0) {
+           |  ${ev.isNull} = true;
+           |} $nullValueCheck else {
+           |  ${ev.value} = ${CodeGenerator.getValue(values, dataType, index)};
            |}
          """.stripMargin
+      })
+    }
+  }
 
-      s"""
-        final int $length = $eval1.numElements();
-        final ArrayData $keys = $eval1.keyArray();
-        final ArrayData $values = $eval1.valueArray();
-        int $index = -1;
-
-        if ($length >= $hashLookupThreshold) {
-          if ($keys != $lastKeyArray) {
-            $buildIndex
-          }
-          $lookup
-        } else {
-          for (int $i = 0; $i < $length; $i++) {
-            $keyJavaType $loopKey = ${CodeGenerator.getValue(keys, keyType, 
i)};
-            if (${ctx.genEqual(keyType, loopKey, eval2)}) {
-              $index = $i;
-              break;
-            }
-          }
-        }
+  /**
+   * Hash-based lookup for foldable (constant) maps. The [[java.util.HashMap]] 
and the
+   * value array are evaluated on the driver at construction time and reused 
for every row.
+   * The codegen embeds both via [[CodegenContext.addReferenceObj]]; the 
generated per-row
+   * code is a single boxed `HashMap.get` + null check + value read. Primitive 
keys pay one
+   * autobox per row, which is negligible relative to the O(n) linear scan it 
replaces.
+   */
+  protected final class PrebuiltHashExecutor(
+      index: java.util.HashMap[Any, Int],
+      values: ArrayData) extends MapLookupExecutor {
+
+    override def eval(
+        map: MapData,
+        ordinal: Any,
+        keyType: DataType,
+        ordering: Ordering[Any]): Any = {
+      val idx = index.getOrDefault(ordinal, -1)
+      if (idx == -1 || values.isNullAt(idx)) null else values.get(idx, 
dataType)
+    }
 
-        if ($index < 0) {
-          ${ev.isNull} = true;
-        } $nullValueCheck else {
-          ${ev.value} = ${CodeGenerator.getValue(values, dataType, index)};
+    override def genCode(
+        ctx: CodegenContext,
+        ev: ExprCode,
+        mapType: MapType): ExprCode = {
+      val indexRef = ctx.addReferenceObj("mapLookupIndex", index, 
"java.util.HashMap")
+      val valuesRef = ctx.addReferenceObj("mapLookupValues", values, 
"ArrayData")
+      val idxBoxed = ctx.freshName("idxBoxed")
+      val idx = ctx.freshName("idx")
+
+      nullSafeCodeGen(ctx, ev, (_, eval2) => {
+        val assignValue = if (mapType.valueContainsNull) {
+          s"""
+             |if ($valuesRef.isNullAt($idx)) {
+             |  ${ev.isNull} = true;
+             |} else {
+             |  ${ev.value} = ${CodeGenerator.getValue(valuesRef, dataType, 
idx)};
+             |}
+           """.stripMargin
+        } else {
+          s"${ev.value} = ${CodeGenerator.getValue(valuesRef, dataType, idx)};"
         }
-      """
-    })
+        // HashMap.get(Object) autoboxes primitive keys to their wrapper type 
(Integer, Long,
+        // ...) whose equals/hashCode match the values stored during index 
build.
+        s"""
+           |Integer $idxBoxed = (Integer) $indexRef.get($eval2);

Review Comment:
   Good point - this comment was about the earlier `HashMap.get(Object)` + 
autobox codegen. I force-pushed a rewrite (v2) after your review that replaced 
that codegen body with the same `int[] buckets` / inline primitive hash shape 
SPARK-55959 used (just built once on the driver instead of per row), so there 
is no autoboxing in the hot loop anymore. Measured in a local comparison 
(posted below): foldable-map hash-lookup perf matches SPARK-55959 within noise. 
The previous line 630 (the autobox comment) has been removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to