Thanks Jark for starting this discussion and organizing the FLIP document.

Thanks Dawid and Timo for the feedback. Here are my thoughts.

1) I’m +1 for using column() in both cases.

2) Expression DSL vs pure SQL string for computed columns

I think we can support both and implement the pure SQL string first.
I agree that the Expression DSL brings more possibilities and flexibility, but 
a SQL string is a more unified way that can reuse most of the DDL logic, such 
as validation and persistence in the Catalog.
Converting the Expression DSL to a SQL expression is another big topic, and I 
have not figured out a feasible approach so far.
So, given the current reality, maybe we can postpone the Expression DSL support.
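
To illustrate why the SQL-string variant is cheap to persist, here is a minimal 
self-contained sketch. SqlStringSchemaSketch, its physical()/computed() methods 
and the "schema.N.*" property keys are hypothetical names made up for this mail; 
they are not the FLIP's API and only assume that the Catalog stores string 
properties.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not Flink API: computed columns are kept as plain SQL
// strings, so persisting them needs no Expression-to-SQL conversion.
final class SqlStringSchemaSketch {
    // Each entry is {columnName, propertySuffix, value}.
    private final List<String[]> columns = new ArrayList<>();

    // Physical column with its data type given as a string, e.g. "INT".
    SqlStringSchemaSketch physical(String name, String dataType) {
        columns.add(new String[] {name, "data-type", dataType});
        return this;
    }

    // Computed column defined by a SQL expression string.
    SqlStringSchemaSketch computed(String name, String sqlExpression) {
        columns.add(new String[] {name, "expr", sqlExpression});
        return this;
    }

    // Flattens the schema into string properties (the key layout is an assumption).
    Map<String, String> toProperties() {
        Map<String, String> props = new LinkedHashMap<>();
        for (int i = 0; i < columns.size(); i++) {
            String[] c = columns.get(i);
            props.put("schema." + i + ".name", c[0]);
            props.put("schema." + i + "." + c[1], c[2]);
        }
        return props;
    }

    public static void main(String[] args) {
        Map<String, String> props = new SqlStringSchemaSketch()
                .physical("price", "DECIMAL(10, 2)")
                .physical("quantity", "INT")
                .computed("cost", "price * quantity")
                .toProperties();
        props.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

With an Expression-based column, toProperties() would first need a way to turn 
the expression back into a SQL string, which is the open problem mentioned above.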

3) Methods Schema#proctime and Schema#watermarkFor#boundedOutOfOrderTimestamps

 +1 for Dawid’s proposal to offer SQL-like methods.
 Schema()
    .column("proctime", proctime())
    .watermarkFor("rowtime", $("rowtime").minus(lit(3).seconds()))
And we can simplify watermarkFor("colName", Expression watermarkStrategy) to 
watermark("colName", Expression watermarkStrategy). I think the latter 
expresses the meaning of "WATERMARK FOR column_name AS 
watermark_strategy_expression" well.
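
As a rough illustration of that one-to-one mapping, here is a small sketch. 
WatermarkSketch is a hypothetical helper for this mail only, and it takes the 
strategy as a SQL string instead of an Expression to stay self-contained.

```java
// Hypothetical sketch, not Flink API: shows how watermark(column, strategy)
// lines up with the "WATERMARK FOR column AS strategy" clause of the DDL.
final class WatermarkSketch {
    // Renders the DDL clause from the two arguments, in the same order.
    static String watermark(String rowtimeColumn, String strategyExpression) {
        return "WATERMARK FOR " + rowtimeColumn + " AS " + strategyExpression;
    }

    public static void main(String[] args) {
        // prints: WATERMARK FOR rowtime AS rowtime - INTERVAL '3' SECOND
        System.out.println(watermark("rowtime", "rowtime - INTERVAL '3' SECOND"));
    }
}
```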

5)6)7) The new keyword vs the static method vs builder pattern

I have no strong preference. The new keyword and the static method on the 
descriptor can both be treated as a builder and do the same things a builder 
does. For the builder pattern, we would introduce six methods 
(connector.Builder(), connector.Builder.build(), format.Builder(), 
format.Builder.build(), Schema.Builder(), Schema.Builder.build()). I think we 
could avoid these unnecessary methods. I’m slightly +1 for the new keyword if 
we need to choose.
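
To make the comparison concrete, here is a sketch of the two styles for a schema 
descriptor. All class names (NewKeywordSchema, BuilderSchema) are hypothetical 
and only illustrate the extra newBuilder()/build() pair; they are not the 
FLIP's API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch, not Flink API.
// Style A: fluent descriptor started with the new keyword.
final class NewKeywordSchema {
    private final List<String> columns = new ArrayList<>();

    NewKeywordSchema column(String name, String type) {
        columns.add(name + " " + type);
        return this;
    }

    List<String> columns() {
        return Collections.unmodifiableList(columns);
    }
}

// Style B: immutable descriptor, which needs the newBuilder()/build() pair.
final class BuilderSchema {
    private final List<String> columns;

    private BuilderSchema(List<String> columns) {
        this.columns = Collections.unmodifiableList(new ArrayList<>(columns));
    }

    static Builder newBuilder() {
        return new Builder();
    }

    List<String> columns() {
        return columns;
    }

    static final class Builder {
        private final List<String> columns = new ArrayList<>();

        Builder column(String name, String type) {
            columns.add(name + " " + type);
            return this;
        }

        BuilderSchema build() {
            return new BuilderSchema(columns);
        }
    }
}

final class DescriptorStyleDemo {
    public static void main(String[] args) {
        List<String> a = new NewKeywordSchema().column("user_id", "BIGINT").columns();
        List<String> b = BuilderSchema.newBuilder().column("user_id", "BIGINT").build().columns();
        System.out.println(a.equals(b)); // both styles describe the same column
    }
}
```

Both variants carry the same information. The builder variant buys immutability 
at the cost of two more methods per descriptor.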

8) `Connector.option(...)` class should also accept `ConfigOption`
I’m slightly -1 for this. ConfigOption may not work because the key of a format 
ConfigOption does not carry the format prefix. E.g. for FAIL_ON_MISSING_FIELD 
of json, we need "json.fail-on-missing-field" rather than 
"fail-on-missing-field".

public static final ConfigOption<Boolean> FAIL_ON_MISSING_FIELD = ConfigOptions
        .key("fail-on-missing-field")
        .booleanType()
        .defaultValue(false);
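
A small sketch of the mismatch, using plain strings instead of the real 
ConfigOption class. FormatOptionKeySketch and prefixed() are hypothetical names, 
and the "format" key for the identifier is an assumption for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch, not Flink API: the key declared by the format lacks the
// format identifier that the table options need.
final class FormatOptionKeySketch {
    // Key as declared inside the format, without the "json." prefix.
    static final String FAIL_ON_MISSING_FIELD_KEY = "fail-on-missing-field";

    // Builds the key that has to appear in the table options.
    static String prefixed(String formatIdentifier, String optionKey) {
        return formatIdentifier + "." + optionKey;
    }

    public static void main(String[] args) {
        Map<String, String> tableOptions = new LinkedHashMap<>();
        tableOptions.put("format", "json");
        tableOptions.put(prefixed("json", FAIL_ON_MISSING_FIELD_KEY), "false");
        System.out.println(tableOptions);
    }
}
```

Passing the ConfigOption directly would hand over only the unprefixed key, so 
something like prefixed() would still have to run somewhere.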

WDYT?

Best,
Leonard Xu


> On July 15, 2020, at 16:37, Timo Walther <twal...@apache.org> wrote:
> 
> Hi Jark,
> 
> thanks for working on this issue. It is time to fix this last part of 
> inconsistency in the API. I also like the core parts of the FLIP, esp. that 
> TableDescriptor is one entity that can be passed to different methods. Here 
> is some feedback from my side:
> 
> 1) +1 for just `column(...)`
> 
> 2) Expression DSL vs pure SQL string for computed columns
> I agree with Dawid. Using the Expression DSL is desirable for a consistent 
> API. Furthermore, otherwise people need to register functions if they want to 
> use them in an expression. Refactoring TableSchema is definitely on the list 
> for 1.12. Maybe we can come up with some intermediate solution where we 
> transform the expression to a SQL expression for the catalog, until the 
> discussions around FLIP-80 and CatalogTableSchema have been finalized.
> 
> 3) Schema#proctime and Schema#watermarkFor#boundedOutOfOrderTimestamps
> We should design the descriptor very close to the SQL syntax. The more 
> similar the syntax, the more likely it is that we can keep the new descriptor 
> API stable.
> 
> 6) static method vs new keyword
> Actually, the `new` keyword was one of the things that bothered me most in 
> the old design. Fluent APIs avoid this nowadays.
> 
> 7) make the descriptors immutable with builders
> The descriptors are some kind of builders already. But they are not called 
> "builder". Instead of coming up with the new concept of a "descriptor", we 
> should use terminology that people esp. Java/Scala users are familiar with 
> already.
> 
> We could make the descriptors immutable to pass them around easily.
> 
> Btw "Connector" and "Format" should always be in the classname. This was also 
> a mistake in the past. Instead of calling the descriptor just `Kafka` we 
> could call it `KafkaConnector`. An entire example could look like:
> 
> tEnv.createTemporaryTable(
>   "OrdersInKafka",
>   KafkaConnector.newBuilder() // builder pattern supported by IDE
>      .topic("user_logs")
>      .property("bootstrap.servers", "localhost:9092")
>      .property("group.id", "test-group")
>      .format(JsonFormat.newInstance()) // shortcut for no parameters
>      .schema(
>         Schema.newBuilder()
>            .column("user_id", DataTypes.BIGINT())
>            .column("score", DataTypes.DECIMAL(10, 2))
>            .column("log_ts", DataTypes.TIMESTAMP(3))
>            .column("my_ts", toTimestamp($("log_ts")))
>            .build()
>      )
>      .build()
> );
> 
> Instead of refactoring the existing classes, we could also think about a 
> completely new stack. I think this would avoid confusion for the old users. We 
> could deprecate the entire `Kafka` class instead of dealing with backwards 
> compatibility.
> 
> 8) minor extensions
> A general `Connector.option(...)` class should also accept `ConfigOption` 
> instead of only strings.
> A `Schema.column()` should accept `AbstractDataType` that can be resolved to 
> a `DataType` by access to a `DataTypeFactory`.
> 
> What do you think?
> 
> Thanks,
> Timo
> 
> 
> On 09.07.20 18:51, Jark Wu wrote:
>> Hi Dawid,
>> Thanks for the great feedback! Here are my responses:
>> 1) computedColumn(..) vs column(..)
>> I'm fine to use `column(..)` in both cases.
>> 2) Expression DSL vs pure SQL string for computed columns
>> This is a good point. Actually, I also prefer to use Expression DSL because
>> this is more Table API style.
>> However, this requires modifying TableSchema again to accept & expose
>> Expression as computed columns.
>> I'm not convinced about this, because AFAIK, we want to have a
>> CatalogTableSchema to hold this information
>> and don't want to extend TableSchema. Maybe Timo can give some points here.
>> Besides, this would mean the descriptor API can't be persisted in Catalog
>> unless FLIP-80 is done.
>> 3) Schema#proctime and Schema#watermarkFor#boundedOutOfOrderTimestamps
>> The original intention behind these APIs is to provide shortcut APIs for
>> Table API users.
>> But I'm also fine to only provide the DDL-like methods if you have
>> concerns. We can discuss shortcuts in the future if users request.
>> 4) LikeOption
>> LikeOption.INCLUDING.ALL is a constant (enum values). I have added more
>> description about this in the FLIP.
>> 5) implementation?
>> I don't want to mention too much about implementation details in the FLIP
>> at the beginning, because the API is already very long.
>> But I also added an "Implementation" section to explain them.
>> 6) static method vs new keyword
>> Personally I prefer the new keyword because it makes the API cleaner. If we
>> want to remove the new keyword and use static methods, we have to:
>> Either add a `Schema.builder()/create()` method as the starting method,
>> Or duplicate all the methods as static methods, e.g. we have 12 methods
>> in `Kafka`, any of them can be a starting method, then we will have 24
>> methods in `Kafka`.
>> Neither is good, and it's hard to keep all the descriptors having the
>> same starting method name, but all the descriptors can start from the same
>> new keyword.
>> Best,
>> Jark
>> On Thu, 9 Jul 2020 at 15:48, Dawid Wysakowicz <dwysakow...@apache.org>
>> wrote:
>>> Correction to my point 4. The example is correct. I did not read it
>>> carefully enough. Sorry for the confusion. Nevertheless I'd still like
>>> to see a bit more explanation on the LikeOptions.
>>> 
>>> On 07/07/2020 04:32, Jark Wu wrote:
>>>> Hi everyone,
>>>> 
>>>> Leonard and I prepared a FLIP about refactoring current Descriptor API,
>>>> i.e. TableEnvironment#connect(). We would like to propose a new descriptor
>>>> API to register connectors in Table API.
>>>> 
>>>> Since Flink 1.9, the community focused more on the new SQL DDL feature.
>>>> After a series of releases, the SQL DDL is powerful and has many rich
>>>> features now. However, Descriptor API (the `TableEnvironment#connect()`)
>>>> has been stagnant for a long time and is missing lots of core features, such
>>>> as computed columns and primary keys. That's frustrating for Table API
>>>> users who want to register tables programmatically. Besides, currently, a
>>>> connector must implement a corresponding Descriptor (e.g. `new Kafka()`)
>>>> before using the "connect" API. Therefore, we hope to reduce this effort
>>>> for connector developers, so that custom sources/sinks can be registered via
>>>> the descriptor API without implementing a Descriptor.
>>>> 
>>>> These are the problems we want to resolve in this FLIP. I'm looking forward
>>>> to your comments.
>>>> 
>>>> 
>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-129%3A+Refactor+Descriptor+API+to+register+connector+for+Table+API
>>>> 
>>>> Best,
>>>> Jark
>>>> 
>>> 
>>> 
> 