Hi Timo and Jark,
I agree Oracle's syntax seems concise and more descriptive. For the
built-in `ML_PREDICT` and `ML_EVALUATE` functions, I agree with Jark that
we can support them as built-in PTFs using `SqlTableFunction` for this
FLIP. We can have a separate FLIP discussing user-defined PTFs and adopt
that for model functions later. To summarize, the currently proposed
syntax is
SELECT f1, f2, label FROM TABLE(ML_PREDICT(TABLE `my_data`,
`classifier_model`, f1, f2))

SELECT * FROM TABLE(ML_EVALUATE(TABLE `eval_data`, `classifier_model`,
f1, f2))

Is `DESCRIPTOR` a must in the syntax? If so, it becomes

SELECT f1, f2, label FROM TABLE(ML_PREDICT(TABLE `my_data`,
`classifier_model`, DESCRIPTOR(f1), DESCRIPTOR(f2)))

SELECT * FROM TABLE(ML_EVALUATE(TABLE `eval_data`, `classifier_model`,
DESCRIPTOR(f1), DESCRIPTOR(f2)))

If Calcite supports dropping the outer TABLE keyword, it becomes

SELECT f1, f2, label FROM ML_PREDICT(TABLE `my_data`, `classifier_model`,
DESCRIPTOR(f1), DESCRIPTOR(f2))

SELECT * FROM ML_EVALUATE(TABLE `eval_data`, `classifier_model`,
DESCRIPTOR(f1), DESCRIPTOR(f2))
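For completeness, a rough sketch of how the model itself could be declared with the CREATE MODEL DDL that the FLIP proposes (this is only an illustrative sketch; the option keys and schema clauses below are placeholders, not the FLIP's exact spelling):

```
-- Hypothetical sketch of the FLIP-437 model DDL; option names are placeholders.
CREATE MODEL `classifier_model`
  INPUT (f1 DOUBLE, f2 DOUBLE)
  OUTPUT (label STRING)
  WITH (
    'provider' = 'openai',    -- remote model provider (illustrative)
    'endpoint' = 'https://...'
  );
```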
Thanks,
Hao
On Fri, Mar 22, 2024 at 9:16 AM Jark Wu <imj...@gmail.com> wrote:
Sorry, I mean we can bump the Calcite version if needed in Flink 1.20.
On Fri, 22 Mar 2024 at 22:19, Jark Wu <imj...@gmail.com> wrote:
Hi Timo,
Introducing user-defined PTFs would be very useful in Flink; I'm +1 for
this. But I think the ML model FLIP is not blocked by it, because we can
introduce ML_PREDICT and ML_EVALUATE as built-in PTFs, just like
TUMBLE/HOP, and support user-defined ML functions in a future FLIP.
Regarding the simplified PTF syntax which drops the outer TABLE()
keyword, it seems it was just supported [1] by the Calcite community last
month and will be released in the next version (v1.37). The Calcite
community is preparing the 1.37 release, so we can bump the version if
needed in Flink 1.19.
Best,
Jark
[1]: https://issues.apache.org/jira/browse/CALCITE-6254
On Fri, 22 Mar 2024 at 21:46, Timo Walther <twal...@apache.org> wrote:
Hi everyone,
this is a very important change to the Flink SQL syntax, but we can't
wait until the SQL standard is ready for it. So I'm +1 on introducing the
MODEL concept as a first-class citizen in Flink.
For your information: Over the past months I have already spent a
significant amount of time thinking about how we can introduce PTFs in
Flink. I reserved FLIP-440[1] for this purpose and I will share a
version of this in the next 1-2 weeks.
For a good implementation of FLIP-440 and also FLIP-437, we should
evolve the PTF syntax in collaboration with Apache Calcite.
There are different syntax versions out there:
1) Flink
SELECT * FROM
  TABLE(TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES));
2) SQL standard
SELECT * FROM
  TABLE(TUMBLE(TABLE(Bid), DESCRIPTOR(bidtime), INTERVAL '10' MINUTES));
3) Oracle
SELECT * FROM
  TUMBLE(Bid, COLUMNS(bidtime), INTERVAL '10' MINUTES);
As you can see above, Flink does not follow the standard correctly, as it
would need to use `TABLE()`, but this is not provided by Calcite yet.
I really like the Oracle syntax [2][3] a lot. It reduces the necessary
keywords to a minimum. Personally, I would like to discuss this syntax in
a separate FLIP and hope I will find supporters for:
SELECT * FROM
  TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES);
If we go entirely with the Oracle syntax, as you can see in the example,
Oracle allows passing identifiers directly. This would solve our problems
for the MODEL as well:
SELECT f1, f2, label FROM ML_PREDICT(
  data => `my_data`,
  model => `classifier_model`,
  input => DESCRIPTOR(f1, f2));
Or we completely adopt the Oracle syntax:
SELECT f1, f2, label FROM ML_PREDICT(
data => `my_data`,
model => `classifier_model`,
input => COLUMNS(f1, f2));
What do you think? Happy to create a FLIP for just this syntax question
and collaborate with the Calcite community on it. The Calcite community
shouldn't be too hard to convince to support the Oracle syntax, at least
behind a parser parameter.
Regards,
Timo
[1]
https://cwiki.apache.org/confluence/display/FLINK/%5BWIP%5D+FLIP-440%3A+User-defined+Polymorphic+Table+Functions
[2]
https://docs.oracle.com/en/database/oracle/oracle-database/19/arpls/DBMS_TF.html#GUID-0F66E239-DE77-4C0E-AC76-D5B632AB8072
[3]
https://oracle-base.com/articles/18c/polymorphic-table-functions-18c
On 20.03.24 17:22, Mingge Deng wrote:
Thanks Jark for all the insightful comments.
We have updated the proposal per our offline discussions:
1. Model will be treated as a new relation in Flink SQL.
2. Include the common ML predict and evaluate functions in open-source
Flink to complete the user journey.
And we should be able to extend Calcite's SqlTableFunction to support
these two ML functions.
Best,
Mingge
On Mon, Mar 18, 2024 at 7:05 PM Jark Wu <imj...@gmail.com> wrote:
Hi Hao,
I meant how the table name in window TVF gets translated to
`SqlCallingBinding`. Probably we need to fetch the table definition from
the catalog somewhere. Do we treat those window TVFs specially in the
parser/planner so that the catalog is looked up when they are seen?
The table names are resolved and validated by the Calcite SqlValidator.
We don't need to fetch from the catalog manually.
The specific checking logic of the cumulate window happens in
SqlCumulateTableFunction.OperandMetadataImpl#checkOperandTypes.
The return type of SqlCumulateTableFunction is defined in the
#getRowTypeInference() method.
Both are public interfaces provided by Calcite, and it seems they are not
specially handled in the parser/planner.
I didn't try that, but my gut feeling is that the framework is ready for
extending with a customized TVF.
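To make that concrete, a customized TVF would plug into exactly those two Calcite hooks. The following is only a sketch, not working code: aside from the `SqlTableFunction#getRowTypeInference` and `SqlOperandMetadata#checkOperandTypes` hooks named above, the class name and helper are hypothetical.

```
// Sketch only -- assumes Calcite on the classpath; everything apart from the
// SqlTableFunction and SqlOperandMetadata hooks is hypothetical, not Flink code.
public class SqlMLPredictTableFunction extends SqlFunction implements SqlTableFunction {

  @Override
  public SqlReturnTypeInference getRowTypeInference() {
    // Derive the output row type: the input table's columns plus the prediction
    // columns taken from the model definition (how the model-name argument gets
    // resolved against the catalog is exactly the open question in this thread).
    return opBinding -> deriveRowType(opBinding); // deriveRowType: hypothetical helper
  }

  // Operand checking would live in an inner SqlOperandMetadata implementation,
  // mirroring SqlCumulateTableFunction.OperandMetadataImpl#checkOperandTypes:
  // e.g. validate that operand 0 is a table and operand 1 names a model.
}
```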
For what model is, I'm wondering if it has to be a datatype or relation.
Can it be another kind of citizen parallel to
datatype/relation/function/db? Redshift also supports the `show models`
operation, so it seems it's treated specially as well?
If it is an entity only used in catalog scope (e.g., show xxx, create
xxx, drop xxx), it is fine to introduce it. We have introduced such one
before, called Module: "load module", "show modules" [1]. But if we want
to use Model in TVF parameters, it means it has to be a relation or
datatype, because that is all a TVF accepts now.
Thanks for sharing the reasons for preferring TVF over the Redshift way.
It sounds reasonable to me.
Best,
Jark
[1]:
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/modules/
On Fri, 15 Mar 2024 at 13:41, Hao Li <h...@confluent.io.invalid> wrote:
Hi Jark,
Thanks for the pointer. Sorry for the confusion: I meant how the table
name in window TVF gets translated to `SqlCallingBinding`. Probably we
need to fetch the table definition from the catalog somewhere. Do we
treat those window TVFs specially in the parser/planner so that the
catalog is looked up when they are seen?
For what model is, I'm wondering if it has to be a datatype or relation.
Can it be another kind of citizen parallel to
datatype/relation/function/db? Redshift also supports the `show models`
operation, so it seems it's treated specially as well? The reasons I
don't like Redshift's syntax are:
1. It's a bit verbose: you need to think of a model name as well as a
function name, and the function name also needs to be unique.
2. More importantly, the prediction function isn't the only function
that can operate on models. There could be a set of inference functions
[1] and evaluation functions [2] which can operate on models. It's hard
to specify all of them at model creation.
[1]:
https://cloud.google.com/bigquery/docs/reference/standard-sql/bigqueryml-syntax-predict
[2]:
https://cloud.google.com/bigquery/docs/reference/standard-sql/bigqueryml-syntax-evaluate
Thanks,
Hao
On Thu, Mar 14, 2024 at 8:18 PM Jark Wu <imj...@gmail.com> wrote:
Hi Hao,
Can you send me some pointers where the function gets the table
information?
Here is the code of the cumulate window type checking [1].
Also is it possible to support <query_stmt> in window functions in
addition to table?
Yes. It is not allowed in TVF.
Thanks for the syntax links of other systems. The reason I prefer the
Redshift way is that it avoids introducing Model as a relation or
datatype (referenced as a parameter in TVF).
Model is not a relation because it can't be queried directly (e.g.,
SELECT * FROM model).
I'm also confused about making Model a datatype, because I don't know
what class the model parameter of the eval method of
TableFunction/ScalarFunction should be. By defining the function with the
model, users can directly invoke the function without referencing the
model name.
Best,
Jark
[1]:
https://github.com/apache/flink/blob/d6c7eee8243b4fe3e593698f250643534dc79cb5/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlCumulateTableFunction.java#L53
On Fri, 15 Mar 2024 at 02:48, Hao Li <h...@confluent.io.invalid> wrote:
Hi Jark,
Thanks for the pointers. It's very helpful.
1. Looks like `tumble`, `hopping` are keywords in the Calcite parser.
And the syntax `cumulate(TABLE my_table, ...)` needs to get table
information from the catalog somewhere for type validation etc. Can you
send me some pointers to where the function gets the table information?
2. The ideal syntax for the model function, I think, would be
`ML_PREDICT(MODEL <model_name>, {TABLE <table_name> | (query_stmt)})`.
I think with special handling of the `ML_PREDICT` function in the
parser/planner, maybe we can do this like the window functions. But to
support the `MODEL` keyword, we need a Calcite parser change, I guess.
Also, is it possible to support <query_stmt> in window functions in
addition to table?
For the Redshift syntax, I'm not sure about the purpose of defining the
function name together with the model. Is it to define the function
input/output schema? We have the schema in our CREATE MODEL syntax, and
`ML_PREDICT` can handle it by getting the model definition. I think our
syntax of a generic prediction function is more concise. I also did some
research, and it's the syntax used by Databricks `ai_query` [1],
Snowflake `predict` [2], and Azure ML `predict` [3].
[1]:
https://docs.databricks.com/en/sql/language-manual/functions/ai_query.html
[2]:
https://github.com/Snowflake-Labs/sfguide-intro-to-machine-learning-with-snowpark-ml-for-python/blob/main/3_snowpark_ml_model_training_inference.ipynb?_fsi=sksXUwQ0
[3]:
https://learn.microsoft.com/en-us/sql/machine-learning/tutorials/quickstart-python-train-score-model?view=azuresqldb-mi-current
Thanks,
Hao
On Wed, Mar 13, 2024 at 8:57 PM Jark Wu <imj...@gmail.com> wrote:
Hi Mingge, Hao,
Thanks for your replies.
PTF is actually the ideal approach for model functions, and we do have
plans to use PTF for all model functions (including prediction,
evaluation, etc.) once PTF is supported in the Flink SQL Confluent
extension.
It sounds like PTF is the ideal way and the table function is a
temporary solution which will be dropped in the future.
I'm not sure whether we can implement it using PTF in Flink SQL. But we
have implemented window functions using PTF [1] and introduced a new
window function (called CUMULATE [2]) in Flink SQL based on this. I
think it might work to use PTF and implement the model function syntax
like this:
SELECT * FROM TABLE(ML_PREDICT(
  TABLE my_table,
  my_model,
  col1,
  col2
));
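For comparison, this mirrors how the CUMULATE window TVF [2] is invoked today; the example below is adapted from the Flink window TVF documentation, with illustrative table and column names:

```
-- Cumulating windows over a Bid table: 2-minute steps up to a 10-minute window.
SELECT window_start, window_end, SUM(price)
FROM TABLE(
  CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;
```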
Besides, did you consider following the way of AWS Redshift [3], which
defines the model function together with the model itself?
IIUC, a model is a black box which defines input parameters and output
parameters, and so can be modeled into functions.
Best,
Jark
[1]:
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-tvf/#session
[2]:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-145%3A+Support+SQL+windowing+table-valued+function#FLIP145:SupportSQLwindowingtablevaluedfunction-CumulatingWindows
[3]:
https://github.com/aws-samples/amazon-redshift-ml-getting-started/blob/main/use-cases/bring-your-own-model-remote-inference/README.md#create-model
On Wed, 13 Mar 2024 at 15:00, Hao Li <h...@confluent.io.invalid> wrote:
Hi Jark,
Thanks for your questions. These are good questions!
1. The polymorphic table function I was referring to takes a table as
input and outputs a table. So the syntax would be like
```
SELECT * FROM ML_PREDICT('model', (SELECT * FROM my_table))
```
As far as I know, this is not supported yet in Flink. So before it's
supported, one option for the predict function is using a table function
which can output multiple columns:
```
SELECT * FROM my_table, LATERAL TABLE(ML_PREDICT('model', col1, col2))
```
2. Good question. Type inference is hard for the `ML_PREDICT` function
because it takes a model name string as input. I can think of three ways
of doing type inference for it:
1). Treat the `ML_PREDICT` function as something special: during SQL
parsing or planning time, if it's encountered, we look up the model named
by the first argument from the catalog. Then we can infer the
input/output for the function.
2). We can define a `MODEL` keyword and use it in the predict function
to indicate the argument refers to a model. So it's like
`ML_PREDICT(MODEL 'my_model', col1, col2)`.
3). We can create a special type of table function, maybe called
`ModelFunction`, which can resolve the model type inference by special
handling during parsing or planning time.
1) is hacky, 2) isn't supported in Flink for functions, 3) might be a
good option.
3. I sketched the `ML_PREDICT` function for inference. But there are
limitations of the function mentioned in 1 and 2. So maybe we don't need
to introduce them as built-in functions until polymorphic table
functions are supported and we can properly deal with type inference.
After that, defining a user-defined model function should also be
straightforward.
4. For model types, do you mean 'remote', 'import', 'native' models or
other things?
5. We could support popular providers such as 'azureml', 'vertexai',
'googleai' as long as we support the `ML_PREDICT` function. Users should
be able to implement 3rd-party providers if they can implement a
function handling the input/output for the provider.
I think for the model functions, there are still dependencies or hacks
we need to sort out as built-in functions. Maybe we can separate that as
a follow-up if we want to have them built-in, and focus on the model
syntax for this FLIP?
Thanks,
Hao
On Tue, Mar 12, 2024 at 10:33 PM Jark Wu <imj...@gmail.com>
wrote:
Hi Mingge, Chris, Hao,
Thanks for proposing this interesting idea. I think this is a nice step
towards the AI world for Apache Flink. I don't know much about AI/ML, so
I may have some stupid questions.
1. Could you tell us more about why a polymorphic table function (PTF)
doesn't work, and do we have a plan to use PTFs as model functions?
2. What kind of object does the model map to in SQL? A relation or a
data type? It looks like a data type because we use it as a parameter of
the table function. If it is a data type, how does it cooperate with
type inference [1]?
3. What built-in model functions will we support? How does one define a
user-defined model function?
4. What built-in model types will we support? How does one define a
user-defined model type?
5. Regarding the remote model, what providers will we support? Can users
implement 3rd-party providers other than OpenAI?
Best,
Jark
[1]:
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#type-inference
On Wed, 13 Mar 2024 at 05:55, Hao Li <h...@confluent.io.invalid> wrote:
Hi Dev,
Mingge, Chris and I would like to start a discussion about FLIP-437:
Support ML Models in Flink SQL.
This FLIP proposes to support machine learning models in Flink SQL
syntax so that users can CRUD models with Flink SQL and use models on
Flink to do prediction with Flink data. The FLIP also proposes new model
entities and changes to the catalog interface to support model CRUD
operations in the catalog.
For more details, see FLIP-437 [1]. Looking forward to your
feedback.
[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-437%3A+Support+ML+Models+in+Flink+SQL
Thanks,
Mingge, Chris & Hao