+1

Personally, I would declare it as `collect(): CloseableIterator<Row>` to avoid an additional class in the API and reuse existing Flink utils.

Regards,
Timo

On 17.05.20 10:21, Jingsong Li wrote:
+1

Best,
Jingsong Lee

On Sun, May 17, 2020 at 3:42 PM Kurt Young <ykt...@gmail.com> wrote:

+1 from my side.

Best,
Kurt


On Sun, May 17, 2020 at 12:41 PM godfrey he <godfre...@gmail.com> wrote:

hi everyone,

I would like to bring up another topic about the return value of the
TableResult#collect() method.
Currently, the return type is `Iterator<Row>`, and we have met some
problems while implementing FLINK-14807 [1].

In the current design, the sink operator has a buffer pool which buffers
the data from upstream and waits for the client to consume it. The client
pulls the data when the `Iterator<Row>#next()` method is called.
If the client submits a select job, consumes only part of the data, and
exits, the job will not finish. This causes a resource leak. We can't
require the client to consume all data; for an unbounded streaming job,
that is impossible anyway.
Currently, users can also cancel the job via the
`TableResult.getJobClient().get().cancel()` method,
but this approach is neither intuitive nor convenient.

So, I want to change the return type from `Iterator<Row>` to
`CloseableRowIterator`; the new method looks like:

public interface TableResult {

    CloseableRowIterator collect();
}

public interface CloseableRowIterator extends Iterator<Row>, AutoCloseable {
}

Prefixing the name with "Closeable" is intended to remind users that this
iterator should be closed, and users can conveniently use a
try-with-resources statement to close the resource.
The resource leak problem is still there if users neither close the
iterator nor cancel the job through the job client;
we just provide an easier way for users to avoid it.
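
For example, a minimal usage sketch (a hypothetical illustration of the
proposed API; assume `tEnv` is any TableEnvironment):

TableResult result = tEnv.executeSql("SELECT * FROM MyTable");
// try-with-resources guarantees close() is called even if the client
// stops consuming early, so the cluster side can clean up the job
try (CloseableRowIterator it = result.collect()) {
    while (it.hasNext()) {
        Row row = it.next();
        System.out.println(row);
    }
}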

I also notice that there is a `CloseableIterator` interface in the
`org.apache.flink.util` package.
But I still tend to introduce `CloseableRowIterator`. My points are:
1) `CloseableIterator` is in a util package and is not a public interface.
2) `CloseableRowIterator` is more convenient: users do not need to specify
the generic type `<Row>`.

What do you think?

Best,
Godfrey


[1] https://issues.apache.org/jira/browse/FLINK-14807


Fabian Hueske <fhue...@gmail.com> wrote on Thu, May 7, 2020 at 3:59 PM:

Thanks for the update Godfrey!

+1 to this approach.

Since there can be only one primary key, I'd also be fine to just use
`PRI` even if it is composite, but `PRI(f0, f5)` might be more convenient
for users.

Thanks, Fabian

On Thu, May 7, 2020 at 09:31, godfrey he <godfre...@gmail.com> wrote:

Hi Fabian,
Thanks for your suggestions.

Agree with you that `UNQ(f2, f3)` is clearer.

A table can have only ONE primary key,
and this primary key can consist of one or multiple columns. [1]
If the primary key consists of a single column,
we can simply use `PRI` (or `PRI(xx)`) to represent it.
If the primary key has multiple columns,
we should use `PRI(xx, yy, ...)` to represent it.

A table may have multiple unique keys,
and each unique key can consist of one or multiple columns. [2]
If there is only one unique key and this unique key has only a single
column,
we can simply use `UNQ` (or `UNQ(xx)`) to represent it.
Otherwise, we should use `UNQ(xx, yy, ...)` to represent it.
(A corner case: two unique keys with the same columns, like `UNQ(f2, f3)`
and `UNQ(f2, f3)`;
we can forbid this case or add a unique name for each key in the future.)

An example with a multi-column primary key and a multi-column unique key:
create table MyTable (
   f0 BIGINT NOT NULL,
   f1 ROW<q1 STRING, q2 TIMESTAMP(3)>,
   f2 VARCHAR(256),
   f3 AS f0 + 1,
   f4 TIMESTAMP(3) NOT NULL,
   f5 BIGINT NOT NULL,
   PRIMARY KEY (f0, f5),
   UNIQUE (f3, f2),
   WATERMARK FOR f4 AS f4 - INTERVAL '3' SECOND
) with (...)




+------+---------------------------------+-------+-------------+----------------+--------------------------+
| name | type                            | null  | key         | compute column | watermark                |
+------+---------------------------------+-------+-------------+----------------+--------------------------+
| f0   | BIGINT                          | false | PRI(f0, f5) | (NULL)         | (NULL)                   |
| f1   | ROW<q1 STRING, q2 TIMESTAMP(3)> | true  | (NULL)      | (NULL)         | (NULL)                   |
| f2   | VARCHAR(256)                    | true  | UNQ(f2, f3) | (NULL)         | (NULL)                   |
| f3   | BIGINT                          | false | UNQ(f2, f3) | f0 + 1         | (NULL)                   |
| f4   | TIMESTAMP(3)                    | false | (NULL)      | (NULL)         | f4 - INTERVAL '3' SECOND |
| f5   | BIGINT                          | false | PRI(f0, f5) | (NULL)         | (NULL)                   |
+------+---------------------------------+-------+-------------+----------------+--------------------------+

"Regarding to the watermark on nested columns", that's a good approach
which can both support watermark on nested columns in the future and
keep
current table form.

[1] https://www.w3schools.com/sql/sql_primarykey.asp
[2] https://www.w3schools.com/sql/sql_unique.ASP

Best,
Godfrey

Fabian Hueske <fhue...@gmail.com> wrote on Thu, May 7, 2020 at 12:03 AM:

Hi Godfrey,

This looks good to me.

One side note: indicating unique constraints with "UNQ" is probably not
enough.
There might be multiple unique constraints, and users would like to know
which field combinations are unique.
So in your example above, "UNQ(f2, f3)" might be a better marker.

Just as a thought: if we later add support for watermarks on nested
columns, we could add a row just for the nested field (in addition to the
top-level field) like this:




+-------------------+--------------+-------+--------+----------------+-----------------------------------------+
| f4.nested.rowtime | TIMESTAMP(3) | false | (NULL) | (NULL)         | f4.nested.rowtime - INTERVAL '3' SECOND |
+-------------------+--------------+-------+--------+----------------+-----------------------------------------+

Thanks,
Fabian

On Wed, May 6, 2020 at 17:51, godfrey he <godfre...@gmail.com> wrote:

Hi @fhue...@gmail.com @Timo Walther <twal...@apache.org> @Dawid
Wysakowicz <dwysakow...@apache.org>,
What do you think about limiting watermarks to be defined only on
top-level columns?

If we do that, we can add an expression column to represent the
watermark, just like a computed column.
An example of all cases:
create table MyTable (
   f0 BIGINT NOT NULL,
   f1 ROW<q1 STRING, q2 TIMESTAMP(3)>,
   f2 VARCHAR(256),
   f3 AS f0 + 1,
   f4 TIMESTAMP(3) NOT NULL,
   PRIMARY KEY (f0),
   UNIQUE (f3, f2),
   WATERMARK FOR f4 AS f4 - INTERVAL '3' SECOND
) with (...)




+------+---------------------------------+-------+--------+----------------+--------------------------+
| name | type                            | null  | key    | compute column | watermark                |
+------+---------------------------------+-------+--------+----------------+--------------------------+
| f0   | BIGINT                          | false | PRI    | (NULL)         | (NULL)                   |
| f1   | ROW<q1 STRING, q2 TIMESTAMP(3)> | true  | (NULL) | (NULL)         | (NULL)                   |
| f2   | VARCHAR(256)                    | true  | UNQ    | (NULL)         | (NULL)                   |
| f3   | BIGINT                          | false | UNQ    | f0 + 1         | (NULL)                   |
| f4   | TIMESTAMP(3)                    | false | (NULL) | (NULL)         | f4 - INTERVAL '3' SECOND |
+------+---------------------------------+-------+--------+----------------+--------------------------+

WDYT ?

Best,
Godfrey



godfrey he <godfre...@gmail.com> wrote on Thu, Apr 30, 2020 at 11:57 PM:

Hi Fabian,

the broken example is:

create table MyTable (
   f0 BIGINT NOT NULL,
   f1 ROW<q1 STRING, q2 TIMESTAMP(3)>,
   f2 VARCHAR(256),
   f3 AS f0 + 1,
   PRIMARY KEY (f0),
   UNIQUE (f3, f2),
   WATERMARK FOR f1.q2 AS f1.q2 - INTERVAL '3' SECOND
) with (...)


+------+-------------------------------------+--------+----------------+------------------------------------------+
| name | type                                | key    | compute column | watermark                                |
+------+-------------------------------------+--------+----------------+------------------------------------------+
| f0   | BIGINT NOT NULL                     | PRI    | (NULL)         | (NULL)                                   |
| f1   | ROW<`q1` STRING, `q2` TIMESTAMP(3)> | (NULL) | (NULL)         | f1.q2 AS (`f1.q2` - INTERVAL '3' SECOND) |
| f2   | VARCHAR(256)                        | UNQ    | (NULL)         | (NULL)                                   |
| f3   | BIGINT NOT NULL                     | UNQ    | f0 + 1         | (NULL)                                   |
+------+-------------------------------------+--------+----------------+------------------------------------------+


or we add a column to represent nullability.

+------+-------------------------------------+-------+--------+----------------+------------------------------------------+
| name | type                                | null  | key    | compute column | watermark                                |
+------+-------------------------------------+-------+--------+----------------+------------------------------------------+
| f0   | BIGINT                              | false | PRI    | (NULL)         | (NULL)                                   |
| f1   | ROW<`q1` STRING, `q2` TIMESTAMP(3)> | true  | (NULL) | (NULL)         | f1.q2 AS (`f1.q2` - INTERVAL '3' SECOND) |
| f2   | VARCHAR(256)                        | true  | UNQ    | (NULL)         | (NULL)                                   |
| f3   | BIGINT                              | false | UNQ    | f0 + 1         | (NULL)                                   |
+------+-------------------------------------+-------+--------+----------------+------------------------------------------+




Hi Jark,
If we limit watermarks to be defined only on top-level columns,
this will become much simpler.

Best,
Godfrey

Jark Wu <imj...@gmail.com> wrote on Thu, Apr 30, 2020 at 11:38 PM:

Hi,

I'm in favor of Fabian's proposal.
First, a watermark is not a column but metadata, just like a primary key,
so it shouldn't be listed with the columns.
Second, AFAIK, a primary key can only be defined on top-level columns.
Third, I think watermarks can follow primary keys in being definable only
on top-level columns.

I have to admit that in FLIP-66, a watermark can be defined on nested
fields.
However, during implementation, I found that it's too complicated to do:
we have to refactor the time-based physical nodes, we have to use code
generation to access the event time, and we have to refactor
FlinkTypeFactory to support a complex nested rowtime.
There is not much value in this feature, but it introduces a lot of
complexity into the code base.
So I think we can force watermarks to be defined on top-level columns. If
users want to define one on a nested column,
they can use a computed column to turn it into a top-level column.
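
For example, such a hoisting computed column could look like the
following sketch (hypothetical; the column name `ts` is illustrative, and
the connector options are elided as in the other examples):

tEnv.executeSql(
    "CREATE TABLE MyTable (" +
    "  f0 BIGINT NOT NULL," +
    "  f1 ROW<q1 STRING, q2 TIMESTAMP(3)>," +
    // hoist the nested field into a top-level computed column
    "  ts AS f1.q2," +
    // then define the watermark on that top-level column
    "  WATERMARK FOR ts AS ts - INTERVAL '3' SECOND" +
    ") WITH (...)");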

Best,
Jark


On Thu, 30 Apr 2020 at 17:55, Fabian Hueske <fhue...@gmail.com>
wrote:

Hi Godfrey,

The formatting of your example seems to be broken.
Could you send them again please?

Regarding your points
"because the watermark expression can refer to a sub-column, just like
`f1.q2` in the example I gave above"

I would put the watermark information in the row of the top-level field
and indicate to which nested field the watermark refers.
Don't we have to solve the same issue for primary keys that are defined
on a nested field?

"A boolean flag can't represent such info, and I don't know whether we
will support complex watermark expressions involving multiple columns in
the future, such as: WATERMARK FOR ts as ts + f1 + interval '1' second"

You are right, a simple binary flag is definitely not sufficient to
display the watermark information.
I would put the expression string into the field, i.e., "ts + f1 +
interval '1' second".


For me, the most important reason not to show the watermark as a row in
the table is that it is not a field that can be queried but meta
information on an existing field.
For the user it is important to know that a certain field has a
watermark; otherwise, certain queries cannot be correctly specified.
Also, there might be support for multiple watermarks defined on different
fields at some point. Would those be printed in multiple rows?

Best,
Fabian


On Thu, Apr 30, 2020 at 11:25, godfrey he <godfre...@gmail.com> wrote:

Hi Fabian, Aljoscha,

Thanks for the feedback.

Agree with you that we can deal with the primary key as you mentioned.
Right now, the type column contains the nullability attribute, e.g.
BIGINT NOT NULL.
(I'm also OK with using two columns to represent the type, just like
MySQL.)

Why do I treat `watermark` as a special row?
Because the watermark expression can refer to a sub-column, just like
`f1.q2` in the example I gave above.
A boolean flag can't represent such info, and I don't know whether we
will support complex watermark expressions involving multiple columns in
the future, such as:
"WATERMARK FOR ts as ts + f1 + interval '1' second"

If we do not support complex watermark expressions, we can add a
watermark column.

For example:

create table MyTable (
   f0 BIGINT NOT NULL,
   f1 ROW<q1 STRING, q2 TIMESTAMP(3)>,
   f2 VARCHAR(256),
   f3 AS f0 + 1,
   PRIMARY KEY (f0),
   UNIQUE (f3, f2),
   WATERMARK FOR f1.q2 AS f1.q2 - INTERVAL '3' SECOND
) with (...)


+------+-------------------------------------+--------+----------------+------------------------------------------+
| name | type                                | key    | compute column | watermark                                |
+------+-------------------------------------+--------+----------------+------------------------------------------+
| f0   | BIGINT NOT NULL                     | PRI    | (NULL)         | (NULL)                                   |
| f1   | ROW<`q1` STRING, `q2` TIMESTAMP(3)> | (NULL) | (NULL)         | f1.q2 AS (`f1.q2` - INTERVAL '3' SECOND) |
| f2   | VARCHAR(256)                        | UNQ    | (NULL)         | (NULL)                                   |
| f3   | BIGINT NOT NULL                     | UNQ    | f0 + 1         | (NULL)                                   |
+------+-------------------------------------+--------+----------------+------------------------------------------+


or we add a column to represent nullability.

+------+-------------------------------------+-------+--------+----------------+------------------------------------------+
| name | type                                | null  | key    | compute column | watermark                                |
+------+-------------------------------------+-------+--------+----------------+------------------------------------------+
| f0   | BIGINT                              | false | PRI    | (NULL)         | (NULL)                                   |
| f1   | ROW<`q1` STRING, `q2` TIMESTAMP(3)> | true  | (NULL) | (NULL)         | f1.q2 AS (`f1.q2` - INTERVAL '3' SECOND) |
| f2   | VARCHAR(256)                        | true  | UNQ    | (NULL)         | (NULL)                                   |
| f3   | BIGINT                              | false | UNQ    | f0 + 1         | (NULL)                                   |
+------+-------------------------------------+-------+--------+----------------+------------------------------------------+


Personally, I like the second one. (We need some changes to LogicalType
to get the type name without nullability.)


Best,
Godfrey


Aljoscha Krettek <aljos...@apache.org> wrote on Wed, Apr 29, 2020 at 5:47 PM:

+1 I like the general idea of printing the results as a table.

On the specifics I don't know enough, but Fabian's suggestions seem to
make sense to me.

Aljoscha

On 29.04.20 10:56, Fabian Hueske wrote:
Hi Godfrey,

Thanks for starting this discussion!

In my mind, WATERMARK is a property (or constraint) of a
field, just
like
PRIMARY KEY.
Take this example from MySQL:

mysql> CREATE TABLE people (id INT NOT NULL, name
VARCHAR(128) NOT
NULL,
age INT, PRIMARY KEY (id));
Query OK, 0 rows affected (0.06 sec)

mysql> describe people;
+-------+--------------+------+-----+---------+-------+
| Field | Type         | Null | Key | Default | Extra |
+-------+--------------+------+-----+---------+-------+
| id    | int          | NO   | PRI | NULL    |       |
| name  | varchar(128) | NO   |     | NULL    |       |
| age   | int          | YES  |     | NULL    |       |
+-------+--------------+------+-----+---------+-------+
3 rows in set (0.01 sec)

Here, PRIMARY KEY is marked in the Key column of the id
field.
We could do the same for watermarks by adding a Watermark
column.

Best, Fabian


On Wed, Apr 29, 2020 at 10:43, godfrey he <godfre...@gmail.com> wrote:

Hi everyone,

I would like to bring up a discussion about the result type of the
DESCRIBE statement,
which is introduced in FLIP-84 [1].
In the previous version, we defined the result type of the `describe`
statement as a single column, as follows:

+-------------+---------------------------------+---------------------------------+----------------------+---------------------+
| Statement   | Result Schema                   | Result Value                    | Result Kind          | Examples            |
+-------------+---------------------------------+---------------------------------+----------------------+---------------------+
| DESCRIBE xx | field name: result              | describe the detail of an       | SUCCESS_WITH_CONTENT | DESCRIBE table_name |
|             | field type: VARCHAR(n)          | object (single row)             |                      |                     |
|             | (n is the max length of values) |                                 |                      |                     |
+-------------+---------------------------------+---------------------------------+----------------------+---------------------+

for "describe table_name", the result value is the
`toString` value
of
`TableSchema`, which is an unstructured data.
It's hard to for user to use this info.

for example:

TableSchema schema = TableSchema.builder()
    .field("f0", DataTypes.BIGINT())
    .field("f1", DataTypes.ROW(
        DataTypes.FIELD("q1", DataTypes.STRING()),
        DataTypes.FIELD("q2", DataTypes.TIMESTAMP(3))))
    .field("f2", DataTypes.STRING())
    .field("f3", DataTypes.BIGINT(), "f0 + 1")
    .watermark("f1.q2", WATERMARK_EXPRESSION, WATERMARK_DATATYPE)
    .build();

its `toString` value is:
root
   |-- f0: BIGINT
   |-- f1: ROW<`q1` STRING, `q2` TIMESTAMP(3)>
   |-- f2: STRING
   |-- f3: BIGINT AS f0 + 1
   |-- WATERMARK FOR f1.q2 AS now()

For Hive, MySQL, etc., the describe result is in table form, including
field names and field types,
which is more familiar to users.
TableSchema [2] has a watermark expression and computed columns; we
should also put them into the table:
for a computed column, which is column-level, we add a new column named
`expr`;
for the watermark expression, which is table-level, we add a special row
named `WATERMARK` to represent it.

The result for the above example will look like:

+-----------+-------------------------------------+----------------+
| name      | type                                | expr           |
+-----------+-------------------------------------+----------------+
| f0        | BIGINT                              | (NULL)         |
| f1        | ROW<`q1` STRING, `q2` TIMESTAMP(3)> | (NULL)         |
| f2        | STRING                              | (NULL)         |
| f3        | BIGINT                              | f0 + 1         |
| WATERMARK | (NULL)                              | f1.q2 AS now() |
+-----------+-------------------------------------+----------------+

Now there is a PR, FLINK-17112 [3], to implement the DESCRIBE statement.

What do you think about this update?
Any feedback is welcome~

Best,
Godfrey

[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=134745878
[2]
https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java
[3] https://github.com/apache/flink/pull/11892


godfrey he <godfre...@gmail.com> wrote on Mon, Apr 6, 2020 at 10:38 PM:

Hi Timo,

Sorry for the late reply, and thanks for your correction.
I missed DQL in the job submission scenario.
I'll fix the document right away.

Best,
Godfrey

Timo Walther <twal...@apache.org> wrote on Fri, Apr 3, 2020 at 9:53 PM:

Hi Godfrey,

I'm sorry to jump in again but I still need to clarify
some things
around TableResult.

The FLIP says:
"For DML, this method returns TableResult until the job
is
submitted.
For other statements, TableResult is returned until the
execution
is
finished."

I thought we agreed on making every execution async? This also means
returning a TableResult for DQLs even though the execution is not done
yet. People need access to the JobClient also for batch jobs in order to
cancel long-lasting queries. If people want to wait for the completion,
they can hook into JobClient or collect().
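
A hypothetical sketch of that pattern (the statement and the exact
JobClient calls are illustrative, not part of the FLIP text):

TableResult result = tEnv.executeSql("INSERT INTO Sink SELECT * FROM Source");
result.getJobClient().ifPresent(jobClient -> {
    // cancel a long-lasting query ...
    jobClient.cancel();
    // ... or instead block until completion, e.g. via
    // jobClient.getJobExecutionResult(userClassLoader).join();
});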

Can we rephrase this part to:

"For DML and DQL, this method returns TableResult once the job has been
submitted. For DDL and DCL statements, TableResult is returned once the
operation has finished."

Regards,
Timo


On 02.04.20 05:27, godfrey he wrote:
Hi Aljoscha, Dawid, Timo,

Thanks so much for the detailed explanation.
Agree with you that the multiline story is not complete yet, and we can
keep discussing it.
I will add the current discussions and conclusions to the FLIP.

Best,
Godfrey



Timo Walther <twal...@apache.org> wrote on Wed, Apr 1, 2020 at 11:27 PM:

Hi Godfrey,

first of all, I agree with Dawid. The multiline story
is
not
completed
by this FLIP. It just verifies the big picture.

1. "control the execution logic through the proposed
method if
they
know
what the statements are"

This is a good point that Fabian also raised in the linked Google doc. I
could also imagine returning a more complicated POJO when calling
`executeMultiSql()`.

The POJO would include some `getSqlProperties()` such
that a
platform
gets insights into the query before executing. We
could
also
trigger
the
execution more explicitly instead of hiding it behind
an
iterator.

2. "there are some special commands introduced in SQL
client"

For platforms and SQL Client specific commands, we
could
offer a
hook
to
the parser or a fallback parser in case the regular
table
environment
parser cannot deal with the statement.

However, all of that is future work and can be
discussed
in a
separate
FLIP.

3. +1 for the `Iterator` instead of `Iterable`.

4. "we should convert the checked exception to
unchecked
exception"

Yes, I meant using a runtime exception instead of a
checked
exception.
There was no consensus on putting the exception into
the
`TableResult`.

Regards,
Timo

On 01.04.20 15:35, Dawid Wysakowicz wrote:
When considering the multi-line support I think it
is
helpful
to
start
with a use case in mind. In my opinion consumers of
this method
will
be:

   1. sql-client
   2. third-party SQL-based platforms

@Godfrey As for the quit/source/... commands: I think those belong to
the responsibility of the aforementioned consumers. They should not be
understood by the TableEnvironment. What would quit on a TableEnvironment
do? Moreover, I think such commands should be prefixed appropriately.
It's a common practice to e.g. prefix them with ! or : to say they are
meta commands of the tool rather than a query.

I also don't necessarily understand why platform users need to know the
kind of the query to use the proposed method. They should get the type
from the TableResult#ResultKind. If the ResultKind is SUCCESS, it was a
DCL/DDL. If SUCCESS_WITH_CONTENT, it was a DML/DQL. If that's not enough,
we can enrich the TableResult with a more explicit kind of query, but so
far I don't see such a need.
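
For example, a hypothetical check on the consumer side:

TableResult result = tEnv.executeSql(statement);
if (result.getResultKind() == ResultKind.SUCCESS_WITH_CONTENT) {
    // DML/DQL: content is available, e.g. via collect() or print()
} else {
    // SUCCESS: a DDL/DCL statement completed
}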

@Kurt In those cases I would assume the developers
want
to
present
results of the queries anyway. Moreover I think it
is
safe to
assume
they can adhere to such a contract that the results
must be
iterated.

For direct users of the TableEnvironment/Table API this method does not
make much sense anyway, in my opinion. I think we can rather safely
assume that in this scenario they do not want to submit multiple queries
at a time.

Best,

Dawid


On 01/04/2020 15:07, Kurt Young wrote:
One comment on `executeMultilineSql`: I'm afraid users might sometimes
forget to iterate the returned iterators, e.g. a user submits a bunch of
DDLs and expects the framework to execute them one by one. But it
doesn't.

Best,
Kurt


On Wed, Apr 1, 2020 at 5:10 PM Aljoscha Krettek <aljos...@apache.org>
wrote:

Agreed to what Dawid and Timo said.

To answer your question about multi line SQL: no,
we
don't
think
we
need
this in Flink 1.11, we only wanted to make sure
that
the
interfaces
that
we now put in place will potentially allow this in
the
future.

Best,
Aljoscha

On 01.04.20 09:31, godfrey he wrote:
Hi, Timo & Dawid,

Thanks so much for the effort on multiline statement support.
I have a few questions about this method:

1. Users can control the execution logic well through the proposed method
if they know what the statements are (whether a statement is a DDL, a
DML, or something else).
But if a statement comes from a file, users do not know what the
statements are, and the execution behavior is unclear.
As a platform user, I think this method is hard to use, unless the
platform defines a set of rules about statement order, such as: no SELECT
in the middle, DML only at the tail of the SQL file (which may be the
most common case in production environments).
Otherwise, the platform must parse the SQL first to know what the
statements are.
If it does that, the platform can handle all cases through `executeSql`
and `StatementSet`.

2. The SQL Client can't use `executeMultilineSql` to support multiline
statements either,
because there are some special commands introduced in the SQL Client,
such as `quit`, `source`, `load jar` (the last doesn't exist now, but
maybe we need such a command to support dynamic table sources and UDFs).
Does TableEnvironment also support those commands?

3. BTW, must we have this feature in release-1.11? I find there are a
few use cases in the feedback document whose behavior is unclear now.

regarding to "change the return value from
`Iterable<Row` to
`Iterator<Row`",
I couldn't agree more with this change. Just as
Dawid
mentioned
"The contract of the Iterable#iterator is that it
returns a
new
iterator
each time,
      which effectively means we can iterate the
results
multiple
times.",
we does not provide iterate the results multiple
times.
If we want do that, the client must buffer all
results. but
it's
impossible
for streaming job.

Best,
Godfrey

Dawid Wysakowicz <dwysakow...@apache.org> wrote on Wed, Apr 1, 2020 at 3:14 AM:

Thank you Timo for the great summary! It covers (almost) all the topics.
Even though in the end we are not suggesting many changes to the current
state of the FLIP, I think it is important to lay out all possible use
cases so that we do not change the execution model every release.

There is one additional thing we discussed. Could we change the result
type of TableResult#collect to Iterator<Row>? Even though those
interfaces do not differ much, I think Iterator better describes that the
results might not be materialized on the client side but can be retrieved
on a per-record basis. The contract of Iterable#iterator is that it
returns a new iterator each time, which effectively means we can iterate
the results multiple times. Iterating the results multiple times is not
possible when we don't retrieve all the results from the cluster at once.
I think we should also use Iterator for
TableEnvironment#executeMultilineSql(String statements):
Iterator<TableResult>.

Best,

Dawid

On 31/03/2020 19:27, Timo Walther wrote:
Hi Godfrey,

Aljoscha, Dawid, Klou, and I had another
discussion around
FLIP-84.
In
particular, we discussed how the current status
of
the
FLIP
and
the
future requirements around multiline
statements,
async/sync,
collect()
fit together.

We also updated the FLIP-84 Feedback Summary
document [1]
with
some
use cases.

We believe that we found a good solution that also fits what is in the
current FLIP. So no bigger changes are necessary, which is great!

Our findings were:

1. Async vs sync submission of Flink jobs:

Having a blocking `execute()` in the DataStream API was rather a mistake.
Instead, all submissions should be async because this allows supporting
both modes if necessary. Thus, submitting all queries async sounds good
to us. If users want to run a job sync, they can use the JobClient and
wait for completion (or collect() in the case of batch jobs).

2. Multi-statement execution:

For the multi-statement execution, we don't see a contradiction with the
async execution behavior. We imagine a method like:

TableEnvironment#executeMultilineSql(String statements):
Iterable<TableResult>

Where the `Iterator#next()` method would
trigger
the next
statement
submission. This allows a caller to decide
synchronously
when
to
submit statements async to the cluster. Thus, a
service
such
as
the
SQL Client can handle the result of each
statement
individually
and
process statement by statement sequentially.
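
For illustration, a hypothetical sketch of how such a caller could drive
the iteration (using the signature proposed above):

Iterable<TableResult> results =
    tEnv.executeMultilineSql("CREATE TABLE t (...); INSERT INTO t SELECT ...;");
for (TableResult result : results) {
    // each iteration step submits the next statement; the caller can
    // inspect the result before the following statement is sent
    System.out.println(result.getResultKind());
}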

3. The role of TableResult and result retrieval
in
general

`TableResult` is similar to `JobClient`. Instead of returning a
`CompletableFuture` of something, it is a concrete util class where some
methods have the behavior of a completable future (e.g. collect(),
print()) and some are already completed (getTableSchema(),
getResultKind()).

`StatementSet#execute()` returns a single
`TableResult`
because
the
order is undefined in a set and all statements
have the
same
schema.
Its `collect()` will return a row for each
executed
`INSERT
INTO` in
the order of statement definition.

For simple `SELECT * FROM ...`, the query execution might block until
`collect()` is called to pull buffered rows from the job (via
socket/REST API, whatever we will use in the future). We can say that a
statement finished successfully when `collect()`'s `Iterator#hasNext`
has returned false.

I hope this summarizes our discussion
@Dawid/Aljoscha/Klou?

It would be great if we can add these findings
to
the FLIP
before we
start voting.

One minor thing: some `execute()` methods still
throw a
checked
exception; can we remove that from the FLIP?
Also
the
above
mentioned
`Iterator#next()` would trigger an execution
without
throwing
a
checked exception.

Thanks,
Timo

[1]
https://docs.google.com/document/d/1ueLjQWRPdLTFB_TReAyhseAX-1N3j4WYWD0F02Uau0E/edit#
On 31.03.20 06:28, godfrey he wrote:
Hi, Timo & Jark

Thanks for your explanation.
Agree with you that execution should always be async,
and that the sync execution scenario can be covered by async execution.
It helps provide a unified entry point for batch and streaming.
I think we can also use sync execution for some testing.
So, I agree that we provide the `executeSql` method as an async method.
If we want a sync method in the future, we can add a method named
`executeSqlSync`.

I think we've reached an agreement. I will update the document and start
the voting process.

Best,
Godfrey


Jark Wu <imj...@gmail.com> wrote on Tue, Mar 31, 2020 at 12:46 AM:

Hi,

I didn't follow the full discussion,
but I share the same concern as Timo that streaming queries should
always be async.
Otherwise, I can imagine it will cause a lot of confusion and problems if
users don't keep the "sync" deeply in mind (e.g. the client hangs).
Besides, streaming mode is still the majority use case of Flink and
Flink SQL. We should put usability at a high priority.

Best,
Jark


On Mon, 30 Mar 2020 at 23:27, Timo Walther <twal...@apache.org> wrote:
Hi Godfrey,

maybe I wasn't expressing my biggest concern clearly enough in my last
mail. Even in a single-line and sync execution, I think that streaming
queries should not block the execution. Otherwise it is not possible to
call collect() or print() on them afterwards.

"there are too many things need to discuss
for
multiline":

True, I don't want to solve all of them
right
now. But
what
I
know
is
that our newly introduced methods should fit
into a
multiline
execution.
There is no big difference of calling
`executeSql(A),
executeSql(B)` and
processing a multiline file `A;\nB;`.

I think the example that you mentioned can
simply be
undefined
for
now.
Currently, no catalog is modifying data but
just
metadata.
This
is a
separate discussion.

"result of the second statement is
indeterministic":

Sure this is indeterministic. But this is
the
implementers
fault
and we
cannot forbid such pipelines.

How about we always execute streaming queries async? It would unblock
executeSql() and multiline statements.

Having an `executeSqlAsync()` is useful for batch. However, I don't want
`sync/async` to be the new batch/stream flag. The execution behavior
should come from the query itself.

Regards,
Timo


On 30.03.20 11:12, godfrey he wrote:
Hi Timo,

Agree with you that streaming queries are our top priority,
but I think there are too many things to discuss for multiline
statements, e.g.:

1. What's the behavior of mixing DDL and DML for async execution?
create table t1 xxx;
create table t2 xxx;
insert into t2 select * from t1 where xxx;
drop table t1; -- t1 may be a MySQL table; its data will also be deleted
Here t1 is dropped while the "insert" job is running.

2. What's the behavior of the unified scenario for async execution? (As
you mentioned:)
INSERT INTO t1 SELECT * FROM s;
INSERT INTO t2 SELECT * FROM s JOIN t1 EMIT STREAM;

The result of the second statement is indeterministic, because the first
statement may still be running.
I think we need to put a lot of effort into defining the behavior of
logically related queries.

In this FLIP, I suggest we only handle single statements, and we also
introduce an async execute method,
which is more important and more often used by users.

For the sync methods (like `TableEnvironment.executeSql` and
`StatementSet.execute`),
the result will be returned once the job is finished. The following
methods will be introduced in this FLIP:

/**
 * Asynchronously execute the given single statement.
 */
TableEnvironment.executeSqlAsync(String statement): TableResult

/**
 * Asynchronously execute the DML statements as a batch.
 */
StatementSet.executeAsync(): TableResult

public interface TableResult {
    /**
     * Return the JobClient for DQL and DML in async mode,
     * else return Optional.empty().
     */
    Optional<JobClient> getJobClient();
}
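
For illustration, hypothetical usage of these proposed methods:

// submit asynchronously, then cancel via the returned JobClient if needed
TableResult result = tEnv.executeSqlAsync("INSERT INTO t2 SELECT * FROM t1");
result.getJobClient().ifPresent(JobClient::cancel);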

what do you think?

Best,
Godfrey

Timo Walther <twal...@apache.org> wrote on Thu, Mar 26, 2020 at 9:15 PM:

Hi Godfrey,

executing streaming queries must be our
top
priority
because
this
is
what distinguishes Flink from competitors.
If
we
change
the
execution
behavior, we should think about the other
cases as
well
to
not
break
the
API a third time.

I fear that just having an async execute
method will
not
be
enough
because users should be able to mix
streaming
and
batch
queries
in a
unified scenario.

If I remember correctly, we had some discussions in the past about what
determines the execution mode of a query. Currently, we would like to
let the query decide, not derive it from the sources.

So I could imagine a multiline pipeline as:

USE CATALOG 'mycat';
INSERT INTO t1 SELECT * FROM s;
INSERT INTO t2 SELECT * FROM s JOIN t1
EMIT
STREAM;

For executeMultilineSql():

sync because regular SQL
sync because regular Batch SQL
async because Streaming SQL

For executeAsyncMultilineSql():

async because everything should be async
async because everything should be async
async because everything should be async

What we should not do for executeAsyncMultilineSql():

sync because DDL
async because everything should be async
async because everything should be async

What are your thoughts here?

Regards,
Timo


On 26.03.20 12:50, godfrey he wrote:
Hi Timo,

I agree with you that streaming queries mostly need async execution.
In fact, our original plan was to introduce only sync methods in this
FLIP,
with async methods (like "executeSqlAsync") to be introduced in the
future, as mentioned in the appendix.

Maybe the async methods also need to be considered in this FLIP.

I think sync methods are also useful for streaming, which





