Hi Jark,

thanks for the detailed review. Let me answer your concerns:

## Conversion of DataStream to Table

1. "We limit the usage of `system_rowtime()/system_proctime` to the leaf of a QueryOperation tree in the validation phase." I'm fine with allowing `system_proctime` everywhere in the query. Also for SQL, I think we should have done that earlier already to give users the chance to have time based operations also at later stages.

2. "By using `system_rowtime().as("rowtime")` the watermark would be assigned implicitly. " Yes, we just use the DataStream API watermark. `system_rowtime()` will just introduce a time attribute, the watermark travels to the Table API and into DataStream API without further code changes.

## Conversion of Table to DataStream

3. "StreamTableEnvironment.toChangelogStream(ChangelogMode, Table): DataStream<Row>"
4. "Table.execute(ChangelogMode)"
Filtering UPDATE_BEFORE is already quite important as it reduces the amount of data by factor 2. But I also understand your concerns regarding confusing users. I also got the request for a `Table.getChangelogMode()` a couple of times in the past, because users would like to get information about the kind of query that is executed. However, in this case `toChangelogStream(Table)` is equivalent to call ``toChangelogStream(Table.getChangelogMode(), Table)` so we don't need `Table.getChangelogMode()` in the current FLIP design. But this can be future work. Let's start with `toChangelogStream(Table)` and wait for more feedback about this new feature. What do others think?

## Conversion of StatementSet to DataStream API

5. "StreamStatementSet#attachToStream()"

I think Godfrey's proposal is too complex for regular users. Instead of continuing with the fluent programming, we would force users to define a DataStream pipeline in a lambda.

Furthermore, joining or using connect() with a different DataStream source would not be possible in this design.

The `execute()` method of `StatementSet` should not execute the DataStream API subprogram. It mixes the concepts because we tell users: "If you use toDataStream" you need to use `StreamExecutionEnvironment.execute()`.

We don't solve every potential use case with the current FLIP design but the most important one where a pipeline just uses an INSERT INTO but also uses Table API for connectors and preprocessing and does the main logic in DataStream API:

T1 -> T2, T3 -> DataStream, T4 -> DataStream

I would consider `StatementSet.addDataStream(Table, ...)` future work for now as it is only an opimization for reusing parts of the StreamGraph. We could even perform this optimization when calling `toInsertStream` or `toChangelogStream`.

## Improve dealing with Row in DataStream API

6. "Row#setFieldNames(@Nullable Map<String, Integer> fieldNames)"

We need a Map for constant time of mapping field name to index.

We accept a nullable `fieldNames` because names are not mandatory, one can also work with indices as before.

But you are right that the fieldNames member variable can be immutable. I just wanted to avoid too many overloaded constructors. I'm fine with having one full constructor for RowKind, arity and field names (or null).

7. "a Row has two modes represented by an internal boolean flag `hasFieldOrder`." Maybe I leaked to many implementation details there that rather confuse readers than help. Internally, we need to distinguish between two kinds of rows. A user should not be bothered by this.

a) Row comes from Table API runtime: hasFieldOrder = true
Map("myAge" -> 0, "myName" -> 1)

row.getField("myName") == row.getField(1)
row.getField("myAge") == row.getField(0)

b) Row comes from user: hasFieldOrder = false
Row row = new Row(2);
row.setField("myName", "Alice");
row.setField("myAge", 32);

Map("myAge" -> 1, "myName" -> 0)

But the type information will decide about the order of the fields later and reorder them accordingly during serialization or RowData conversion:

["myName", "myAge"] vs. ["myAge", "myName"]

The user must not care about this as it always feels naturally to deal with the rows.

Regards,
Timo


On 01.09.20 06:19, Jark Wu wrote:
Hi Timo,

Thanks a lot for the great proposal and sorry for the late reply. This is
an important improvement for DataStream and Table API users.

I have listed my thoughts and questions below ;-)

## Conversion of DataStream to Table

1. "We limit the usage of `system_rowtime()/system_proctime` to the leaf of
a QueryOperation tree in the validation phase."
IIUC, that means `system_rowtime()` can only be used in the first
`select()` after `fromXxxStream()`, right?
However, I think `system_proctime()` shouldn't have this limitation,
because it doesn't rely on the underlying timestamp of StreamRecord and
  can be generated in any stage of the query.

2. "By using `system_rowtime().as("rowtime")` the watermark would be
assigned implicitly. "
What watermark will be used here? Is the pre-assigned watermark in the
DataStream (so called `system_watermak()`)?

## Conversion of Table to DataStream

3. "StreamTableEnvironment.toChangelogStream(ChangelogMode, Table):
DataStream<Row>"
I'm not sure whether this method is useful for users. Currently, the
`DynamicTableSinks#getChagnelogMode(changelogMode)` is only used for
filtering UPDATE_BEFORE if possible.
However, if we expose this method to users, it may be confusing. Users may
try to use this method to convert a changelog stream to an insert-only
stream by applying ChangelogMode.insertOnly(). This might be misleading.
What's more, it's cumbersome if users don't want UPDATE_BEFORE. They have
to know the ChangelogMode of the current Table first, and remove
UPDATE_BEFORE from the ChagnelogMode.
That means we have to support `Table.getChangelogMode()` first? But
`ChangelogMode` derivation requires a full optimization path on the Table,
which seems impossible now.
Therefore, IMHO, we can introduce this interface in the future if users
indeed need this. For most users, I think `toChangelogStream(Table)` is
enough.

4. "Table.execute(ChangelogMode)"
Ditto.

## Conversion of StatementSet to DataStream API

5. "StreamStatementSet#attachToStream()"
I think the potential drawback is that it can't support multi-sink
optimization, i.e. share pipeline.
For example, if we have a Table `t1` (a heavy view uses join, aggregate),
and want to sink to "mysql" using SQL and want to continue processing using
DataStream in a job.
It's a huge waste of resources if we re-compute `t1`. It would be nice if
we can come up with a solution to share the pipeline.

I borrowed Godfrey's idea in FLINK-18840 and added some modifications. What
do you think about the following proposal?

interface StatementSet {
    StatementSet addDataStream(Table table, TableDataStreamTransform
transform);
}

interface TableDataStreamTransform {
    void transform(Context);

    interface Context {
        Table getTable();
        DataStream<Row> toInsertStream(Table);
        DataStream<T> toInsertStream(AbstractDataType<?>, Table);
        DataStream<Row> toChangelogStream(Table);
    }
}

tEnv
   .createStatementSet()
   .addInsert("mysql", table1)
   .addDataStream(table1, ctx -> {
       ctx.toInsertStream(ctx.getTable())
         .flatmap(..)
         .keyBy(..)
         .process(..)
         .addSink(...);
   })


## Improve dealing with Row in DataStream API

6. "Row#setFieldNames(@Nullable Map<String, Integer> fieldNames)"
- Maybe `List<String> fieldNames` or `String[] fieldNames` parameter is
enough and more handy than Map ?
- Currently, the fieldNames member variable is mutable, is it on purpose?
Can we make it immutable? For example, only accept from the constructor.
- Why do we accept a nullable `fieldNames`?

7. "a Row has two modes represented by an internal boolean flag
`hasFieldOrder`."
Sorry, I don't fully understand what does the `hasFieldOrder` mean and is
used for. Could you explain a bit more for this?

Best,
Jark


On Wed, 19 Aug 2020 at 17:38, Timo Walther <twal...@apache.org> wrote:

Hi David,

thanks for your feedback. Feedback from someone who interacts with many
users is very valuable. I added an explanation for StatementSets to the
FLIP.

Regarding watermarks and fromInsertStream, actually the

`Schema.watermark("ts", system_watermark())`

is not really necessary in the `fromChangelogStream`. It is added to
satify the Schema interface and be similar to SQL DDL.

We could already extract the watermark strategy if we see
`system_rowtime()` because in most of the cases we will simply use the
DataStream API watermarks.

But maybe some users want to generate watermarks after preprocessing in
DataStream API. In this cases users what to define a computed watermark
expression.

So for simplicity in the Simple API we introduce:

tEnv
    .fromInsertStream(DataStream<T>)
    .select('*, system_rowtime().as("rowtime"),
system_proctime().as("proctime"))

and just rely on the watermarks that travel through DataStream API
already. I added another comment to the FLIP.

Regards,
Timo


On 19.08.20 10:53, David Anderson wrote:
Timo, nice to see this.

As someone who expects to use these interfaces, but who doesn't fully
understand the existing Table API, I like what I see. Just a couple of
comments:

The way that watermarks fit into the fromChangelogStream case makes sense
to me, and I'm wondering why watermarks don't come up in the previous
section about fromInsertStream.

I wasn't familiar with StatementSets, and I couldn't find an explanation
in
the docs. I eventually found this short paragraph in an email from Fabian
Hueske, which clarified everything in that section for me:

      FLIP-84 [1] added the concept of a "statement set" to group multiple
INSERT
      INTO statements (SQL or Table API) together. The statements in a
statement
      set are jointly optimized and executed as a single Flink job.

Maybe if you add this to the FLIP it will help other readers as well.

Best,
David

On Wed, Aug 19, 2020 at 10:22 AM Timo Walther <twal...@apache.org>
wrote:

Hi everyone,

I would like to propose a FLIP that aims to resolve the remaining
shortcomings in the Table API:



https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API

The Table API has received many new features over the last year. It
supports a new type system (FLIP-37), connectors support changelogs
(FLIP-95), we have well defined internal data structures (FLIP-95),
support for result retrieval in an interactive fashion (FLIP-84), and
soon new TableDescriptors (FLIP-129).

However, the interfaces from and to DataStream API have not been touched
during the introduction of these new features and are kind of outdated.
The interfaces lack important functionality that is available in Table
API but not exposed to DataStream API users. DataStream API is still our
most important API which is why a good interoperability is crucial.

This FLIP is a mixture of different topics that improve the
interoperability between DataStream and Table API in terms of:

- DataStream <-> Table conversion
- translation of type systems TypeInformation <-> DataType
- schema definition (incl. rowtime, watermarks, primary key)
- changelog handling
- row handling in DataStream API

I'm looking forward to your feedback.

Regards,
Timo






Reply via email to