gstvg commented on code in PR #21679:
URL: https://github.com/apache/datafusion/pull/21679#discussion_r3104942706
##########
datafusion/sql/src/expr/function.rs:
##########
@@ -363,6 +369,146 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
}
}
+ if let Some(fm) = self.context_provider.get_higher_order_meta(&name) {
+ // plan non-lambda arguments first so we can get theirs datatype
and call
+ // HigherOrderUDF::lambda_parameters to then plan the lambda
arguments with
+ // resolved lambda variables
+ enum ExprOrLambda {
+ Expr(Expr),
+ Lambda(sqlparser::ast::LambdaFunction),
+ }
+
+ let partially_planned = args
+ .into_iter()
+ .map(|a| match a {
+ FunctionArg::Unnamed(FunctionArgExpr::Expr(SQLExpr::Lambda(
+ lambda,
+ ))) => {
+ if !all_unique(&lambda.params) {
+ return plan_err!(
+ "lambda parameters names must be unique, got
{}",
+ lambda.params
+ );
+ }
+
+ Ok(ExprOrLambda::Lambda(lambda))
+ }
+ _ => Ok(ExprOrLambda::Expr(self.sql_fn_arg_to_logical_expr(
+ a,
+ schema,
+ planner_context,
+ )?)),
+ })
+ .collect::<Result<Vec<_>>>()?;
+
+ let current_fields = partially_planned
+ .iter()
+ .map(|e| match e {
+ ExprOrLambda::Expr(expr) => {
+ Ok(ValueOrLambda::Value(expr.to_field(schema)?.1))
+ }
+ ExprOrLambda::Lambda(_lambda_function) => {
+ Ok(ValueOrLambda::Lambda(()))
+ }
+ })
+ .collect::<Result<Vec<_>>>()?;
+
+ let coerced_values =
+ value_fields_with_higher_order_udf(&current_fields,
fm.as_ref())?
+ .into_iter()
+ .filter_map(|arg| match arg {
+ ValueOrLambda::Value(value) => Some(value),
+ ValueOrLambda::Lambda(_lambda) => None,
+ })
+ .collect::<Vec<_>>();
+
+ // lambda_parameters refers only to lambdas and not to values, so
instead
+ // of zipping it with partially_planned, we iterate over
partially_planned and only
+ // consume from lambda_parameters when a given argument is a lambda
+ // to reconstruct the arguments list with the correct order
+ // this supports any value and lambda positioning including
+ // multiple lambdas interleaved with values
+ let mut lambda_parameters =
+ fm.lambda_parameters(&coerced_values)?.into_iter();
+
+ let num_lambdas = partially_planned.len() - coerced_values.len();
+
+ // functions can support multiple lambdas where some trailing ones
are optional,
+ // but to simplify the implementor, lambda_parameters returns the
parameters of all of them,
+ // so we can't do equality check. one example is spark reduce:
+ // https://spark.apache.org/docs/latest/api/sql/index.html#reduce
+ if lambda_parameters.len() < num_lambdas {
+ return plan_err!(
+ "{} invocation defined {num_lambdas} but lambda_parameters
returned only {}",
+ fm.name(),
+ lambda_parameters.len()
+ );
+ }
+
+ let args = partially_planned
+ .into_iter()
+ .map(|arg| match arg {
+ ExprOrLambda::Expr(expr) => Ok(expr),
+ ExprOrLambda::Lambda(lambda) => {
+ let lambda_params =
+ lambda_parameters.next().ok_or_else(|| {
+ internal_datafusion_err!(
+ "lambda_parameters len should have been
checked above"
+ )
+ })?;
+
+ if lambda.params.len() > lambda_params.len() {
+ return plan_err!(
+ "lambda defined {} params but UDF support only
{}",
+ lambda.params.len(),
+ lambda_params.len()
+ );
+ }
+
+ let params =
+ lambda.params.iter().map(|p|
p.value.clone()).collect();
Review Comment:
https://github.com/apache/datafusion/pull/21679/changes/797d4beb42b1afe156ca13e0b81a11202b925329
thanks
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]