I'm jumping in quite late but I think overall this is a very good effort and it's in very good shape now.

Best,
Aljoscha

On 24.07.20 10:24, Jark Wu wrote:
Thanks Dawid,

Regarding (3), I think the point you mentioned, that it will affect other
behavior such as listing tables, is a strong reason.
I will at least consider implementing it in the `TableSourceQueryOperation`
way to avoid affecting other behaviors. I have updated the FLIP.
Anyway, this is an implementation detail; we can continue this discussion in
JIRA and code review.

Thank you all for the responses.
It seems that everyone who participated in the discussion has reached a
consensus.
I will start a vote later today.

Best,
Jark

On Fri, 24 Jul 2020 at 16:03, Leonard Xu <xbjt...@gmail.com> wrote:

Thanks Jark for the update.

The latest FLIP looks good.

I like Dawid’s proposal of TableDescriptor.

Best
Leonard Xu

On 23 Jul 2020, at 22:56, Jark Wu <imj...@gmail.com> wrote:

Hi Timo,

That's a good point I missed in the design. I have updated the FLIP and
added a note under the `KafkaConnector` to mention this.
I will not list all the method names in the FLIP as the design doc is
super long now.

================================================================
Hi Dawid,

1) KafkaConnector not extending TableDescriptor
The reason why KafkaConnector extends TableDescriptor is that a builder
pattern "KafkaConnector.newBuilder()...build()" should, in theory, return
a "KafkaConnector".
So users can write something like the following code, which might be
more intuitive.

KafkaConnector kafka = KafkaConnector.newBuilder()...build();
tEnv.createTemporaryTable("MyTable", kafka);

But I agree the connector implementation will be simpler if this is not
strongly needed, e.g. we don't need the generic type for the descriptor,
and we don't need to pass the descriptor class in the builder. So I'm also
fine with not extending it if others aren't against it. What's your opinion
here, @Timo Walther <twal...@apache.org>?

2) LikeOptions
I am not very satisfied with the new design, because the API is not very
fluent. Users will be interrupted to consider what the `overwrite()`
parameter should be.
And the API design doesn't protect users from using the wrong options
before running the code.
What about listing all possible options at one level? This will be more
aligned with SQL DDL and easier for users to understand and use.

public enum LikeOption {
   INCLUDING_ALL,
   INCLUDING_CONSTRAINTS,
   INCLUDING_GENERATED,
   INCLUDING_OPTIONS,
   INCLUDING_PARTITIONS,
   INCLUDING_WATERMARKS,

   EXCLUDING_ALL,
   EXCLUDING_CONSTRAINTS,
   EXCLUDING_GENERATED,
   EXCLUDING_OPTIONS,
   EXCLUDING_PARTITIONS,
   EXCLUDING_WATERMARKS,

   OVERWRITING_GENERATED,
   OVERWRITING_OPTIONS
}

3) Register the table under a generated table path
I'm afraid we have to do that. The generated table path is still needed
for `TableSourceTable#tableIdentifier`, which is used to calculate the
digest.
This requires that the registered table has a unique identifier.
The old `TableSourceQueryOperation` also generates the identifier
according to the hash code of the TableSource object. However, the
generated identifier "Unregistered_TableSource_1234" can still conflict
with the user's table path. Therefore, I prefer to register the generated
name in the (temporary) catalog so that explicit exceptions are thrown,
rather than generating a wrong plan.
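
To illustrate the conflict described in (3), here is a minimal sketch of a catalog that rejects a generated name when a user table already occupies that path. `TemporaryCatalogSketch` and its methods are hypothetical stand-ins, not Flink classes, and the name scheme only mimics the "Unregistered_TableSource_..." identifier mentioned above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a temporary catalog; not Flink's actual catalog API.
final class TemporaryCatalogSketch {
    private final Map<String, Object> tables = new HashMap<>();

    // Registers a table and fails loudly if the path is already taken.
    void register(String path, Object table) {
        if (tables.putIfAbsent(path, table) != null) {
            throw new IllegalStateException("Table path already exists: " + path);
        }
    }

    // Assumption: the generated name is derived from the object's identity hash code.
    String registerGenerated(Object table) {
        String path = "Unregistered_TableSource_" + System.identityHashCode(table);
        register(path, table);
        return path;
    }
}

class GeneratedPathDemo {
    public static void main(String[] args) {
        TemporaryCatalogSketch catalog = new TemporaryCatalogSketch();
        Object source = new Object();
        // Simulate a user who happens to pick the same path as the generated one.
        String clash = "Unregistered_TableSource_" + System.identityHashCode(source);
        catalog.register(clash, "user table");
        try {
            catalog.registerGenerated(source);
        } catch (IllegalStateException e) {
            // The conflict surfaces as an explicit exception instead of a wrong plan.
            System.out.println(e.getMessage());
        }
    }
}
```

The point of the sketch is only that registration makes the clash visible at definition time; how Flink would actually generate and scope the name is left open here.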

================================================================
Hi @Leonard Xu <xbjt...@gmail.com> and @Jingsong Li
<jingsongl...@gmail.com> ,

Do you have other concerns on the latest FLIP and the above discussion?

Best,
Jark

On Thu, 23 Jul 2020 at 18:26, Dawid Wysakowicz <dwysakow...@apache.org>
wrote:

Hi Jark,

Thanks for the update. I think the FLIP looks really good at a high
level.

I have a few comments on the code structure in the FLIP:

1) I really don't like how the TableDescriptor exposes protected fields.
Moreover, why do we need to extend from it? I don't think we need
KafkaConnector extends TableDescriptor and the like. We only need the
builders, e.g. the KafkaConnectorBuilder.

If I understand it correctly this is the interface needed from the
TableEnvironment perspective and it is the contract that the
TableEnvironment expects. I would suggest making it an interface:
@PublicEvolving
public interface TableDescriptor {
     List<String> getPartitionedFields();
     Schema getSchema();
     Map<String, String> getOptions();
     LikeOption[] getLikeOptions();
     String getLikePath();
}

Then the TableDescriptorBuilder would work with an internal
implementation of this interface:
@PublicEvolving
public abstract class TableDescriptorBuilder<BUILDER extends TableDescriptorBuilder<BUILDER>> {

     private final InternalTableDescriptor descriptor = new InternalTableDescriptor();

     /**
      * Returns this builder instance, typed as the subclass.
      */
     protected abstract BUILDER self();

     /**
      * Specifies the table schema.
      */
     public BUILDER schema(Schema schema) {
         descriptor.schema = schema;
         return self();
     }

     /**
      * Specifies the partition keys of this table.
      */
     public BUILDER partitionedBy(String... fieldNames) {
         checkArgument(descriptor.partitionedFields.isEmpty(),
                 "partitionedBy(...) shouldn't be called more than once.");
         descriptor.partitionedFields.addAll(Arrays.asList(fieldNames));
         return self();
     }

     /**
      * Extends some parts from the original registered table path.
      */
     public BUILDER like(String tablePath, LikeOption... likeOptions) {
         descriptor.likePath = tablePath;
         descriptor.likeOptions = likeOptions;
         return self();
     }

     protected BUILDER option(String key, String value) {
         descriptor.options.put(key, value);
         return self();
     }

     /**
      * Returns created table descriptor.
      */
     public TableDescriptor build() {
         return descriptor;
     }
}
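
For readers who want to try the self-typed builder idea outside Flink, here is a minimal, self-contained sketch. `SimpleDescriptor`, `BaseBuilder` and `DemoKafkaBuilder` are hypothetical names made up for illustration; they only mirror the shape of the proposal above and are not the FLIP's classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for the descriptor contract.
final class SimpleDescriptor {
    final List<String> partitionedFields = new ArrayList<>();
    final Map<String, String> options = new HashMap<>();
}

// Base builder: common methods return the concrete subclass type via self().
abstract class BaseBuilder<B extends BaseBuilder<B>> {
    private final SimpleDescriptor descriptor = new SimpleDescriptor();

    protected abstract B self();

    public B partitionedBy(String... fieldNames) {
        if (!descriptor.partitionedFields.isEmpty()) {
            throw new IllegalArgumentException(
                    "partitionedBy(...) shouldn't be called more than once.");
        }
        descriptor.partitionedFields.addAll(Arrays.asList(fieldNames));
        return self();
    }

    protected B option(String key, String value) {
        descriptor.options.put(key, value);
        return self();
    }

    public SimpleDescriptor build() {
        return descriptor;
    }
}

// Hypothetical connector builder: only connector-specific methods live here.
final class DemoKafkaBuilder extends BaseBuilder<DemoKafkaBuilder> {
    @Override
    protected DemoKafkaBuilder self() {
        return this;
    }

    public DemoKafkaBuilder topic(String topic) {
        return option("topic", topic);
    }
}

class BuilderDemo {
    public static void main(String[] args) {
        SimpleDescriptor d = new DemoKafkaBuilder()
                .partitionedBy("part_field_0") // inherited, still returns DemoKafkaBuilder
                .topic("user_logs")            // so connector-specific calls keep chaining
                .build();
        System.out.println(d.options + " " + d.partitionedFields);
    }
}
```

The recursive type parameter is what lets a common method such as `partitionedBy` be followed by a connector-specific method such as `topic` without a cast.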


2) I'm also not the biggest fan of how the LikeOptions are suggested in
the doc. Can't we have something more like the following?

class LikeOption {

     public enum MergingStrategy {
         INCLUDING,
         EXCLUDING,
         OVERWRITING
     }

     public enum FeatureOption {
         ALL,
         CONSTRAINTS,
         GENERATED,
         OPTIONS,
         PARTITIONS,
         WATERMARKS
     }

     private final MergingStrategy mergingStrategy;
     private final FeatureOption featureOption;

     private LikeOption(MergingStrategy mergingStrategy, FeatureOption featureOption) {
         this.mergingStrategy = mergingStrategy;
         this.featureOption = featureOption;
     }

     public static final LikeOption including(FeatureOption option) {
         return new LikeOption(MergingStrategy.INCLUDING, option);
     }

     public static final LikeOption overwriting(FeatureOption option) {
         Preconditions.checkArgument(option != ALL && ...);
         return new LikeOption(MergingStrategy.OVERWRITING, option);
     }

}
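
A compilable variant of this idea could look like the sketch below. The class name `LikeOptionSketch` is hypothetical, and the rule that only GENERATED and OPTIONS may be overwritten is an assumption taken from the OVERWRITING_GENERATED and OVERWRITING_OPTIONS constants that appear elsewhere in this thread.

```java
// Self-contained sketch; names are hypothetical and not the FLIP's final API.
final class LikeOptionSketch {

    enum MergingStrategy { INCLUDING, EXCLUDING, OVERWRITING }

    enum FeatureOption { ALL, CONSTRAINTS, GENERATED, OPTIONS, PARTITIONS, WATERMARKS }

    final MergingStrategy mergingStrategy;
    final FeatureOption featureOption;

    private LikeOptionSketch(MergingStrategy mergingStrategy, FeatureOption featureOption) {
        this.mergingStrategy = mergingStrategy;
        this.featureOption = featureOption;
    }

    static LikeOptionSketch including(FeatureOption option) {
        return new LikeOptionSketch(MergingStrategy.INCLUDING, option);
    }

    static LikeOptionSketch excluding(FeatureOption option) {
        return new LikeOptionSketch(MergingStrategy.EXCLUDING, option);
    }

    static LikeOptionSketch overwriting(FeatureOption option) {
        // Assumption: only GENERATED and OPTIONS support the OVERWRITING strategy.
        if (option != FeatureOption.GENERATED && option != FeatureOption.OPTIONS) {
            throw new IllegalArgumentException("OVERWRITING is not supported for " + option);
        }
        return new LikeOptionSketch(MergingStrategy.OVERWRITING, option);
    }

    @Override
    public String toString() {
        return mergingStrategy + "_" + featureOption;
    }
}

class LikeOptionDemo {
    public static void main(String[] args) {
        System.out.println(LikeOptionSketch.overwriting(LikeOptionSketch.FeatureOption.OPTIONS));
        try {
            LikeOptionSketch.overwriting(LikeOptionSketch.FeatureOption.ALL);
        } catch (IllegalArgumentException e) {
            // Invalid combinations are only caught at runtime with this design.
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Note that the invalid combination is rejected at runtime rather than at compile time, which is the trade-off a flat enum of valid combinations avoids.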


3) TableEnvironment#from(descriptor) will register descriptor under a
system generated table path (just like TableImpl#toString) first, and scan
from the table path to derive the Table. Table#executeInsert() does it in
a similar way.

I would try not to register the table under a generated table path. Do we
really need that? I am pretty sure we can use the tables without
registering them in a catalog. Similarly to the old
TableSourceQueryOperation.

Otherwise looks good

Best,

Dawid

On 23/07/2020 10:35, Timo Walther wrote:

Hi Jark,

thanks for the update. I think the FLIP is in really good shape now and
ready to be voted on, if others have no further comments.

I have one last comment around the methods of the descriptor builders.
When refactoring classes such as `KafkaConnector` or
`ElasticsearchConnector`, we should align the method names with the new
property names introduced in FLIP-122:

KafkaConnector.newBuilder()
   // similar to scan.startup.mode=earliest-offset
   .scanStartupModeEarliest()
   // similar to sink.partitioner=round-robin
   .sinkPartitionerRoundRobin()

What do you think?

Thanks for driving this,
Timo


On 22.07.20 17:26, Jark Wu wrote:

Hi all,

After some offline discussion with other people, I'm also fine with using
the builder pattern now, even though I still think the `.build()` method
is a little verbose in the user code.

I have updated the FLIP with the following changes:

1) Use the builder pattern instead of the "new" keyword. In order to avoid
duplicate code and reduce the development burden for connector developers,
I introduced the abstract classes `TableDescriptorBuilder` and
`FormatDescriptorBuilder`. All the common methods are pre-defined in the
base builder classes, and every custom descriptor builder should extend
from them. We can also add more methods to the base builder classes in the
future without changes in the connectors.
2) Use Expression instead of a SQL expression string for computed columns
and the watermark strategy.
3) Use `watermark(rowtime, expr)` as the watermark method.
4) `Schema.column()` accepts `AbstractDataType` instead of `DataType`.
5) Drop Schema#proctime and Schema#watermarkFor#boundedOutOfOrderTimestamps.
A full example will look like this:

tEnv.createTemporaryTable(
      "MyTable",
      KafkaConnector.newBuilder()
          .version("0.11")
          .topic("user_logs")
          .property("bootstrap.servers", "localhost:9092")
          .property("group.id", "test-group")
          .startFromEarliest()
          .sinkPartitionerRoundRobin()
          .format(JsonFormat.newBuilder().ignoreParseErrors(false).build())
          .schema(
              Schema.newBuilder()
                  .column("user_id", DataTypes.BIGINT())
                  .column("user_name", DataTypes.STRING())
                  .column("score", DataTypes.DECIMAL(10, 2))
                  .column("log_ts", DataTypes.STRING())
                  .column("part_field_0", DataTypes.STRING())
                  .column("part_field_1", DataTypes.INT())
                  // define a processing-time attribute with column name "proc"
                  .column("proc", proctime())
                  .column("ts", toTimestamp($("log_ts")))
                  .watermark("ts", $("ts").minus(lit(3).seconds()))
                  .primaryKey("user_id")
                  .build())
          // Kafka doesn't support partitioned tables yet; this is just an
          // example for the API
          .partitionedBy("part_field_0", "part_field_1")
          .build()
);

I hope this resolves all your concerns. Further feedback is welcome!

Updated FLIP:

https://cwiki.apache.org/confluence/display/FLINK/FLIP-129%3A+Refactor+Descriptor+API+to+register+connectors+in+Table+API#FLIP129:RefactorDescriptorAPItoregisterconnectorsinTableAPI-FormatDescriptor&FormatDescriptorBuilder

POC:

https://github.com/wuchong/flink/tree/descriptor-POC/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptor3

Best,
Jark

On Thu, 16 Jul 2020 at 20:18, Jark Wu <imj...@gmail.com> wrote:

Thank you all for the discussion!

Here are my comments:

2) I agree we should support Expression as a computed column. But I'm in
favor of Leonard's point that maybe we can also support a SQL string
expression as a computed column, because it also keeps us aligned with
DDL. The concern with Expression is that converting an Expression to a
SQL string, or (de)serializing an Expression, is another topic that is
not yet clear and may involve lots of work.
Maybe we can support Expression later if time permits.

6,7) I still prefer the "new" keyword over the builder. I don't think
immutability is a strong reason. I care more about usability and
experience from the users' and devs' perspectives.
    - Users need to type more if using the builder:
      `KafkaConnector.newBuilder()...build()` vs `new KafkaConnector()...`
    - It's more difficult for developers to write a descriptor: 2 classes
      (KafkaConnector and KafkaConnectorBuilder) + 5 more methods (builders,
      schema, partitionedBy, like, etc.).
      With the "new" keyword, all the common methods are defined by the
      framework.
    - It's hard to have the same API style for different connectors,
      because the common methods are defined by users. For example, some
      may have `withSchema`, `partitionKey`, `withLike`, etc.

8) I'm -1 to `ConfigOption`. The ConfigOption is not used on `JsonFormat`,
but on the generic `Connector#option`. This doesn't work when using format
options.

// This is wrong, because "kafka" requires "json.ignore-parse-errors" as
// the option key, not "ignore-parse-errors".
new Connector("kafka")
   .option(JsonOptions.IGNORE_PARSE_ERRORS, true);


========================================
Hi Timo, regarding having a completely new stack, I have thought about
that, but I still prefer to refactor the existing stack. The reason is
that it will be more confusing if users see two similar stacks, and they
may run into many problems if they use the wrong class.
For example, we may have two `Schema` and `TableDescriptor` classes. The
`KafkaConnector` can't be used in the legacy `connect()` API, and
the legacy `Kafka` class can't be used in the new
`createTemporaryTable()` API.
Besides, the existing API was deprecated in 1.11, so I think it's fine
to remove it in 1.12.


Best,
Jark


On Thu, 16 Jul 2020 at 15:26, Jingsong Li <jingsongl...@gmail.com> wrote:

Thanks for the discussion.

The Descriptor API has lacked watermark and computed column support for too long.

1) +1 for just `column(...)`

2) +1 for being consistent with Table API, the Java Table API should be
Expression DSL. We don't need pure string support, users should just use
DDL instead. I think this is just a schema descriptor? The schema
descriptor should be consistent with DDL, so, definitely, it should
contain computed columns information.

3) +1 for not containing Schema#proctime and
Schema#watermarkFor#boundedOutOfOrderTimestamps; we can just leave them in
the legacy APIs.

6,7) +1 for removing "new" in favor of the builder and making it immutable.
For Jark: only the starting method is a static method; the others are not.

8) +1 for `ConfigOption`; this can be consistent with `WritableConfig`.
For Leonard: I don't think users need "json.fail-on-missing-field"; they
should only need "fail-on-missing-field". The recommended way is
"JsonFormat.newInstance().option(....)", i.e. options should be configured
in the format scope.
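
As a rough illustration of configuring options in the format scope, the sketch below lets a format descriptor accept the unprefixed key and add its own prefix when the options are flattened into connector-level options. `TypedOption` and `FormatSketch` are hypothetical stand-ins (not Flink's `ConfigOption` or `JsonFormat`), and the flattening scheme is an assumption.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical typed option key; a stand-in, not Flink's ConfigOption.
final class TypedOption<T> {
    final String key;

    TypedOption(String key) {
        this.key = key;
    }
}

// Hypothetical format descriptor that owns its options and knows its own prefix.
final class FormatSketch {
    final String identifier;
    final Map<String, String> options = new LinkedHashMap<>();

    FormatSketch(String identifier) {
        this.identifier = identifier;
    }

    // Users pass the unprefixed, format-scoped option.
    <T> FormatSketch option(TypedOption<T> option, T value) {
        options.put(option.key, String.valueOf(value));
        return this;
    }

    // Assumption: flattening yields "format" plus "<identifier>.<key>" entries.
    Map<String, String> toConnectorOptions() {
        Map<String, String> result = new LinkedHashMap<>();
        result.put("format", identifier);
        for (Map.Entry<String, String> e : options.entrySet()) {
            result.put(identifier + "." + e.getKey(), e.getValue());
        }
        return result;
    }
}

class FormatOptionDemo {
    static final TypedOption<Boolean> FAIL_ON_MISSING_FIELD =
            new TypedOption<>("fail-on-missing-field");

    public static void main(String[] args) {
        Map<String, String> flat = new FormatSketch("json")
                .option(FAIL_ON_MISSING_FIELD, false)
                .toConnectorOptions();
        System.out.println(flat);
    }
}
```

With this shape the user never types the "json." prefix, while the connector still receives the prefixed key it expects.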

Best,
Jingsong

On Wed, Jul 15, 2020 at 9:30 PM Leonard Xu <xbjt...@gmail.com> wrote:

Thanks Jark for bringing up this discussion and organizing the FLIP document.

Thanks Dawid and Timo for the feedback. Here are my thoughts.

1)  I’m +1 with using column() for both cases.

2) Expression DSL vs pure SQL string for computed columns

I think we can support both and implement the pure SQL string first.
I agree that the Expression DSL brings more possibility and flexibility,
but using a SQL string is a more unified way that can reuse most of the
DDL logic, such as validation and persistence in the Catalog.
Converting the Expression DSL to a SQL expression is another big topic,
and I have not figured out a feasible approach so far.
So maybe we can postpone the Expression DSL support, considering the
reality.

3) Methods Schema#proctime and
Schema#watermarkFor#boundedOutOfOrderTimestamps

   +1 for Dawid's proposal to offer SQL-like methods.
   Schema()
      .column("proctime", proctime())
      .watermarkFor("rowtime", $("rowtime").minus(lit(3).seconds()))
And we can simplify watermarkFor("colName", Expression watermarkStrategy)
to watermark("colName", Expression watermarkStrategy). I think the latter
expresses the meaning of "WATERMARK FOR column_name AS
watermark_strategy_expression" well.

5)6)7) The new keyword vs the static method vs builder pattern

I have no strong preference. The new keyword and the static method on the
descriptor can nearly be treated as a builder and do the same things as a
builder.
For the builder pattern, we would introduce six methods
(connector.Builder(), connector.Builder.build(), format.Builder(),
format.Builder.build(), Schema.Builder(), Schema.Builder.build()). I think
we could avoid these unnecessary methods. I'm slightly +1 for the new
keyword if we need to choose.

8) `Connector.option(...)` class should also accept `ConfigOption`
I'm slightly -1 for this. ConfigOption may not work because the key of a
format ConfigOption has no format prefix, e.g. FAIL_ON_MISSING_FIELD of
json: we need "json.fail-on-missing-field" rather than
"fail-on-missing-field".

public static final ConfigOption<Boolean> FAIL_ON_MISSING_FIELD = ConfigOptions
          .key("fail-on-missing-field")
          .booleanType()
          .defaultValue(false);

WDYT?

Best,
Leonard Xu


On 15 Jul 2020, at 16:37, Timo Walther <twal...@apache.org> wrote:

Hi Jark,

thanks for working on this issue. It is time to fix this last part of
inconsistency in the API. I also like the core parts of the FLIP, esp.
that TableDescriptor is one entity that can be passed to different
methods. Here is some feedback from my side:


1) +1 for just `column(...)`

2) Expression DSL vs pure SQL string for computed columns
I agree with Dawid. Using the Expression DSL is desirable for a
consistent API. Furthermore, otherwise people need to register functions
if they want to use them in an expression. Refactoring TableSchema is
definitely on the list for 1.12. Maybe we can come up with some
intermediate solution where we transform the expression to a SQL
expression for the catalog, until the discussions around FLIP-80 and
CatalogTableSchema have been finalized.


3) Schema#proctime and Schema#watermarkFor#boundedOutOfOrderTimestamps
We should design the descriptor very close to the SQL syntax. The more
similar the syntax, the more likely it is that the new descriptor API
stays stable.


6) static method vs new keyword
Actually, the `new` keyword was one of the things that bothered me
most in the old design. Fluent APIs avoid this nowadays.


7) make the descriptors immutable with builders
The descriptors are some kind of builders already. But they are not
called "builder". Instead of coming up with the new concept of a
"descriptor", we should use terminology that people, esp. Java/Scala
users, are familiar with already.

We could make the descriptors immutable to pass them around easily.

Btw "Connector" and "Format" should always be in the class name. This
was also a mistake in the past. Instead of calling the descriptor just
`Kafka` we could call it `KafkaConnector`. An entire example could look
like:


tEnv.createTemporaryTable(
    "OrdersInKafka",
    KafkaConnector.newBuilder() // builder pattern supported by IDE
       .topic("user_logs")
       .property("bootstrap.servers", "localhost:9092")
       .property("group.id", "test-group")
       .format(JsonFormat.newInstance()) // shortcut for no parameters
       .schema(
          Schema.newBuilder()
             .column("user_id", DataTypes.BIGINT())
             .column("score", DataTypes.DECIMAL(10, 2))
             .column("log_ts", DataTypes.TIMESTAMP(3))
             .column("my_ts", toTimestamp($("log_ts")))
             .build()
       )
       .build()
);

Instead of refactoring the existing classes, we could also think about
a completely new stack. I think this would avoid confusion for the old
users. We could deprecate the entire `Kafka` class instead of dealing
with backwards compatibility.


8) minor extensions
A general `Connector.option(...)` class should also accept
`ConfigOption` instead of only strings.

A `Schema.column()` should accept `AbstractDataType` that can be
resolved to a `DataType` by access to a `DataTypeFactory`.


What do you think?

Thanks,
Timo


On 09.07.20 18:51, Jark Wu wrote:

Hi Dawid,

Thanks for the great feedback! Here are my responses:

1) computedColumn(..) vs column(..)
I'm fine with using `column(..)` in both cases.

2) Expression DSL vs pure SQL string for computed columns
This is a good point. Actually, I also prefer to use the Expression DSL
because this is more Table API style.
However, this requires modifying TableSchema again to accept & expose
Expression as computed columns.
I'm not convinced about this, because AFAIK, we want to have a
CatalogTableSchema to hold this information
and don't want to extend TableSchema. Maybe Timo can give some points
here.
Besides, this means the descriptor API can't be persisted in the Catalog
unless FLIP-80 is done.

3) Schema#proctime and Schema#watermarkFor#boundedOutOfOrderTimestamps
The original intention behind these APIs is to provide shortcut APIs for
Table API users.
But I'm also fine with providing only the DDL-like methods if you have
concerns. We can discuss shortcuts in the future if users request them.

4) LikeOption
LikeOption.INCLUDING.ALL is a constant (enum value). I have added more
description about this in the FLIP.

5) implementation?
I don't want to mention too much about implementation details in the FLIP
at the beginning, because the API is already very long.
But I also added an "Implementation" section to explain them.

6) static method vs new keyword
Personally I prefer the new keyword because it makes the API cleaner. If
we want to remove the new keyword and use static methods, we have to:
Either add a `Schema.builder()/create()` method as the starting method,
Or duplicate all the methods as static methods, e.g. we have 12 methods
in `Kafka`, any of them can be a starting method, then we will have 24
methods in `Kafka`.
Neither is good, and it's hard to keep all the descriptors having the
same starting method name, but all the descriptors can start from the
same new keyword.

Best,
Jark

On Thu, 9 Jul 2020 at 15:48, Dawid Wysakowicz <dwysakow...@apache.org>
wrote:

Correction to my point 4. The example is correct. I did not read it
carefully enough. Sorry for the confusion. Nevertheless I'd still like
to see a bit more explanation on the LikeOptions.

On 07/07/2020 04:32, Jark Wu wrote:

Hi everyone,

Leonard and I prepared a FLIP about refactoring the current Descriptor
API, i.e. TableEnvironment#connect(). We would like to propose a new
descriptor API to register connectors in Table API.

Since Flink 1.9, the community has focused more on the new SQL DDL
feature. After a series of releases, the SQL DDL is powerful and has many
rich features now. However, the Descriptor API (the
`TableEnvironment#connect()`) has been stagnant for a long time and is
missing lots of core features, such as computed columns and primary keys.
That's frustrating for Table API users who want to register tables
programmatically. Besides, currently, a connector must implement a
corresponding Descriptor (e.g. `new Kafka()`) before using the "connect"
API. Therefore, we hope to reduce this effort for connector developers, so
that custom sources/sinks can be registered via the descriptor API without
implementing a Descriptor.

These are the problems we want to resolve in this FLIP. I'm looking
forward to your comments.




https://cwiki.apache.org/confluence/display/FLINK/FLIP-129%3A+Refactor+Descriptor+API+to+register+connector+for+Table+API


Best,
Jark







--
Best, Jingsong Lee







