Thanks Jark for bringing up this discussion and organizing the FLIP document.
Thanks Dawid and Timo for the feedback. Here are my thoughts.
1) I'm +1 for using column() for both cases.
2) Expression DSL vs pure SQL string for computed columns
I think we can support both and implement the pure SQL string first.
I agree that the Expression DSL brings more possibility and flexibility, but a
SQL string is a more unified way that can reuse most of the DDL logic, such as
validation and persistence in the Catalog.
Converting the Expression DSL to a SQL expression is another big topic, and I
have not figured out a feasible approach for it yet.
So maybe we can postpone the Expression DSL support, considering the current
reality.
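To make the trade-off concrete, here is a rough sketch of the SQL-string
variant (the column(name, sqlExpression) overload is only my assumption of how
it could look, not a final API):

    Schema()
        .column("user_id", DataTypes.BIGINT())
        .column("log_ts", DataTypes.STRING())
        // hypothetical: computed column given as a SQL expression string,
        // which can be validated and persisted in the Catalog as-is
        .column("my_ts", "TO_TIMESTAMP(log_ts)")

The Expression DSL variant would first need a way to serialize something like
$("log_ts") back into such a SQL string before it could be stored in the
Catalog.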
3) Methods Schema#proctime and Schema#watermarkFor#boundedOutOfOrderTimestamps
+1 for Dawid's proposal to offer SQL-like methods:

Schema()
    .column("proctime", proctime())
    .watermarkFor("rowtime", $("rowtime").minus(lit(3).seconds()))
And we can simplify watermarkFor("colName", Expression watermarkStrategy) to
watermark("colName", Expression watermarkStrategy); I think the latter
expresses the meaning of "WATERMARK FOR column_name AS
watermark_strategy_expression" well.
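To make the mapping explicit, a minimal sketch of how the renamed method would
line up with the DDL (method name per the suggestion above, everything else is
only illustrative):

    // DDL:  WATERMARK FOR rowtime AS rowtime - INTERVAL '3' SECOND
    .watermark("rowtime", $("rowtime").minus(lit(3).seconds()))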
5) 6) 7) The new keyword vs. the static method vs. the builder pattern
I have no strong preference; the new keyword and a static method on the
descriptor can almost be treated as a builder and do the same things a builder
does. For the builder pattern, we would introduce six extra methods
(connector.Builder(), connector.Builder.build(), format.Builder(),
format.Builder.build(), Schema.Builder(), Schema.Builder.build()); I think we
could avoid these unnecessary methods. I'm slightly +1 for the new keyword if
we need to make a choice.
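For comparison, a minimal sketch of the two shapes (class and method names are
only illustrative, not final API):

    // builder pattern: every descriptor needs an extra entry point plus build()
    Schema builtSchema = Schema.newBuilder()
        .column("user_id", DataTypes.BIGINT())
        .build();

    // new keyword: the descriptor itself is the entry point
    Schema newSchema = new Schema()
        .column("user_id", DataTypes.BIGINT());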
8) `Connector.option(...)` class should also accept `ConfigOption`
I'm slightly -1 on this. A ConfigOption may not work here, because the key of a
format ConfigOption does not carry the format prefix, e.g. for
FAIL_ON_MISSING_FIELD of the JSON format we need "json.fail-on-missing-field"
rather than "fail-on-missing-field":
public static final ConfigOption<Boolean> FAIL_ON_MISSING_FIELD = ConfigOptions
    .key("fail-on-missing-field")
    .booleanType()
    .defaultValue(false);
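A short sketch of the mismatch (the option() calls are just my assumption of
how the new API could look):

    // hypothetical: if option() accepted the ConfigOption directly,
    // the key would resolve to "fail-on-missing-field"
    .option(FAIL_ON_MISSING_FIELD, true)

    // but the connector actually needs the format-prefixed key
    .option("json.fail-on-missing-field", "true")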
WDYT?
Best,
Leonard Xu
> On 15 Jul 2020, at 16:37, Timo Walther <[email protected]> wrote:
>
> Hi Jark,
>
> thanks for working on this issue. It is time to fix this last part of
> inconsistency in the API. I also like the core parts of the FLIP, esp. that
> TableDescriptor is one entity that can be passed to different methods. Here
> is some feedback from my side:
>
> 1) +1 for just `column(...)`
>
> 2) Expression DSL vs pure SQL string for computed columns
> I agree with Dawid. Using the Expression DSL is desirable for a consistent
> API. Furthermore, otherwise people need to register functions if they want to
> use them in an expression. Refactoring TableSchema is definitely on the list
> for 1.12. Maybe we can come up with some intermediate solution where we
> transform the expression to a SQL expression for the catalog. Until the
> discussions around FLIP-80 and CatalogTableSchema have been finalized.
>
> 3) Schema#proctime and Schema#watermarkFor#boundedOutOfOrderTimestamps
> We should design the descriptor very close to the SQL syntax. The more
> similar the syntax, the more likely it is to keep the new descriptor API
> stable.
>
> 6) static method vs new keyword
> Actually, the `new` keyword was one of the things that bothered me most in
> the old design. Fluent APIs avoid this nowadays.
>
> 7) make the descriptors immutable with builders
> The descriptors are some kind of builders already. But they are not called
> "builder". Instead of coming up with the new concept of a "descriptor", we
> should use terminology that people esp. Java/Scala users are familiar with
> already.
>
> We could make the descriptors immutable to pass them around easily.
>
> Btw "Connector" and "Format" should always be in the classname. This was also
> a mistake in the past. Instead of calling the descriptor just `Kafka` we
> could call it `KafkaConnector`. An entire example could look like:
>
> tEnv.createTemporaryTable(
> "OrdersInKafka",
> KafkaConnector.newBuilder() // builder pattern supported by IDE
> .topic("user_logs")
> .property("bootstrap.servers", "localhost:9092")
> .property("group.id", "test-group")
> .format(JsonFormat.newInstance()) // shortcut for no parameters
> .schema(
> Schema.newBuilder()
> .column("user_id", DataTypes.BIGINT())
> .column("score", DataTypes.DECIMAL(10, 2))
> .column("log_ts", DataTypes.TIMESTAMP(3))
> .column("my_ts", toTimestamp($("log_ts"))
> .build()
> )
> .build()
> );
>
> Instead of refactoring the existing classes, we could also think about a
> completely new stack. I think this would avoid confusion for the old users. We
> could deprecate the entire `Kafka` class instead of dealing with backwards
> compatibility.
>
> 8) minor extensions
> A general `Connector.option(...)` class should also accept `ConfigOption`
> instead of only strings.
> A `Schema.column()` should accept `AbstractDataType` that can be resolved to
> a `DataType` by access to a `DataTypeFactory`.
>
> What do you think?
>
> Thanks,
> Timo
>
>
> On 09.07.20 18:51, Jark Wu wrote:
>> Hi Dawid,
>> Thanks for the great feedback! Here are my responses:
>> 1) computedColumn(..) vs column(..)
>> I'm fine to use `column(..)` in both cases.
>> 2) Expression DSL vs pure SQL string for computed columns
>> This is a good point. Actually, I also prefer to use Expression DSL because
>> this is more Table API style.
>> However, this requires modifying TableSchema again to accept & expose
>> Expression as computed columns.
>> I'm not convinced about this, because AFAIK, we want to have a
>> CatalogTableSchema to hold this information
>> and don't want to extend TableSchema. Maybe Timo can give some points here.
>> Besides, this would make the descriptor API impossible to persist in the Catalog
>> unless FLIP-80 is done.
>> 3) Schema#proctime and Schema#watermarkFor#boundedOutOfOrderTimestamps
>> The original intention behind these APIs was to provide shortcut APIs for
>> Table API users.
>> But I'm also fine to only provide the DDL-like methods if you have
>> concerns. We can discuss shortcuts in the future if users request.
>> 4) LikeOption
>> LikeOption.INCLUDING.ALL is a constant (enum values). I have added more
>> description about this in the FLIP.
>> 5) implementation?
>> I don't want to mention too much about implementation details in the FLIP
>> at the beginning, because the API is already very long.
>> But I also added an "Implementation" section to explain them.
>> 6) static method vs new keyword
>> Personally I prefer the new keyword because it makes the API cleaner. If we
>> want to remove the new keyword and use static methods, we have to
>> either add a `Schema.builder()/create()` method as the starting method,
>> or duplicate all the methods as static methods, e.g. we have 12 methods
>> in `Kafka`, any of them can be a starting method, so we would end up with 24
>> methods in `Kafka`.
>> Neither option is good, and it's hard to keep all the descriptors using the
>> same starting method name, whereas all the descriptors can start from the
>> same new keyword.
>> Best,
>> Jark
>> On Thu, 9 Jul 2020 at 15:48, Dawid Wysakowicz <[email protected]>
>> wrote:
>>> Correction to my point 4. The example is correct. I did not read it
>>> carefully enough. Sorry for the confusion. Nevertheless I'd still like
>>> to see a bit more explanation on the LikeOptions.
>>>
>>> On 07/07/2020 04:32, Jark Wu wrote:
>>>> Hi everyone,
>>>>
>>>> Leonard and I prepared a FLIP about refactoring current Descriptor API,
>>>> i.e. TableEnvironment#connect(). We would like to propose a new
>>> descriptor
>>>> API to register connectors in Table API.
>>>>
>>>> Since Flink 1.9, the community focused more on the new SQL DDL feature.
>>>> After a series of releases, the SQL DDL is powerful and has many rich
>>>> features now. However, Descriptor API (the `TableEnvironment#connect()`)
>>>> has been stagnant for a long time and is missing lots of core features, such
>>>> as computed columns and primary keys. That's frustrating for Table API
>>>> users who want to register tables programmatically. Besides, currently, a
>>>> connector must implement a corresponding Descriptor (e.g. `new Kafka()`)
>>>> before using the "connect" API. Therefore, we hope to reduce this effort
>>>> for connector developers, so that custom sources/sinks can be registered via
>>>> the descriptor API without implementing a Descriptor.
>>>>
>>>> These are the problems we want to resolve in this FLIP. I'm looking
>>> forward
>>>> to your comments.
>>>>
>>>>
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-129%3A+Refactor+Descriptor+API+to+register+connector+for+Table+API
>>>>
>>>> Best,
>>>> Jark
>>>>
>>>
>>>
>