Hi Dmytro,
aggregate functions will support the new type system in Flink 1.12.
Until then, they cannot be used with the new `call()` syntax as
anonymous functions. In order to use the old type system, you need to
register the function explicilty using SQL `CREATE FUNCTION a AS
'myFunc'` and then use them in `call("myFunc", ...)`.
The mentioned "No match found for function signature fun(<NUMERIC>)" was
a bug that got fixed in 1.11.1:
https://issues.apache.org/jira/browse/FLINK-18520
This bug only exists for catalog functions, not temporary system functions.
Regards,
Timo
On 27.07.20 16:35, Dmytro Dragan wrote:
Hi All,
I see strange behavior of UDAF functions:
Let`s say we have a simple table:
EnvironmentSettings settings =
EnvironmentSettings./newInstance/().useBlinkPlanner().inBatchMode().build();
TableEnvironment t = TableEnvironment./create/(settings);
Table table = t.fromValues(DataTypes./ROW/(
DataTypes./FIELD/("price", DataTypes./DOUBLE/().notNull()),
DataTypes./FIELD/("symbol", DataTypes./STRING/().notNull())
),
/row/(1.0, "S"), /row/(2.0, "S"));
t.createTemporaryView("A", table);
As example we will use build-in function with a new name:
t.createTemporaryFunction("max_value", new
MaxWithRetractAggFunction.DoubleMaxWithRetractAggFunction());
Using Table API we can write:
t.createTemporaryView("B", table
.groupBy(/$/("symbol"))
.select(/$/("symbol"),/call/("max_value", /$/("price")))
);
and get:
org.apache.flink.table.api.TableException: Aggregate functions are not
updated to the new type system yet.
Using SQL API we can write:
t.createTemporaryView("B", t.sqlQuery("select max_value(price) from A
group by symbol"));
and get:
org.apache.flink.table.api.ValidationException: SQL validation failed.
From line 1, column 8 to line 1, column 23: No match found for function
signature max_value(<NUMERIC>)
Calling build-in max function instead of provided alias will produce
correct results.
In addition, non-retract implementation of max function
(MaxAggFunction.DoubleMaxAggFunction) would produce:
org.apache.flink.table.api.ValidationException: Could not register
temporary catalog function 'default_catalog.default_database.max_value'
due to implementation errors.
Cause DoubleMaxAggFunction is not serializable.
Am I missing something?