voonhous commented on code in PR #18797:
URL: https://github.com/apache/hudi/pull/18797#discussion_r3301538481


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/HoodieVectorSearchTableValuedFunction.scala:
##########
@@ -106,6 +115,31 @@ object HoodieVectorSearchTableValuedFunction {
     }
     kValue
   }
+
+  /** Parses a string argument that may be NULL (meaning "not specified"). */
+  private[logical] def parseOptionalString(
+      funcName: String, expr: Expression, argName: String): Option[String] = 
expr match {
+    case Literal(null, _) => None
+    case Literal(v, StringType) if v != null =>
+      val s = v.toString.trim
+      if (s.isEmpty) None else Some(s)
+    case _ => throw new HoodieAnalysisException(
+      s"Function '$funcName': argument '$argName' must be a string literal or 
NULL, got: ${expr.sql}")
+  }
+
+  /**
+   * Parses a numeric argument that may be NULL (meaning "not specified"). 
Accepts
+   * any [[NumericType]] literal (Int/Long/Float/Double/Decimal/Short/Byte) and
+   * widens to Double. String literals — even ones whose contents parse as a
+   * number — are rejected so the type contract surfaces at parse time.
+   */
+  private[logical] def parseOptionalDouble(
+      funcName: String, expr: Expression, argName: String): Option[Double] = 
expr match {
+    case Literal(null, _) => None
+    case Literal(v, _: NumericType) if v != null => Some(v.toString.toDouble)
+    case _ => throw new HoodieAnalysisException(
+      s"Function '$funcName': argument '$argName' must be a numeric literal or 
NULL, got: ${expr.sql}")
+  }

Review Comment:
   There are no tests for `parseOptionalDouble` with non-Double numeric inputs, 
even though the helper claims to accept Int/Long/Decimal/Short/Byte and widen them.
   
   Let's add a single test passing e.g. 0 (an Int literal) or a CAST(0.3 AS 
DECIMAL); that would protect the contract.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSparkBaseAnalysis.scala:
##########
@@ -156,15 +156,17 @@ case class ResolveReferences(spark: SparkSession) extends 
Rule[LogicalPlan]
       val searchAlgorithm = 
HoodieVectorSearchPlanBuilder.resolveAlgorithm(a.algorithm)
       val corpusDf = resolveTableToDf(a.table, 
HoodieVectorSearchTableValuedFunction.FUNC_NAME)
       val queryVector = evaluateQueryVector(a.queryVectorExpr)
-      searchAlgorithm.buildSingleQueryPlan(spark, corpusDf, a.embeddingCol, 
queryVector, a.k, a.metric)
+      searchAlgorithm.buildSingleQueryPlan(
+        spark, corpusDf, a.embeddingCol, queryVector, a.k, a.metric, a.filter, 
a.maxDistance)
 
     case HoodieVectorSearchBatchTableValuedFunction(args) =>
       val a = HoodieVectorSearchBatchTableValuedFunction.parseArgs(args)
       val searchAlgorithm = 
HoodieVectorSearchPlanBuilder.resolveAlgorithm(a.algorithm)
       val corpusDf = resolveTableToDf(a.corpusTable, 
HoodieVectorSearchBatchTableValuedFunction.FUNC_NAME)
       val queryDf = resolveTableToDf(a.queryTable, 
HoodieVectorSearchBatchTableValuedFunction.FUNC_NAME)
       searchAlgorithm.buildBatchQueryPlan(
-        spark, corpusDf, a.corpusEmbeddingCol, queryDf, a.queryEmbeddingCol, 
a.k, a.metric)
+        spark, corpusDf, a.corpusEmbeddingCol, queryDf, a.queryEmbeddingCol,

Review Comment:
   IIUC, `a.filter` affects the corpus only, not the query side.
   
   In batch mode, the filter only narrows the corpus side, and not the query 
relation. 
   
   Let's call this out. If users assume the filter applies to both sides, they'll be 
confused. Add a Javadoc sentence to `buildBatchQueryPlan`: "Applied to the 
corpus only; use a prior projection on the query table to filter queries."



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieVectorSearchPlanBuilder.scala:
##########
@@ -285,15 +327,20 @@ object BruteForceSearchAlgorithm extends 
VectorSearchAlgorithm {
       queryDf: DataFrame,
       queryEmbeddingCol: String,
       k: Int,
-      metric: DistanceMetric.Value): LogicalPlan = {
+      metric: DistanceMetric.Value,
+      filter: Option[String] = None,
+      maxDistance: Option[Double] = None): LogicalPlan = {

Review Comment:
   When an abstract method in a trait declares default parameter values, the 
concrete override in a subclass inherits those defaults automatically, so we 
don't have to restate them.
   
   ```scala
   filter: Option[String],
   maxDistance: Option[Double]
   ```



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieVectorSearchPlanBuilder.scala:
##########
@@ -232,25 +248,50 @@ object HoodieVectorSearchPlanBuilder {
  * and select top-K per query. The cross-join produces O(|corpus| * |queries|)
  * intermediate rows, so this is suitable for small-to-medium query sets
  * (tens to low hundreds of queries) against moderate corpora.
+ *
+ * <p>Both modes support an optional {@code filter} predicate (applied to the 
corpus before
+ * distance computation, and an optional * {@code maxDistance} threshold 
(results beyond this distance are excluded before top-K
+ * selection, reducing shuffle and sort volume).
  */
 object BruteForceSearchAlgorithm extends VectorSearchAlgorithm {
 
   import HoodieVectorSearchPlanBuilder._
 
   override val name: String = "brute_force"
 
+  /**
+   * Applies a user-supplied SQL predicate to the corpus DataFrame, wrapping
+   * [[ParseException]] (predicate syntax error) and [[AnalysisException]]
+   * (unknown column, type mismatch, etc.) in a [[HoodieAnalysisException]] 
that
+   * echoes the offending expression. Other exception types are allowed to
+   * propagate untouched so they aren't misreported as a filter problem.
+   */
+  private def applyFilter(df: DataFrame, filterExpr: String): DataFrame = {
+    try {
+      df.filter(filterExpr)
+    } catch {
+      case e @ (_: ParseException | _: AnalysisException) =>
+        throw new HoodieAnalysisException(
+          s"Invalid pre-filter expression '$filterExpr': ${e.getMessage}")
+    }
+  }
+
   override def buildSingleQueryPlan(
       spark: SparkSession,
       corpusDf: DataFrame,
       embeddingCol: String,
       queryVector: Array[Double],
       k: Int,
-      metric: DistanceMetric.Value): LogicalPlan = {
+      metric: DistanceMetric.Value,
+      filter: Option[String] = None,
+      maxDistance: Option[Double] = None): LogicalPlan = {

Review Comment:
   When an abstract method in a trait declares default parameter values, the 
concrete override in a subclass inherits those defaults automatically, so we 
don't have to restate them.
   
   ```scala
   filter: Option[String],
   maxDistance: Option[Double]
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to