Re: Fwd: [DISCUSS] Flink SQL DDL Design

2019-05-20 Thread Yuzhao Chen
Thanks for all your discussions, Timo, Jark Wu, Lin Li, and Fabian.

I have created a new design doc [1], reorganized from the MVP doc [2] and
the initial doc [3], both of which were proposed by Shuyi Chen.

The main difference is that I extended the CREATE TABLE DDL to support complex
SQL types (array, map and struct). I have also added 3 points on which we need
to reach a consensus:

1. Where should we put the parse code? (blocking)
2. How do we integrate the SQL parse logic with TableEnvironment and SqlClient?
(blocking)
3. What format should the table DDL WITH clause have? (non-blocking)

For points 1 and 2, I have put some solutions in [1] (their titles are
highlighted in red). We need a discussion because they are blocking. For point
3, I have filed a new JIRA issue to track it [4].
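
As a rough illustration of what supporting complex SQL types means for the parse code, here is a minimal Python sketch of a recursive parser for nested type strings. The `ARRAY<...>`, `MAP<..., ...>` and `ROW<name TYPE, ...>` spellings, as well as `SqlType`, `split_top_level` and `parse_type`, are assumptions made for this example only; the real syntax is whatever the design doc [1] settles on.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass(frozen=True)
class SqlType:
    """A parsed type: a name plus (field_name, type) children.

    field_name is '' for unnamed arguments such as ARRAY/MAP parameters.
    """
    name: str
    children: Tuple[Tuple[str, "SqlType"], ...] = ()


def split_top_level(text: str) -> List[str]:
    """Split on commas that are not nested inside angle brackets."""
    parts, depth, start = [], 0, 0
    for i, ch in enumerate(text):
        if ch == "<":
            depth += 1
        elif ch == ">":
            depth -= 1
        elif ch == "," and depth == 0:
            parts.append(text[start:i].strip())
            start = i + 1
    parts.append(text[start:].strip())
    return parts


def parse_type(text: str) -> SqlType:
    """Parse e.g. 'ARRAY<MAP<VARCHAR, INT>>' into a SqlType tree."""
    text = text.strip()
    if "<" not in text:
        return SqlType(text.upper())  # simple type such as INT or VARCHAR
    if not text.endswith(">"):
        raise ValueError(f"unbalanced type: {text!r}")
    name, inner = text.split("<", 1)
    name = name.strip().upper()
    args = split_top_level(inner[:-1])  # drop the closing '>'
    if name == "ROW":
        # Each ROW argument is "<field name> <type>".
        fields = []
        for arg in args:
            field_name, field_type = arg.split(None, 1)
            fields.append((field_name, parse_type(field_type)))
        return SqlType(name, tuple(fields))
    return SqlType(name, tuple(("", parse_type(arg)) for arg in args))


print(parse_type("ARRAY<MAP<VARCHAR, INT>>"))
print(parse_type("ROW<id INT, tags ARRAY<VARCHAR>>"))
```

Wherever the parse code ends up living (point 1), a nested representation of this kind is what TableEnvironment and SqlClient would need to receive from it.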

[1] 
https://docs.google.com/document/d/1OmVyuPk9ibGUC-CnPHbXvCg_fdG1TeC3lXSnqcUEYmM/edit?usp=sharing
[2] 
https://docs.google.com/document/d/1ug1-aVBSCxZQk58kR-yaK2ETCgL3zg0eDUVGCnW2V9E/edit
[3] 
https://docs.google.com/document/d/1TTP-GCC8wSsibJaSUyFZ_5NBAHYEB1FVmPpP7RgDGBA/edit
[4] https://issues.apache.org/jira/browse/FLINK-12557

Best,
Danny Chan


Re: [DISCUSS] Flink SQL DDL Design

2018-12-17 Thread Jark Wu
le
> >>>>>>>>>> `writeTiemstamp`, the only attribute active in the stream will be
> >>>>>>>>>> written to the Kafka message header. What I mean by the timestamp
> >>>>>>>>>> in StreamRecord is the time attribute in the stream.
> >>>>>>>>>>
> >>>>>>>>>> 4c. Yes. We introduced the WATERMARK keyword, similar to the INDEX
> >>>>>>>>>> and PRIMARY KEY keywords.
> >>>>>>>>>>
> >>>>>>>>>> @Timo, do you have any other advice or questions on the watermark
> >>>>>>>>>> syntax? For example, the built-in strategy name: "BOUNDED WITH
> >>>>>>>>>> OFFSET" vs. "OFFSET" vs. ...
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> Cheers,
> >>>>>>>>>> Jark
> >>>>>>>>>>
> >>>>>>>>>> On Fri, 7 Dec 2018 at 17:13, Lin Li wrote:
> >>>>>>>>>>> Hi Timo,
> >>>>>>>>>>> Thanks for your feedback, here are some thoughts of mine:
> >>>>>>>>>>>
> >>>>>>>>>>> 3. Sources/Sinks:
> >>>>>>>>>>> "Let's assume an interactive CLI session, people should be able
> >>>>>>>>>>> to list all source table and sink tables to know upfront if they
> >>>>>>>>>>> can use an INSERT INTO here or not."
> >>>>>>>>>>> This requirement can simply be resolved by a document that lists
> >>>>>>>>>>> all supported source/sink/both connectors, and the sql-client can
> >>>>>>>>>>> perform a quick check. It's only an implementation choice, not
> >>>>>>>>>>> necessary for the syntax.
> >>>>>>>>>>> For connector implementation, a connector may implement one, some
> >>>>>>>>>>> or all of the [Stream|Batch]Source/[Stream|Batch]Sink traits, so
> >>>>>>>>>>> we can derive the availability for any given query without the
> >>>>>>>>>>> SOURCE/SINK keywords or specific table properties in the WITH
> >>>>>>>>>>> clause.
> >>>>>>>>>>> Since there's still indeterminacy, shall we skip these two
> >>>>>>>>>>> keywords for the MVP DDL? We can discuss further after users'
> >>>>>>>>>>> feedback.
> >>>>>>>>>>>
> >>>>>>>>>>> 6. Partitioning and keys
> >>>>>>>>>>> I agree with you that we should raise the priority of table
> >>>>>>>>>>> constraint and partitioned table support for better connectivity
> >>>>>>>>>>> to Hive and Kafka. I'll add partitioned table syntax (compatible
> >>>>>>>>>>> with Hive) into the DDL Draft doc later[1].
> >>>>>>>>>>>
> >>>>>>>>>>> 5. Schema declaration
> >>>>>>>>>>> "if users want to declare computed columns they have a "schema"
> >>>>>>>>>>> constraints but without columns
> >>>>>>>>>>> CREATE TABLE (PRIMARY_KEY(a, c)) WITH (format.type = avro,

Re: [DISCUSS] Flink SQL DDL Design

2018-12-14 Thread Timo Walther
use similar to PARTITIONED BY is also a good idea. Conceptually, it's fine to
put the watermark in the schema part or outside the schema part. But if we want
to support multiple watermark definitions, it may be better to put it in the
schema part. It is similar to an index definition, where we can define several
indexes on a table in the schema part.

4d. How can people come up with a custom watermark strategy?
In most cases, the built-in strategies work well. If we need a custom one, we
can use a scalar function that is restricted to return only a nullable Long,
and use it in SQL like: WATERMARK for rowtime AS watermarkUdf(a, b, c). The
`watermarkUdf` is a user-defined scalar function that accepts 3 parameters and
returns a nullable Long, which can be used as a punctuated watermark assigner.
Another choice is implementing a class extending
`org.apache.flink.table.sources.wmstrategies.WatermarkStrategy` and using it in
SQL: WATERMARK for rowtime AS 'com.my.MyWatermarkStrategy'. But if a scalar
function can cover the requirements here, I would prefer it, because it stays
standard-compliant. BTW, this feature is not in the MVP; we can discuss it in
more depth in the future when we need it.
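
To make the scalar-function option more concrete, here is a minimal Python sketch of how a function returning a nullable long could drive punctuated watermarks. `watermark_udf`, its marker-row rule and `assign_watermarks` are hypothetical names for illustration, not Flink APIs. The sketch assumes that a null (`None`) result means "no watermark for this row" and that a watermark is only emitted when it advances.

```python
from typing import Callable, Iterable, List, Optional, Tuple

Row = Tuple[int, int, int]  # hypothetical (a, b, c) columns


def watermark_udf(a: int, b: int, c: int) -> Optional[int]:
    """Hypothetical scalar function: emit a watermark only on marker rows.

    In this sketch `a` is the event timestamp in milliseconds, `b` an allowed
    delay, and `c == 1` marks a row that carries a watermark.
    """
    if c != 1:
        return None  # null result: no watermark for this row
    return a - b


def assign_watermarks(
    rows: Iterable[Row], udf: Callable[[int, int, int], Optional[int]]
) -> List[int]:
    """Collect the watermarks a punctuated assigner would emit for `rows`."""
    emitted: List[int] = []
    current: Optional[int] = None  # highest watermark emitted so far
    for a, b, c in rows:
        candidate = udf(a, b, c)
        # Watermarks must only move forward, so stale candidates are skipped.
        if candidate is not None and (current is None or candidate > current):
            current = candidate
            emitted.append(candidate)
    return emitted


rows = [(1000, 100, 0), (2000, 100, 1), (1500, 100, 1), (3000, 100, 1)]
print(assign_watermarks(rows, watermark_udf))
```

The point of the design is that the user only writes the scalar function; when and whether to emit stays with the framework.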

5. Schema declaration:
I like the proposal to omit the schema if we can get the schema from external
storage or a schema file. Actually, we have already encountered this
requirement in our company.


+1 to @Xuefu that we should be as close as possible to Hive syntax while
keeping to the ANSI SQL standard. This will make it more acceptable and reduce
the learning cost for users.

[1]:
https://docs.microsoft.com/en-us/sql/relational-databases/partitions/create-partitioned-tables-and-indexes?view=sql-server-2017

Best,
Jark

On Thu, 6 Dec 2018 at 12:09, Zhang, Xuefu <xuef...@alibaba-inc.com> wrote:

Hi Timo/Shuyi/Lin,

Thanks for the discussions. It seems that we are converging to something
meaningful. Here are some of my thoughts:

1. +1 on MVP DDL
3. Markers for source or sink seem more about permissions on tables that belong
to a security component. Unless the table is created differently based on
source, sink, or both, it doesn't seem necessary to use these keywords to
enforce permissions.
5. It might be okay if schema declaration is always needed. While there might
be some duplication sometimes, it's not always true. For example, external
schema may not be exactly matching Flink schema. For instance, data types. Even
if so, perfect match is not required. For instance, the external schema file
may evolve while table schema in Flink may stay unchanged. A responsible reader
should be able to scan the file based on file schema and return the data based
on table schema.
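
The reader behaviour described in point 5 can be sketched as follows. This is a minimal Python illustration with hypothetical names (`project_to_table_schema`, `CASTS`, the sample schemas). It assumes that fields are matched by name, extra file fields are dropped, fields missing from the file become null, and values are cast to the type declared in the table schema.

```python
from typing import Any, Callable, Dict, List, Optional

# Hypothetical mapping from a declared table type to a Python conversion.
CASTS: Dict[str, Callable[[Any], Any]] = {
    "INT": int,
    "BIGINT": int,
    "DOUBLE": float,
    "VARCHAR": str,
}


def project_to_table_schema(
    record: Dict[str, Any], table_schema: Dict[str, str]
) -> Dict[str, Optional[Any]]:
    """Return `record` (read with the file schema) shaped like `table_schema`."""
    result: Dict[str, Optional[Any]] = {}
    for column, sql_type in table_schema.items():
        value = record.get(column)  # missing in the file -> null
        result[column] = None if value is None else CASTS[sql_type](value)
    return result


# The file schema has evolved: `id` is stored as a string and `extra` was
# added, while the table schema declared in Flink stayed unchanged.
table_schema = {"id": "BIGINT", "name": "VARCHAR", "score": "DOUBLE"}
file_records: List[Dict[str, Any]] = [
    {"id": "1", "name": "a", "extra": True},
    {"id": "2", "name": "b", "score": 3, "extra": False},
]
rows = [project_to_table_schema(r, table_schema) for r in file_records]
print(rows)
```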

Other aspects:

7. Hive compatibility. Since Flink SQL will soon be able to operate on Hive
metadata and data, it's an add-on benefit if we can be compatible with Hive
syntax/semantics while following the ANSI standard. At least we should be as
close as possible. Hive DDL can be found at
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL

Thanks,
Xuefu





--

Sender:Lin Li 
Sent at:2018 Dec 6 (Thu) 10:49
Recipient:dev 
Subject:Re: [DISCUSS] Flink SQL DDL Design

Hi Timo and Shuyi,
  thanks for your feedback.

1. Scope
agree with you we should focus on the MVP DDL first.

2. Constraints
yes, this can be a follow-up issue.

3. Sources/Sinks
If a TABLE has both read and write access requirements, should we declare it
using `CREATE [SOURCE_SINK|BOTH] TABLE tableName ...`? A further question: if a
TABLE t1 is first declared as read-only (as a source table), and then for some
new requirement t1 changes to a sink table, we need to update both the DDL and
the catalogs.
Furthermore, let's think about BATCH queries, where updating a table in place
can be a common case.
e.g.,
```
CREATE TABLE t1 (
  col1 varchar,
  col2 int,
  col3 varchar
  ...
);

INSERT [OVERWRITE] TABLE t1
AS
SELECT
  (some computing ...)
FROM t1;
```
So, let's forget these SOURCE/SINK keywords in DDL. For validation purposes, we
can find other ways.
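
One such way, raised elsewhere in this thread, is to derive read/write availability from what a connector implements rather than from DDL keywords. The following Python sketch is purely illustrative: `TableSource`, `TableSink` and the two connector classes are hypothetical stand-ins for the source and sink traits, not Flink's actual API.

```python
from typing import Dict


class TableSource:
    """Marker for connectors that can be read from (hypothetical)."""


class TableSink:
    """Marker for connectors that can be written to (hypothetical)."""


class KafkaLikeConnector(TableSource, TableSink):
    """Supports both reading and writing."""


class JdbcLikeSinkOnly(TableSink):
    """Supports writing only."""


def can_select_from(connector: object) -> bool:
    return isinstance(connector, TableSource)


def can_insert_into(connector: object) -> bool:
    return isinstance(connector, TableSink)


def validate_insert(catalog: Dict[str, object], target: str, source: str) -> None:
    """Reject `INSERT INTO target SELECT ... FROM source` if unsupported."""
    if not can_insert_into(catalog[target]):
        raise ValueError(f"table {target!r} cannot be used as a sink")
    if not can_select_from(catalog[source]):
        raise ValueError(f"table {source!r} cannot be used as a source")


catalog = {"t1": KafkaLikeConnector(), "t2": JdbcLikeSinkOnly()}
validate_insert(catalog, target="t1", source="t1")  # in-place update of t1
validate_insert(catalog, target="t2", source="t1")  # t2 is a valid sink
```

With a check like this, the in-place update of t1 shown above needs no SOURCE or SINK marker, while reading from a write-only table is still rejected at validation time.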

4. Time attributes
As Shuyi mentioned before, there exists an
`org.apache.flink.table.sources.tsextractors.TimestampExtractor` for custom
defined time attributes, but this expression-based class is friendlier to the
Table API than to SQL.
```
/**
  * Provides an expression to extract the timestamp for a rowtime attribute.
  */
abstract class TimestampExtractor extends FieldComputer[Long] with Serializable {

  /** Timestamp extractors compute the timestamp as Long. */
  override def getReturnType: TypeInformation[Long] =
    Types.LONG.asInstanceOf[TypeInformation[Long]]
}
```
BTW, I think both the Scalar function and the TimestampExtractor are

Re: [DISCUSS] Flink SQL DDL Design

2018-12-13 Thread Jark Wu
> > > >>>>>>> schema derivation as a follow-up extension ?
> > > >>>>>>>
> > > >>>>>>> On Thu, 6 Dec 2018 at 18:05, Timo Walther wrote:
> > > >>>>>>>
> > > >>>>>>>> Hi everyone,
> > > >>>>>>>>
> > > >>>>>>>> great to have such a lively discussion. My next batch of feedback:
> > > >>>>>>>>
> > > >>>>>>>> @Jark: We don't need to align the descriptor approach with SQL. I'm
> > > >>>>>>>> open for different approaches as long as we can serve a broad set of
> > > >>>>>>>> use cases and systems. The descriptor approach was a first attempt to
> > > >>>>>>>> cover all aspects and connector/format characteristics. Just another
> > > >>>>>>>> example, that is missing in the DDL design: How can a user decide if
> > > >>>>>>>> append, retraction, or upserts should be used to sink data into the
> > > >>>>>>>> target system? Do we want to define all these important properties in
> > > >>>>>>>> the big WITH property map? If yes, we are already close to the
> > > >>>>>>>> descriptor approach. Regarding the "standard way", most DDL languages
> > > >>>>>>>> have very custom syntax so there is not a real "standard".
> > > >>>>>>>>
> > > >>>>>>>> 3. Sources/Sinks: @Lin: If a table has both read/write access it can
> > > >>>>>>>> be created using a regular CREATE TABLE (omitting a specific
> > > >>>>>>>> source/sink) declaration. Regarding the transition from source/sink to
> > > >>>>>>>> both, yes we would need to update the DDL and catalogs. But is this a
> > > >>>>>>>> problem? One also needs to add new queries that use the tables.
> > > >>>>>>>> @Xuefu: It is not only about security aspects. Especially for
> > > >>>>>>>> streaming use cases, not every connector can be used as a source
> > > >>>>>>>> easily. For example, a JDBC sink is easier than a JDBC source. Let's
> > > >>>>>>>> assume an interactive CLI session, people should be able to list all
> > > >>>>>>>> source tables and sink tables to know upfront if they can use an
> > > >>>>>>>> INSERT INTO here or not.
> > > >>>>>>>>
> > > >>>>>>>> 6. Partitioning and keys: @Lin: I would like to include this in the
> > > >>>>>>>> design given that Hive integration and Kafka key support are in the
> > > >>>>>>>> making/are on our roadmap for this release.
> > > >>>>>>>>
> > > >>>>>>>> 5. Schema declaration: @Lin: You are right it is not conflicting. I
> > > >>>>>>>> just wanted to raise the point because if users want to declare
> > > >>>>>>>> computed columns they have a "schema" constraints but without columns.
> > > >>>>>>>> Are we ok with a syntax like ...
> > > >>>>>>>> CREATE TABLE (PRIMARY_KEY(a, c)) WITH (format.type = avro,

Re: [DISCUSS] Flink SQL DDL Design

2018-12-12 Thread Rong Rong
> > >>>>>>>> custom syntax so there is not a real "standard".
> > >>>>>>>>
> > >>>>>>>> 3. Sources/Sinks: @Lin: If a table has both read/write access it can
> > >>>>>>>> be created using a regular CREATE TABLE (omitting a specific
> > >>>>>>>> source/sink) declaration. Regarding the transition from source/sink to
> > >>>>>>>> both, yes we would need to update the DDL and catalogs. But is this a
> > >>>>>>>> problem? One also needs to add new queries that use the tables.
> > >>>>>>>> @Xuefu: It is not only about security aspects. Especially for
> > >>>>>>>> streaming use cases, not every connector can be used as a source
> > >>>>>>>> easily. For example, a JDBC sink is easier than a JDBC source. Let's
> > >>>>>>>> assume an interactive CLI session, people should be able to list all
> > >>>>>>>> source tables and sink tables to know upfront if they can use an
> > >>>>>>>> INSERT INTO here or not.
> > >>>>>>>>
> > >>>>>>>> 6. Partitioning and keys: @Lin: I would like to include this in the
> > >>>>>>>> design given that Hive integration and Kafka key support are in the
> > >>>>>>>> making/are on our roadmap for this release.
> > >>>>>>>>
> > >>>>>>>> 5. Schema declaration: @Lin: You are right it is not conflicting. I
> > >>>>>>>> just wanted to raise the point because if users want to declare
> > >>>>>>>> computed columns they have a "schema" constraints but without columns.
> > >>>>>>>> Are we ok with a syntax like ...
> > >>>>>>>> CREATE TABLE (PRIMARY_KEY(a, c)) WITH (format.type = avro,
> > >>>>>>>> format.schema-file = "/my/avrofile.avsc") ?
> > >>>>>>>> @Xuefu: Yes, you are right that an external schema might not exactly
> > >>>>>>>> match but this is true for both directions:
> > >>>>>>>> table schema "derives" format schema and format schema "derives" table
> > >>>>>>>> schema.
> > >>>>>>>>
> > >>>>>>>> 7. Hive compatibility: @Xuefu: I agree that Hive is popular but we
> > >>>>>>>> should not just adopt everything from Hive as their syntax is very
> > >>>>>>>> batch-specific. We should come up with a superset of historical and
> > >>>>>>>> future requirements. Supporting Hive queries can be an intermediate
> > >>>>>>>> layer on top of Flink's DDL.
> > >>>>>>>>
> > >>>>>>>> 4. Time attributes: @Lin: I'm fine with changing the
> > >>>>>>>> TimestampExtractor interface as this is also important for better
> > >>>>>>>> separation of connector and table module [1]. However, I'm wondering
> > >>>>>>>> about watermark generation.
> > >>>>>>>> 4a. timestamps are in the schema twice:
> > >>>>>>>> @Jark: "existing field is Long/Timestamp, we can just use it as
> > >>>>>>>> rowtime": yes, but we need to mark a field as such an attribute. What
> > >>>>>>>> does the syntax for marking look like? Also in case of timestamps that
> > >>>>>>>> are nested in the schema?
> > >>>>>>>>
> > >>>>>>>> 4b. how can we write out a timestamp into the message header?:

Re: [DISCUSS] Flink SQL DDL Design

2018-12-12 Thread Teja MVSR
t types unseen.
> > > >>>>>>> And Xuefu pointed out an important matching problem, so let's put
> > > >>>>>>> schema derivation as a follow-up extension ?

Re: [DISCUSS] Flink SQL DDL Design

2018-12-12 Thread Kurt Young
> > >>>>>>>> 4b. how can we write out a timestamp into the message header?:
> > >>>>>>>> I agree to simply ignore computed columns when writing out. This is
> > >>>>>>>> like 'field-change: add' that I mentioned in the improvements
> > >>>>>>>> document.
> > >>>>>>>> @Jark: "then the timestamp in StreamRecord will be write to Kafka
> > >>>>>>>> message header": Unfortunately, there is no timestamp in the stream
> > >>>>>>>> record. Additionally, multiple time attributes can be in a schema. So
> > >>>>>>>> we need a constraint that tells the sink which column to use (possibly
> > >>>>>>>> computed as well)?
> > >>>>>>>>
> > >>>>>>>> 4c. separate all time attribute concerns into a special clause next to
> > >>>>>>>> the regular schema?
> > >>>>>>>> @Jark: I don't have a strong opinion on this. I just have the feeling
> > >>>>>>>> that the "schema part" becomes quite messy because the actual schema
> > >>>>>>>> with types and fields is accompanied by so much metadata about
> > >>>>>>>> timestamps, watermarks, key

Re: [DISCUSS] Flink SQL DDL Design

2018-12-12 Thread Shuyi Chen
ave
> >>>>>>>> very custom syntax so there is not a real "standard".
> >>>>>>>>
> >>>>>>>> 3. Sources/Sinks: @Lin: If a table has both read/write access it can
> >>>>>>>> be created using a regular CREATE TABLE (omitting a specific
> >>>>>>>> source/sink) declaration. Regarding the transition from source/sink to
> >>>>>>>> both, yes we would need to update the DDL and catalogs. But is this a
> >>>>>>>> problem? One also needs to add new queries that use the tables.
> >>>>>>>> @Xuefu: It is not only about security aspects. Especially for
> >>>>>>>> streaming use cases, not every connector can be used as a source
> >>>>>>>> easily. For example, a JDBC sink is easier than a JDBC source. Let's
> >>>>>>>> assume an interactive CLI session, people should be able to list all
> >>>>>>>> source tables and sink tables to know upfront if they can use an
> >>>>>>>> INSERT INTO here or not.
> >>>>>>>>
> >>>>>>>> 6. Partitioning and keys: @Lin: I would like to include this in the
> >>>>>>>> design given that Hive integration and Kafka key support are in the
> >>>>>>>> making/are on our roadmap for this release.
> >>>>>>>>
> >>>>>>>> 5. Schema declaration: @Lin: You are right it is not conflicting. I
> >>>>>>>> just wanted to raise the point because if users want to declare
> >>>>>>>> computed columns they have a "schema" constraints but without columns.
> >>>>>>>> Are we ok with a syntax like ...
> >>>>>>>> CREATE TABLE (PRIMARY_KEY(a, c)) WITH (format.type = avro,
> >>>>>>>> format.schema-file = "/my/avrofile.avsc") ?
> >>>>>>>> @Xuefu: Yes, you are right that an external schema might not exactly
> >>>>>>>> match but this is true for both directions:
> >>>>>>>> table schema "derives" format schema and format schema "derives" table
> >>>>>>>> schema.
> >>>>>>>>
> >>>>>>>> 7. Hive compatibility: @Xuefu: I agree that Hive is popular but we
> >>>>>>>> should not just adopt everything from Hive as their syntax is very
> >>>>>>>> batch-specific. We should come up with a superset of historical and
> >>>>>>>> future requirements. Supporting Hive queries can be an intermediate
> >>>>>>>> layer on top of Flink's DDL.
> >>>>>>>>
> >>>>>>>> 4. Time attributes: @Lin: I'm fine with changing the
> >>>>>>>> TimestampExtractor interface as this is also important for better
> >>>>>>>> separation of connector and table module [1]. However, I'm wondering
> >>>>>>>> about watermark generation.
> >>>>>>>> 4a. timestamps are in the schema twice:
> >>>>>>>> @Jark: "existing field is Long/Timestamp, we can just use it as
> >>>>>>>> rowtime": yes, but we need to mark a field as such an attribute. What
> >>>>>>>> does the syntax for marking look like? Also in case of timestamps that
> >>>>>>>> are nested in the schema?
> >>>>>>>>
> >>>>>>>> 4b. how can we write out a timestamp into the message header?:
> >>>>>>>> I agree to simply ignore computed columns when writing out. This is
> >>>>>>>> like 'field-change: add' that I mentioned in the improvements
> >>>>>>>> document.
> >>>>>>>> @Jark: "then the timestamp in StreamRecord will be write to Kafka

Re: [DISCUSS] Flink SQL DDL Design

2018-12-11 Thread Timo Walther
BTW, I think both the Scalar function and the TimestampExtractor are expressing
computing logic; the TimestampExtractor has no additional advantage in SQL
scenarios.


6. Partitioning and keys
Primary Key is included in the Constraint part, and partitioned table support
can be another topic later.

5. Schema declaration
Agree with you that we can do better schema derivation for user convenience,
but this does not conflict with the syntax.
Table properties can carry any useful information for both the users and the
framework. I like your `contract name` proposal, e.g., `WITH (format.type =
avro)`: the framework can recognize some `contract name`s like `format.type`,
`connector.type`, etc.
Deriving the table schema from an existing schema file can also be handy,
especially for a table with many columns.

Regards,
Lin


On Wed, 5 Dec 2018 at 22:40, Timo Walther wrote:


Hi Jark and Shuyi,

thanks for pushing the DDL efforts forward. I agree that we should aim to
combine both Shuyi's design and your design.

Here are a couple of concerns that I think we should address in

Re: [DISCUSS] Flink SQL DDL Design

2018-12-09 Thread Jark Wu
 compatibility: @Xuefu: I agree that Hive is popular but we
> > > >>>> should not just adopt everything from Hive as there syntax is very
> > > >>>> batch-specific. We should come up with a superset of historical
> and
> > > >>>> future requirements. Supporting Hive queries can be an
> intermediate
> > > >>>> layer on top of Flink's DDL.
> > > >>>>
> > > >>>> 4. Time attributes: @Lin: I'm fine with changing the
> > > TimestampExtractor
> > > >>>> interface as this is also important for better separation of
> > connector
> > > >>>> and table module [1]. However, I'm wondering about watermark
> > > >> generation.
> > > >>>> 4a. timestamps are in the schema twice:
> > > >>>> @Jark: "existing field is Long/Timestamp, we can just use it as
> > > >>>> rowtime": yes, but we need to mark a field as such an attribute.
> How
> > > >>>> does the syntax for marking look like? Also in case of timestamps
> > that
> > > >>>> are nested in the schema?
> > > >>>>
> > > >>>> 4b. how can we write out a timestamp into the message header?:
> > > >>>> I agree to simply ignore computed columns when writing out. This is like
> > > >>>> 'field-change: add' that I mentioned in the improvements document.
> > > >>>> @Jark: "then the timestamp in StreamRecord will be written to Kafka
> > > >>>> message header": Unfortunately, there is no timestamp in the stream
> > > >>>> record. Additionally, multiple time attributes can be in a schema. So we
> > > >>>> need a constraint that tells the sink which column to use (possibly
> > > >>>> computed as well)?
> > > >>>>
> > > >>>> 4c. separate all time attribute concerns into a special clause next to
> > > >>>> the regular schema?
> > > >>>> @Jark: I don't have a strong opinion on this. I just have the feeling
> > > >>>> that the "schema part" becomes quite messy because the actual schema
> > > >>>> with types and fields is accompanied by so much metadata about
> > > >>>> timestamps, watermarks, keys,... and we would need to introduce a new
> > > >>>> WATERMARK keyword within a schema that was close to standard up to this
> > > >>>> point.
> > > >>>>
> > > >>>> Thanks everyone,
> > > >>>> Timo
> > > >>>>
> > > >>>> [1] https://issues.apache.org/jira/browse/FLINK-9461
> > > >>>>
> > > >>>>
> > > >>>>
> > > >>>> Am 06.12.18 um 07:08 schrieb Jark Wu:
> > > >>>>> Hi Timo,
> > > >>>>>
> > > >>>>> Thank you for the valuable feedback.
> > > >>>>>
> > > >>>>> First of all, I think we don't need to align the SQL functionality to
> > > >>>>> Descriptor. Because SQL is a more standard API, we should be as cautious as
> > > >>>>> possible about extending the SQL syntax. If something can be done in a standard
> > > >>>>> way, we shouldn't introduce something new.
> > > >>>>>
> > > >>>>> Here are some of my thoughts:
> > > >>>>>
> > > >>>>> 1. Scope: Agree.
> > > >>>>> 2. Constraints: Agree.
> > > >>>>> 4. Time attributes:
> > > >>>>> 4a. timestamps are in the schema twice.
> > > >>>>>  If an existing field is Long/Timestamp, we can just use it as rowtime,
> > > >>>>> not defined twice. If it is not a Long/Timestamp, we use a computed column
> > > >>>>> to get an expected timestamp column to be rowtime. Is this what you mean by
> > > >>>>> defined twice?  But I don't think it is a problem, but an

Re: [DISCUSS] Flink SQL DDL Design

2018-12-09 Thread Lin Li
> > >>>>> the real schema is, what's the index of rowtime in the schema? Regarding
> > >>>>> the optimization, even if timestamps are in the schema twice, when the original
> > >>>>> timestamp is never used in a query, the projection pushdown optimization
> > >>>>> can cut this field as early as possible, which is exactly the same as
> > >>>>> "replacing the existing column" at runtime.
> > >>>>>
> > >>>>>  4b. how can we write out a timestamp into the message header?
> > >>>>>   That's a good point. I think a computed column is just a virtual column
> > >>>>> on a table, which is only relevant to reading. If we want to write to a table
> > >>>>> with a computed column defined, we only need to provide the columns other than
> > >>>>> the computed columns (see SQL Server [1]). The computed column is ignored in
> > >>>>> the insert statement. Getting back to the question, how can we write out a
> > >>>>> timestamp into the message header? IMO, we can provide a configuration to
> > >>>>> support this, such as `kafka.writeTimestamp=true`; then the timestamp in
> > >>>>> StreamRecord will be written to the Kafka message header. What do you think?
> > >>>>>   4c. separate all time attribute concerns into a special clause next to
> > >>>>> the regular schema?
> > >>>>>   Separating watermark into a special clause similar to PARTITIONED BY is
> > >>>>> also a good idea. Conceptually, it's fine to put the watermark in the schema part
> > >>>>> or outside the schema part. But if we want to support multiple watermark
> > >>>>> definitions, maybe it would be better to put it in the schema part. It is
> > >>>>> similar to Index Definition, where we can define several indexes on a table
> > >>>>> in the schema part.
> > >>>>>
> > >>>>>   4d. How can people come up with a custom watermark strategy?
> > >>>>>   In most cases, the built-in strategy works well. If we need a
> > >>>>> custom one, we can use a scalar function which is restricted to return only a
> > >>>>> nullable Long, and use it in SQL like: WATERMARK for rowtime AS
> > >>>>> watermarkUdf(a, b, c). The `watermarkUdf` is a user-defined scalar function
> > >>>>> that accepts 3 parameters and returns a nullable Long, which can be used as a
> > >>>>> punctuated watermark assigner. Another choice is implementing a class
> > >>>>> extending
> > >>>>> `org.apache.flink.table.sources.wmstrategies.WatermarkStrategy` and using it
> > >>>>> in SQL: WATERMARK for rowtime AS 'com.my.MyWatermarkStrategy'. But if a
> > >>>>> scalar function can cover the requirements here, I would prefer it,
> > >>>>> because it stays standard-compliant. BTW, this feature is not in the MVP; we
> > >>>>> can discuss it in more depth in the future when we need it.
> > >>>>>
> > >>>>> 5. Schema declaration:
> > >>>>> I like the proposal to omit the schema if we can get the schema from
> > >>>>> external storage or some schema file. Actually, we have already
> > >>>>> encountered this requirement in our company.
> > >>>>>
> > >>>>>
> > >>>>> +1 to @Xuefu that we should be as close as possible to Hive syntax while
> > >>>>> keeping to the ANSI SQL standard. This will make it more acceptable and reduce the
> > >>>>> learning cost for users.
> > >>>>>

Re: [DISCUSS] Flink SQL DDL Design

2018-12-08 Thread Shuyi Chen
possible to extend the SQL syntax. If something can be done in a standard
> >>>>> way, we shouldn't introduce something new.
> >>>>>
> >>>>> Here are some of my thoughts:
> >>>>>
> >>>>> 1. Scope: Agree.
> >>>>> 2. Constraints: Agree.
> >>>>> 4. Time attributes:
> >>>>> 4a. timestamps are in the schema twice.
> >>>>>  If an existing field is Long/Timestamp, we can just use it as rowtime,
> >>>>> not defined twice. If it is not a Long/Timestamp, we use a computed column
> >>>>> to get an expected timestamp column to be rowtime. Is this what you mean by
> >>>>> defined twice?  But I don't think it is a problem but an advantage,
> >>>>> because it is easy to use: the user does not need to consider whether to "replace
> >>>>> the existing column" or "add a new column", and will not be confused about what
> >>>>> the real schema is or what the index of rowtime in the schema is. Regarding
> >>>>> the optimization, even if timestamps are in the schema twice, when the original
> >>>>> timestamp is never used in a query, the projection pushdown optimization
> >>>>> can cut this field as early as possible, which is exactly the same as
> >>>>> "replacing the existing column" at runtime.
> >>>>>
> >>>>>  4b. how can we write out a timestamp into the message header?
> >>>>>   That's a good point. I think a computed column is just a virtual column
> >>>>> on a table, which is only relevant to reading. If we want to write to a table
> >>>>> with a computed column defined, we only need to provide the columns other than
> >>>>> the computed columns (see SQL Server [1]). The computed column is ignored in
> >>>>> the insert statement. Getting back to the question, how can we write out a
> >>>>> timestamp into the message header? IMO, we can provide a configuration to
> >>>>> support this, such as `kafka.writeTimestamp=true`; then the timestamp in
> >>>>> StreamRecord will be written to the Kafka message header. What do you think?
> >>>>>   4c. separate all time attribute concerns into a special clause next to
> >>>>> the regular schema?
> >>>>>   Separating watermark into a special clause similar to PARTITIONED BY is
> >>>>> also a good idea. Conceptually, it's fine to put the watermark in the schema part
> >>>>> or outside the schema part. But if we want to support multiple watermark
> >>>>> definitions, maybe it would be better to put it in the schema part. It is
> >>>>> similar to Index Definition, where we can define several indexes on a table
> >>>>> in the schema part.
> >>>>>
> >>>>>   4d. How can people come up with a custom watermark strategy?
> >>>>>   In most cases, the built-in strategy works well. If we need a
> >>>>> custom one, we can use a scalar function which is restricted to return only a
> >>>>> nullable Long, and use it in SQL like: WATERMARK for rowtime AS
> >>>>> watermarkUdf(a, b, c). The `watermarkUdf` is a user-defined scalar function
> >>>>> that accepts 3 parameters and returns a nullable Long, which can be used as a
> >>>>> punctuated watermark assigner. Another choice is implementing a class
> >>>>> extending
> >>>>> `org.apache.flink.table.sources.wmstrategies.WatermarkStrategy` and using it
> >>>>> in SQL: WATERMARK for rowtime AS 'com.my.MyWatermarkStrategy'. But if a
> >>>>> scalar function can cover the requirements here, I would prefer it,
> >>>>> because it stays standard-compliant. BTW, this feature is not in the MVP; we
> >>>>> can discuss it in more depth in the future when

Re: [DISCUSS] Flink SQL DDL Design

2018-12-07 Thread Timo Walther
n is just a virtual column
on a table, which is only relevant to reading. If we want to write to a table
with a computed column defined, we only need to provide the columns other than
the computed columns (see SQL Server [1]). The computed column is ignored in
the insert statement. Getting back to the question, how can we write out a
timestamp into the message header? IMO, we can provide a configuration to
support this, such as `kafka.writeTimestamp=true`; then the timestamp in
StreamRecord will be written to the Kafka message header. What do you think?

  4c. separate all time attribute concerns into a special clause next to
the regular schema?
  Separating watermark into a special clause similar to PARTITIONED BY is
also a good idea. Conceptually, it's fine to put the watermark in the schema part
or outside the schema part. But if we want to support multiple watermark
definitions, maybe it would be better to put it in the schema part. It is
similar to Index Definition, where we can define several indexes on a table
in the schema part.

  4d. How can people come up with a custom watermark strategy?
  In most cases, the built-in strategy works well. If we need a
custom one, we can use a scalar function which is restricted to return only a
nullable Long, and use it in SQL like: WATERMARK for rowtime AS
watermarkUdf(a, b, c). The `watermarkUdf` is a user-defined scalar function
that accepts 3 parameters and returns a nullable Long, which can be used as a
punctuated watermark assigner. Another choice is implementing a class
extending
`org.apache.flink.table.sources.wmstrategies.WatermarkStrategy` and using it
in SQL: WATERMARK for rowtime AS 'com.my.MyWatermarkStrategy'. But if a
scalar function can cover the requirements here, I would prefer it,
because it stays standard-compliant. BTW, this feature is not in the MVP; we
can discuss it in more depth in the future when we need it.

5. Schema declaration:
I like the proposal to omit the schema if we can get the schema from
external storage or some schema file. Actually, we have already
encountered this requirement in our company.


+1 to @Xuefu that we should be as close as possible to Hive syntax while
keeping to the ANSI SQL standard. This will make it more acceptable and reduce the
learning cost for users.

[1]:
https://docs.microsoft.com/en-us/sql/relational-databases/partitions/create-partitioned-tables-and-indexes?view=sql-server-2017

Best,
Jark

On Thu, 6 Dec 2018 at 12:09, Zhang, Xuefu wrote:

Hi Timo/Shuyi/Lin,

Thanks for the discussions. It seems that we are converging on something
meaningful. Here are some of my thoughts:

1. +1 on MVP DDL
3. Markers for source or sink seem more about permissions on tables, which
belong to a security component. Unless the table is created differently
based on source, sink, or both, it doesn't seem necessary to use these
keywords to enforce permissions.
5. It might be okay if schema declaration is always needed. While there
might be some duplication sometimes, it's not always true. For example,
the external schema may not exactly match the Flink schema, for instance
in data types. Even so, a perfect match is not required. For instance, the
external schema file may evolve while the table schema in Flink may stay
unchanged. A responsible reader should be able to scan the file based on
the file schema and return the data based on the table schema.

Other aspects:

7. Hive compatibility. Since Flink SQL will soon be able to operate on
Hive metadata and data, it's an add-on benefit if we can be compatible with
Hive syntax/semantics while following the ANSI standard. At least we should be
as close as possible. Hive DDL can be found at
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL

Thanks,
Xuefu



--
Sender:Lin Li 
Sent at:2018 Dec 6 (Thu) 10:49
Recipient:dev 
Subject:Re: [DISCUSS] Flink SQL DDL Design

Hi Timo and Shuyi,
thanks for your feedback.

1. Scope
agree with you we should focus on the MVP DDL first.

2. Constraints
yes, this can be a follow-up issue.

3. Sources/Sinks
If a TABLE has both read/write access requirements, should we declare it using
`CREATE [SOURCE_SINK|BOTH] TABLE tableName ...`? A further question: if a TABLE
t1 is first declared as read-only (as a source table), and then for some new
requirements t1 changes to a sink table, we need to update both the DDL
and catalogs.
Furthermore, let's think about the BATCH query, where updating one table in-place
can be a common case.
e.g.,
```
CREATE TABLE t1 (
  col1 varchar,
  col2 int,
  col3 varchar
  ...
);

INSERT [OVERWRITE] TABLE t1
AS
SELECT
  (some computing ...)
FROM t1;
```
So, let's forget these SOURCE/SINK keywords in DDL. For validation
purposes, we can find other ways.

4. Time attributes
As Shuyi mentioned before, there exists an
`org.apache.flink.table.sources.tsextractors.TimestampE

Re: [DISCUSS] Flink SQL DDL Design

2018-12-07 Thread Fabian Hueske
gt; > > timestamp into the message header? IMO, we can provide a
> configuration
> > to
> > > > support this, such as `kafka.writeTimestamp=true`, then the timestmap
> > in
> > > > StreamRecord will be write to Kafka message header. What do you
> think?
> > > >
> > > >  4c. separate all time attribute concerns into a special clause next to
> > > > the regular schema?
> > > >  Separating watermark into a special clause similar to PARTITIONED BY is
> > > > also a good idea. Conceptually, it's fine to put the watermark in the schema part
> > > > or outside the schema part. But if we want to support multiple watermark
> > > > definitions, maybe it would be better to put it in the schema part. It is
> > > > similar to Index Definition, where we can define several indexes on a table
> > > > in the schema part.
> > > >
> > > >  4d. How can people come up with a custom watermark strategy?
> > > >  In most cases, the built-in strategy works well. If we need a
> > > > custom one, we can use a scalar function which is restricted to return only a
> > > > nullable Long, and use it in SQL like: WATERMARK for rowtime AS
> > > > watermarkUdf(a, b, c). The `watermarkUdf` is a user-defined scalar function
> > > > that accepts 3 parameters and returns a nullable Long, which can be used as a
> > > > punctuated watermark assigner. Another choice is implementing a class
> > > > extending
> > > > `org.apache.flink.table.sources.wmstrategies.WatermarkStrategy` and using it
> > > > in SQL: WATERMARK for rowtime AS 'com.my.MyWatermarkStrategy'. But if a
> > > > scalar function can cover the requirements here, I would prefer it,
> > > > because it stays standard-compliant. BTW, this feature is not in the MVP; we
> > > > can discuss it in more depth in the future when we need it.
> > > >
> > > > 5. Schema declaration:
> > > > I like the proposal to omit the schema if we can get the schema from
> > > > external storage or some schema file. Actually, we have already
> > > > encountered this requirement in our company.
> > > >
> > > >
> > > > +1 to @Xuefu that we should be as close as possible to Hive syntax while
> > > > keeping to the ANSI SQL standard. This will make it more acceptable and reduce the
> > > > learning cost for users.
> > > >
> > > > [1]:
> > > > https://docs.microsoft.com/en-us/sql/relational-databases/partitions/create-partitioned-tables-and-indexes?view=sql-server-2017
> > > >
> > > > Best,
> > > > Jark
> > > >
> > > > On Thu, 6 Dec 2018 at 12:09, Zhang, Xuefu wrote:
> > > >
> > > >> Hi Timo/Shuyi/Lin,
> > > >>
> > > >> Thanks for the discussions. It seems that we are converging on something
> > > >> meaningful. Here are some of my thoughts:
> > > >>
> > > >> 1. +1 on MVP DDL
> > > >> 3. Markers for source or sink seem more about permissions on tables, which
> > > >> belong to a security component. Unless the table is created differently
> > > >> based on source, sink, or both, it doesn't seem necessary to use these
> > > >> keywords to enforce permissions.
> > > >> 5. It might be okay if schema declaration is always needed. While there
> > > >> might be some duplication sometimes, it's not always true. For example,
> > > >> the external schema may not exactly match the Flink schema, for instance
> > > >> in data types. Even so, a perfect match is not required. For instance, the
> > > >> external schema file may evolve while the table schema in Flink may stay
> > > >> unchanged. A responsible reader should be able to scan the file based on
> > > >> the file schema and return the data based on the table schema.
> > > >>
> > > >> Other aspects:
> > > >>
> > > >> 7. Hive compatibility. Since Flink SQL will soon be able to operate on
> > > >> Hive metadata and data, it's an add-on benefit if we can be compatible with
> > > >> Hive syntax/semantics while

Re: [DISCUSS] Flink SQL DDL Design

2018-12-07 Thread Jark Wu
ttributes:
> > > 4a. timestamps are in the schema twice.
> > > If an existing field is Long/Timestamp, we can just use it as rowtime,
> > > not defined twice. If it is not a Long/Timestamp, we use a computed column
> > > to get an expected timestamp column to be rowtime. Is this what you mean by
> > > defined twice?  But I don't think it is a problem but an advantage,
> > > because it is easy to use: the user does not need to consider whether to "replace
> > > the existing column" or "add a new column", and will not be confused about what
> > > the real schema is or what the index of rowtime in the schema is. Regarding
> > > the optimization, even if timestamps are in the schema twice, when the original
> > > timestamp is never used in a query, the projection pushdown optimization
> > > can cut this field as early as possible, which is exactly the same as
> > > "replacing the existing column" at runtime.
> > >
> > > 4b. how can we write out a timestamp into the message header?
> > >  That's a good point. I think a computed column is just a virtual column
> > > on a table, which is only relevant to reading. If we want to write to a table
> > > with a computed column defined, we only need to provide the columns other than
> > > the computed columns (see SQL Server [1]). The computed column is ignored in
> > > the insert statement. Getting back to the question, how can we write out a
> > > timestamp into the message header? IMO, we can provide a configuration to
> > > support this, such as `kafka.writeTimestamp=true`; then the timestamp in
> > > StreamRecord will be written to the Kafka message header. What do you think?
> > >
> > >  4c. separate all time attribute concerns into a special clause next to
> > > the regular schema?
> > >  Separating watermark into a special clause similar to PARTITIONED BY is
> > > also a good idea. Conceptually, it's fine to put the watermark in the schema part
> > > or outside the schema part. But if we want to support multiple watermark
> > > definitions, maybe it would be better to put it in the schema part. It is
> > > similar to Index Definition, where we can define several indexes on a table
> > > in the schema part.
> > >
> > >  4d. How can people come up with a custom watermark strategy?
> > >  In most cases, the built-in strategy works well. If we need a
> > > custom one, we can use a scalar function which is restricted to return only a
> > > nullable Long, and use it in SQL like: WATERMARK for rowtime AS
> > > watermarkUdf(a, b, c). The `watermarkUdf` is a user-defined scalar function
> > > that accepts 3 parameters and returns a nullable Long, which can be used as a
> > > punctuated watermark assigner. Another choice is implementing a class
> > > extending
> > > `org.apache.flink.table.sources.wmstrategies.WatermarkStrategy` and using it
> > > in SQL: WATERMARK for rowtime AS 'com.my.MyWatermarkStrategy'. But if a
> > > scalar function can cover the requirements here, I would prefer it,
> > > because it stays standard-compliant. BTW, this feature is not in the MVP; we
> > > can discuss it in more depth in the future when we need it.
> > >
> > > 5. Schema declaration:
> > > I like the proposal to omit the schema if we can get the schema from
> > > external storage or some schema file. Actually, we have already
> > > encountered this requirement in our company.
> > >
> > >
> > > +1 to @Xuefu that we should be as close as possible to Hive syntax while
> > > keeping to the ANSI SQL standard. This will make it more acceptable and reduce the
> > > learning cost for users.
> > >
> > > [1]:
> > > https://docs.microsoft.com/en-us/sql/relational-databases/partitions/create-partitioned-tables-and-indexes?view=sql-server-2017
> > >
> > > Best,
> > > Jark
> > >
> > > On Thu, 6 Dec 2018 at 12:09, Zhang, Xuefu wrote:
> > >
> > >> Hi Timo/Shuyi/Lin,
> > >>
> > >> Thanks for the discussions. It seems that we are converging on something
> > >> meaningful. Here are some of my thoughts:
> > >>
> > >> 1. +1 on MVP DDL
> > >> 3. Markers for source or sink seem more about permissions on tables
> th

Re: [DISCUSS] Flink SQL DDL Design

2018-12-07 Thread Lin Li
would prefer it,
> > because it stays standard-compliant. BTW, this feature is not in the MVP; we
> > can discuss it in more depth in the future when we need it.
> >
> > 5. Schema declaration:
> > I like the proposal to omit the schema if we can get the schema from
> > external storage or some schema file. Actually, we have already
> > encountered this requirement in our company.
> >
> >
> > +1 to @Xuefu that we should be as close as possible to Hive syntax while
> > keeping to the ANSI SQL standard. This will make it more acceptable and reduce the
> > learning cost for users.
> >
> > [1]:
> > https://docs.microsoft.com/en-us/sql/relational-databases/partitions/create-partitioned-tables-and-indexes?view=sql-server-2017
> >
> > Best,
> > Jark
> >
> > On Thu, 6 Dec 2018 at 12:09, Zhang, Xuefu wrote:
> >
> >> Hi Timo/Shuyi/Lin,
> >>
> >> Thanks for the discussions. It seems that we are converging to something
> >> meaningful. Here are some of my thoughts:
> >>
> >> 1. +1 on MVP DDL
> >> 3. Markers for source or sink seem more about permissions on tables that
> >> belong to a security component. Unless the table is created differently
> >> based on source, sink, or both, it doesn't seem necessary to use these
> >> keywords to enforce permissions.
> >> 5. It might be okay if schema declaration is always needed. While there
> >> might be some duplication sometimes, it's not always true. For example,
> >> the external schema may not exactly match the Flink schema, for instance
> >> in data types. Even so, a perfect match is not required. For instance, the
> >> external schema file may evolve while the table schema in Flink may stay
> >> unchanged. A responsible reader should be able to scan the file based on
> >> the file schema and return the data based on the table schema.
> >>
> >> Other aspects:
> >>
> >> 7. Hive compatibility. Since Flink SQL will soon be able to operate on
> >> Hive metadata and data, it's an add-on benefit if we can be compatible with
> >> Hive syntax/semantics while following the ANSI standard. At least we should be
> >> as close as possible. Hive DDL can be found at
> >> https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
> >>
> >> Thanks,
> >> Xuefu
> >>
> >>
> >>
> >> --
> >> Sender:Lin Li 
> >> Sent at:2018 Dec 6 (Thu) 10:49
> >> Recipient:dev 
> >> Subject:Re: [DISCUSS] Flink SQL DDL Design
> >>
> >> Hi Timo and Shuyi,
> >>thanks for your feedback.
> >>
> >> 1. Scope
> >> agree with you we should focus on the MVP DDL first.
> >>
> >> 2. Constraints
> >> yes, this can be a follow-up issue.
> >>
> >> 3. Sources/Sinks
> >> If a TABLE has both read/write access requirements, should we declare it using
> >> `CREATE [SOURCE_SINK|BOTH] TABLE tableName ...`? A further question: if a TABLE
> >> t1 is first declared as read-only (as a source table), and then for some new
> >> requirements t1 changes to a sink table, we need to update both the DDL
> >> and catalogs.
> >> Furthermore, let's think about the BATCH query, where updating one table in-place
> >> can be a common case.
> >> e.g.,
> >> ```
> >> CREATE TABLE t1 (
> >>col1 varchar,
> >>col2 int,
> >>col3 varchar
> >>...
> >> );
> >>
> >> INSERT [OVERWRITE] TABLE t1
> >> AS
> >> SELECT
> >>(some computing ...)
> >> FROM t1;
> >> ```
> >> So, let's forget these SOURCE/SINK keywords in DDL. For validation
> >> purposes, we can find other ways.
> >>
> >> 4. Time attributes
> >> As Shuyi mentioned before, there exists an
> >> `org.apache.flink.table.sources.tsextractors.TimestampExtractor` for custom-defined
> >> time attribute usage, but this expression-based class is more
> >> friendly to the Table API than to SQL.
> >> ```
> >> /**
> >>  * Provides an expression to extract the timestamp for a rowtime attribute.
> >>  */
> >> abstract class TimestampExtractor extends FieldComputer[Long] with
> >> Serializable {
> >>
> >>/** Timestamp extractors 

Re: [DISCUSS] Flink SQL DDL Design

2018-12-06 Thread Timo Walther
eed to consider whether to "replace
the existing column" or "add a new column", and will not be confused about what
the real schema is or what the index of rowtime in the schema is. Regarding
the optimization, even if timestamps are in the schema twice, when the original
timestamp is never used in a query, the projection pushdown optimization
can cut this field as early as possible, which is exactly the same as
"replacing the existing column" at runtime.

4b. how can we write out a timestamp into the message header?
 That's a good point. I think a computed column is just a virtual column
on a table, which is only relevant to reading. If we want to write to a table
with a computed column defined, we only need to provide the columns other than
the computed columns (see SQL Server [1]). The computed column is ignored in
the insert statement. Getting back to the question, how can we write out a
timestamp into the message header? IMO, we can provide a configuration to
support this, such as `kafka.writeTimestamp=true`; then the timestamp in
StreamRecord will be written to the Kafka message header. What do you think?
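
The "computed columns are ignored on write" rule from 4b can be sketched as follows. This is only an illustration under assumed names: `ComputedColumnSketch`, `Column`, `writableColumns`, and the example column names are hypothetical and not part of any Flink API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical model, not a Flink API: a table schema in which some columns are computed. */
final class ComputedColumnSketch {

    static final class Column {
        final String name;
        final boolean computed;

        Column(String name, boolean computed) {
            this.name = name;
            this.computed = computed;
        }
    }

    /** Columns an INSERT has to supply: every column except the computed ones. */
    static List<String> writableColumns(List<Column> schema) {
        List<String> writable = new ArrayList<>();
        for (Column column : schema) {
            if (!column.computed) {
                writable.add(column.name);
            }
        }
        return writable;
    }

    public static void main(String[] args) {
        List<Column> schema = Arrays.asList(
                new Column("user_id", false),
                new Column("ts_millis", false),
                new Column("rowtime", true)); // assumed to be derived from ts_millis on read
        System.out.println(writableColumns(schema));
    }
}
```

Which physical column, if any, ends up in the Kafka message timestamp is a separate question that this sketch does not answer.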

 4c. separate all time attribute concerns into a special clause next to
the regular schema?
 Separating watermark into a special clause similar to PARTITIONED BY is
also a good idea. Conceptually, it's fine to put the watermark in the schema part
or outside the schema part. But if we want to support multiple watermark
definitions, maybe it would be better to put it in the schema part. It is
similar to Index Definition, where we can define several indexes on a table
in the schema part.

 4d. How can people come up with a custom watermark strategy?
 In most cases, the built-in strategy works well. If we need a
custom one, we can use a scalar function which is restricted to return only a
nullable Long, and use it in SQL like: WATERMARK for rowtime AS
watermarkUdf(a, b, c). The `watermarkUdf` is a user-defined scalar function
that accepts 3 parameters and returns a nullable Long, which can be used as a
punctuated watermark assigner. Another choice is implementing a class
extending
`org.apache.flink.table.sources.wmstrategies.WatermarkStrategy` and using it
in SQL: WATERMARK for rowtime AS 'com.my.MyWatermarkStrategy'. But if a
scalar function can cover the requirements here, I would prefer it,
because it stays standard-compliant. BTW, this feature is not in the MVP; we
can discuss it in more depth in the future when we need it.
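
As a rough illustration of the scalar-function option in 4d, the sketch below shows a three-argument function that returns a nullable Long. It is plain Java and deliberately does not extend any Flink class; the class name `WatermarkUdfSketch` and the meaning given to the three parameters are assumptions, since the thread only names them a, b, c.

```java
/** Hypothetical sketch, not a Flink API: a scalar function usable as a punctuated watermark assigner. */
final class WatermarkUdfSketch {

    /**
     * Returns a watermark in epoch milliseconds, or null when this row should not
     * advance the watermark. The three parameters stand in for the columns
     * (a, b, c) of the example; their meaning here is an assumption.
     */
    static Long watermarkUdf(long eventTimeMillis, boolean isPunctuation, long maxDelayMillis) {
        if (!isPunctuation) {
            return null; // ordinary row: emit no watermark
        }
        return eventTimeMillis - maxDelayMillis;
    }

    public static void main(String[] args) {
        System.out.println(watermarkUdf(10_000L, true, 500L));  // punctuation row
        System.out.println(watermarkUdf(10_000L, false, 500L)); // ordinary row
    }
}
```

Returning null for ordinary rows is what would make the assigner punctuated: only rows that carry a marker move the watermark forward.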

5. Schema declaration:
I like the proposal to omit the schema if we can get the schema from
external storage or some schema file. Actually, we have already
encountered this requirement in our company.


+1 to @Xuefu that we should be as close as possible to Hive syntax while
keeping to the ANSI SQL standard. This will make it more acceptable and reduce the
learning cost for users.

[1]:
https://docs.microsoft.com/en-us/sql/relational-databases/partitions/create-partitioned-tables-and-indexes?view=sql-server-2017

Best,
Jark

On Thu, 6 Dec 2018 at 12:09, Zhang, Xuefu  wrote:


Hi Timo/Shuyi/Lin,

Thanks for the discussions. It seems that we are converging to something
meaningful. Here are some of my thoughts:

1. +1 on MVP DDL
3. Markers for source or sink seem more about permissions on tables that
belong to a security component. Unless the table is created differently
based on source, sink, or both, it doesn't seem necessary to use these
keywords to enforce permissions.
5. It might be okay if schema declaration is always needed. While there
might be some duplication sometimes, it's not always true. For example,
the external schema may not exactly match the Flink schema, for instance
in data types. Even so, a perfect match is not required. For instance, the
external schema file may evolve while the table schema in Flink may stay
unchanged. A responsible reader should be able to scan the file based on
the file schema and return the data based on the table schema.
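
A minimal sketch of the reader behaviour described in point 5, assuming fields are matched by name only: records are read according to the file schema and returned according to the table schema. `SchemaProjectionSketch`, `project`, and the field names are hypothetical, and data type differences are not handled here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch, not a Flink API: reconciles a file schema with a table schema by field name. */
final class SchemaProjectionSketch {

    /**
     * Projects one record, keyed by the field names of the (possibly evolved) file schema,
     * onto the table's columns. Fields unknown to the table are dropped; columns
     * missing from the file become null.
     */
    static List<Object> project(Map<String, Object> fileRecord, List<String> tableColumns) {
        List<Object> row = new ArrayList<>(tableColumns.size());
        for (String column : tableColumns) {
            row.add(fileRecord.get(column));
        }
        return row;
    }

    public static void main(String[] args) {
        Map<String, Object> fileRecord = new HashMap<>();
        fileRecord.put("id", 1L);
        fileRecord.put("name", "a");
        fileRecord.put("added_later", true); // field the table schema does not know
        System.out.println(project(fileRecord, Arrays.asList("id", "name", "region")));
    }
}
```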

Other aspects:

7. Hive compatibility. Since Flink SQL will soon be able to operate on
Hive metadata and data, it's an add-on benefit if we can be compatible with
Hive syntax/semantics while following the ANSI standard. At least we should be
as close as possible. Hive DDL can be found at
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL

Thanks,
Xuefu



--
Sender:Lin Li 
Sent at:2018 Dec 6 (Thu) 10:49
Recipient:dev 
Subject:Re: [DISCUSS] Flink SQL DDL Design

Hi Timo and Shuyi,
   thanks for your feedback.

1. Scope
agree with you we should focus on the MVP DDL first.

2. Constraints
yes, this can be a follow-up issue.

3. Sources/Sinks
If a TABLE has both read/write access requirements, should we declare it using
`CREATE [SOURCE_SINK|BOTH] TABLE tableName ...`? A further question: if a TABLE
t1 is first declared as read-only (as a source table), and then for some new
requirements t1 changes to a sink table, in this case we need updating b

Re: [DISCUSS] Flink SQL DDL Design

2018-12-05 Thread Jark Wu
efit if we can be compatible with
> Hive syntax/semantics while following the ANSI standard. At least we should be
> as close as possible. Hive DDL can be found at
> https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
>
> Thanks,
> Xuefu
>
>
>
> --
> Sender:Lin Li 
> Sent at:2018 Dec 6 (Thu) 10:49
> Recipient:dev 
> Subject:Re: [DISCUSS] Flink SQL DDL Design
>
> Hi Timo and Shuyi,
>   thanks for your feedback.
>
> 1. Scope
> agree with you we should focus on the MVP DDL first.
>
> 2. Constraints
> yes, this can be a follow-up issue.
>
> 3. Sources/Sinks
> If a TABLE has both read/write access requirements, should we declare it using
> `CREATE [SOURCE_SINK|BOTH] TABLE tableName ...`? A further question: if a TABLE
> t1 is first declared as read-only (as a source table), and then for some new
> requirements t1 changes to a sink table, we need to update both the DDL
> and catalogs.
> Furthermore, let's think about the BATCH query, where updating one table in-place
> can be a common case.
> e.g.,
> ```
> CREATE TABLE t1 (
>   col1 varchar,
>   col2 int,
>   col3 varchar
>   ...
> );
>
> INSERT [OVERWRITE] TABLE t1
> AS
> SELECT
>   (some computing ...)
> FROM t1;
> ```
> So, let's forget these SOURCE/SINK keywords in DDL. For the validation
> purpose, we can find out other ways.
>
> 4. Time attributes
> As Shuyi mentioned before, there exists an
> `org.apache.flink.table.sources.tsextractors.TimestampExtractor` for custom
> defined time attributes usage, but this expression based class is more
> friendly for table api not the SQL.
> ```
> /**
>   * Provides the an expression to extract the timestamp for a rowtime
> attribute.
>   */
> abstract class TimestampExtractor extends FieldComputer[Long] with
> Serializable {
>
>   /** Timestamp extractors compute the timestamp as Long. */
>   override def getReturnType: TypeInformation[Long] =
> Types.LONG.asInstanceOf[TypeInformation[Long]]
> }
> ```
> BTW, I think both the Scalar function and the TimestampExtractor are
> expressing computing logic, the TimestampExtractor has no more advantage in
> SQL scenarios.
>
>
> 6. Partitioning and keys
> Primary Key is included in Constraint part, and partitioned table support
> can be another topic later.
>
> 5. Schema declaration
> Agree with you that we can do better schema derivation for user
> convenience, but this is not conflict with the syntax.
> Table properties can carry any useful informations both for the users and
> the framework, I like your `contract name` proposal,
> e.g., `WITH (format.type = avro)`, the framework can recognize some
> `contract name` like `format.type`, `connector.type` and etc.
> And also derive the table schema from an existing schema file can be handy
> especially one with too many table columns.
>
> Regards,
> Lin
>
>
> Timo Walther wrote on Wed, Dec 5, 2018 at 10:40 PM:
>
> > Hi Jark and Shuyi,
> >
> > thanks for pushing the DDL efforts forward. I agree that we should aim
> > to combine both Shuyi's design and your design.
> >
> > Here are a couple of concerns that I think we should address in the
> design:
> >
> > 1. Scope: Let's focus on an MVP DDL for CREATE TABLE statements first.
> > I think this topic already has enough potential for long discussions and
> > is very helpful for users. We can discuss CREATE VIEW and CREATE
> > FUNCTION afterwards as they are not related to each other.
> >
> > 2. Constraints: I think we should consider things like nullability,
> > VARCHAR length, and decimal scale and precision in the future as they
> > allow for nice optimizations. However, since neither the translation nor
> > the runtime operators support those features, I would not introduce an
> > arbitrary default value but omit those parameters for now. This can be a
> > follow-up issue once the basic DDL has been merged.
> >
> > 3. Sources/Sinks: We had a discussion about CREATE TABLE vs CREATE
> > [SOURCE|SINK|] TABLE before. In my opinion we should allow for these
> > explicit declarations because in most production scenarios, teams have
> > strict read/write access requirements. For example, a data science team
> > should only consume from an event Kafka topic but should not accidentally
> > write back to the single source of truth.
> >
> > 4. Time attributes: In general, I like your computed columns approach
> > because it makes defining a rowtime attributes transparent and simple.
> > However, there are downsides that we should discuss.
> > 4a. J

Re: [DISCUSS] Flink SQL DDL Design

2018-12-05 Thread Zhang, Xuefu
Hi Timo/Shuyi/Lin,

Thanks for the discussions. It seems that we are converging to something 
meaningful. Here are some of my thoughts:

1. +1 on MVP DDL
3. Markers for source or sink seem more about permissions on tables that belong 
to a security component. Unless the table is created differently based on 
source, sink, or both, it doesn't seem necessary to use these keywords to 
enforce permissions.
5. It might be okay if schema declaration is always needed. While there might 
be some duplication sometimes, it's not always true. For example, external 
schema may not be exactly matching Flink schema. For instance, data types. Even 
if so, perfect match is not required. For instance, the external schema file 
may evolve while table schema in Flink may stay unchanged. A responsible reader 
should be able to scan the file based on file schema and return the data based 
on table schema.
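
[Editor's sketch] The reader behavior described in point 5 can be illustrated
in a few lines of Python. This is only a sketch under assumptions: schemas are
modeled as plain name-to-type mappings, and `project_to_table_schema`, `CASTS`
and the sample field names are hypothetical, not Flink APIs.

```python
from typing import Any, Dict

# Hypothetical converters keyed by the table-schema type name.
CASTS = {"INT": int, "BIGINT": int, "DOUBLE": float, "VARCHAR": str}


def project_to_table_schema(record: Dict[str, Any],
                            table_schema: Dict[str, str]) -> Dict[str, Any]:
    """Return `record` (read with the file schema) shaped like the table schema.

    Columns missing from the file become None, extra file columns are dropped,
    and present values are cast to the declared table type.
    """
    row = {}
    for column, type_name in table_schema.items():
        value = record.get(column)
        row[column] = None if value is None else CASTS[type_name](value)
    return row


# The file schema evolved: `id` is stored as a string and `extra` was added,
# while the table schema stayed unchanged.
file_record = {"id": "42", "name": "flink", "extra": True}
table_schema = {"id": "BIGINT", "name": "VARCHAR", "score": "DOUBLE"}
print(project_to_table_schema(file_record, table_schema))
```

The table schema drives the output, so an evolving file schema does not
force a change to the table declaration.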

Other aspects:

7. Hive compatibility. Since Flink SQL will soon be able to operate on Hive 
metadata and data, it's an add-on benefit if we can be compatible with Hive 
syntax/semantics while following ANSI standard. At least we should be as close 
as possible. Hive DDL can be found at
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL

Thanks,
Xuefu



--
Sender:Lin Li 
Sent at:2018 Dec 6 (Thu) 10:49
Recipient:dev 
Subject:Re: [DISCUSS] Flink SQL DDL Design

Hi Timo and Shuyi,
  thanks for your feedback.

1. Scope
agree with you we should focus on the MVP DDL first.

2. Constraints
yes, this can be a follow-up issue.

3. Sources/Sinks
If a TABLE has both read and write access requirements, should we declare it
using `CREATE [SOURCE_SINK|BOTH] TABLE tableName ...`? A further question: if
a TABLE t1 is first declared as read-only (as a source table) and later, for
some new requirement, has to become a sink table, we need to update both the
DDL and the catalogs.
Furthermore, consider BATCH queries, where updating one table in place can be
a common case, e.g.,
```
CREATE TABLE t1 (
  col1 varchar,
  col2 int,
  col3 varchar
  ...
);

INSERT [OVERWRITE] TABLE t1
AS
SELECT
  (some computing ...)
FROM t1;
```
So, let's forget these SOURCE/SINK keywords in DDL. For the validation
purpose, we can find out other ways.
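
[Editor's sketch] One possible "other way", assuming the planner already
knows which tables each statement reads and writes: derive the role of a
table from its usage instead of from a DDL keyword. `Statement` and
`infer_roles` are hypothetical names used only for this illustration.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Set


@dataclass
class Statement:
    """Hypothetical summary of one parsed INSERT statement."""
    target: str                                   # table written by INSERT
    reads: Set[str] = field(default_factory=set)  # tables in the FROM clause


def infer_roles(statements: List[Statement]) -> Dict[str, str]:
    """Classify each table as 'source', 'sink', or 'both' from its usage."""
    read: Set[str] = set()
    written: Set[str] = set()
    for stmt in statements:
        written.add(stmt.target)
        read |= stmt.reads
    roles = {}
    for table in sorted(read | written):
        if table in read and table in written:
            roles[table] = "both"
        elif table in written:
            roles[table] = "sink"
        else:
            roles[table] = "source"
    return roles


# INSERT OVERWRITE t1 ... FROM t1 reads and writes the same table.
print(infer_roles([Statement("t1", {"t1"}), Statement("t2", {"t0"})]))
```

A validation step could compare these inferred roles with access rules kept
outside the DDL, so the in-place batch update above stays expressible.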

4. Time attributes
As Shuyi mentioned before, there is an
`org.apache.flink.table.sources.tsextractors.TimestampExtractor` for
custom-defined time attributes, but this expression-based class is friendlier
to the Table API than to SQL.
```
/**
  * Provides an expression to extract the timestamp for a rowtime attribute.
  */
abstract class TimestampExtractor extends FieldComputer[Long] with Serializable {

  /** Timestamp extractors compute the timestamp as Long. */
  override def getReturnType: TypeInformation[Long] =
    Types.LONG.asInstanceOf[TypeInformation[Long]]
}
```
BTW, I think both the scalar function and the TimestampExtractor express
computing logic, so the TimestampExtractor has no additional advantage in
SQL scenarios.


6. Partitioning and keys
Primary Key is included in Constraint part, and partitioned table support
can be another topic later.

5. Schema declaration
Agree with you that we can do better schema derivation for user
convenience, but this does not conflict with the syntax.
Table properties can carry any useful information for both the users and
the framework. I like your `contract name` proposal,
e.g., `WITH (format.type = avro)`: the framework can recognize some
`contract name`s such as `format.type`, `connector.type`, etc.
Deriving the table schema from an existing schema file can also be handy,
especially for a table with many columns.
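
[Editor's sketch] How a framework might pick out `contract name` keys from
the flat WITH properties, assuming the properties arrive as a string map.
The prefix list and `split_properties` are hypothetical, and `owner` is just
a made-up user property.

```python
from typing import Dict, Tuple

# Hypothetical prefixes that the framework treats as "contract names".
CONTRACT_PREFIXES = ("connector.", "format.")


def split_properties(props: Dict[str, str]) -> Tuple[Dict[str, str], Dict[str, str]]:
    """Separate framework-recognized keys from free-form user properties."""
    recognized: Dict[str, str] = {}
    other: Dict[str, str] = {}
    for key, value in props.items():
        bucket = recognized if key.startswith(CONTRACT_PREFIXES) else other
        bucket[key] = value
    return recognized, other


with_clause = {
    "connector.type": "kafka",
    "format.type": "avro",
    "owner": "data-team",
}
recognized, other = split_properties(with_clause)
print(recognized)  # keys the framework would interpret
print(other)       # keys passed through untouched
```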

Regards,
Lin


Timo Walther wrote on Wed, Dec 5, 2018 at 10:40 PM:

> Hi Jark and Shuyi,
>
> thanks for pushing the DDL efforts forward. I agree that we should aim
> to combine both Shuyi's design and your design.
>
> Here are a couple of concerns that I think we should address in the design:
>
> 1. Scope: Let's focus on an MVP DDL for CREATE TABLE statements first.
> I think this topic already has enough potential for long discussions and
> is very helpful for users. We can discuss CREATE VIEW and CREATE
> FUNCTION afterwards as they are not related to each other.
>
> 2. Constraints: I think we should consider things like nullability,
> VARCHAR length, and decimal scale and precision in the future as they
> allow for nice optimizations. However, since neither the translation nor
> the runtime operators support those features, I would not introduce an
> arbitrary default value but omit those parameters for now. This can be a
> follow-up issue once the basic DDL has been merged.
>
> 3. Sources/Sinks: We had a discussion about CREATE TABLE vs CREATE
> [SOURCE|SINK|] TABLE before. In my opinion we should allow for these
> explicit declarations because in most production scenario

Re: [DISCUSS] Flink SQL DDL Design

2018-12-05 Thread Lin Li
> >>>>>>>>>
> >>>>>>>>> On Wed, Nov 28, 2018 at 7:39 PM Jark Wu 
> >>>> wrote:
> >>>>>>>>>> Hi Shuyi,
> >>>>>>>>>>
> >>>>>>>>>> Thanks for bringing up this discussion and the awesome
> >> work!
> >>> I
> >>>>> have
> >>>>>>>> left
> >>>>>>>>>> some comments in the doc.
> >>>>>>>>>>
> >>>>>>>>>> I want to share something more about the watermark
> >> definition
> >>>>>> learned
> >>>>>>>>> from
> >>>>>>>>>> Alibaba.
> >>>>>>>>>>
> >>>>>>>>>> 1.
> >>>>>>>>>>
> >>>>>>>>>> Table should be able to accept multiple watermark
> >>>> definition.
> >>>>>>>>>> Because a table may have more than one rowtime field.
> >> For
> >>>>>> example,
> >>>>>>>> one
> >>>>>>>>>> rowtime field is from existing field but missing in some
> >>>>>> records,
> >>>>>>>>>> another
> >>>>>>>>>> is the ingestion timestamp in Kafka but not very
> >> accurate.
> >>>> In
> >>>>>> this
> >>>>>>>>> case,
> >>>>>>>>>> user may define two rowtime fields with watermarks in
> >> the
> >>>>> Table
> >>>>>>> and
> >>>>>>>>>> choose
> >>>>>>>>>> one in different situation.
> >>>>>>>>>> 2.
> >>>>>>>>>>
> >>>>>>>>>> Watermark strategy always works together with a rowtime field.
> >>>>>>>>>> Based on the two points mentioned above, I think we should
> >>>> combine
> >>>>>> the
> >>>>>>>>>> watermark strategy and rowtime field selection (i.e. which
> >>>>> existing
> >>>>>>>> field
> >>>>>>>>>> used to generate watermark) in one clause, so that we can
> >>>> define
> >>>>>>>> multiple
> >>>>>>>>>> watermarks in one Table.
> >>>>>>>>>>
> >>>>>>>>>> Here I will share the watermark syntax used in Alibaba
> >>> (simply
> >>>>>>>> modified):
> >>>>>>>>>> watermarkDefinition:
> >>>>>>>>>> WATERMARK [watermarkName] FOR  AS
> >> wm_strategy
> >>>>>>>>>> wm_strategy:
> >>>>>>>>>>BOUNDED WITH OFFSET 'string' timeUnit
> >>>>>>>>>> |
> >>>>>>>>>>ASCENDING
> >>>>>>>>>>
> >>>>>>>>>> The “WATERMARK” keyword starts a watermark definition. The
> >>>> “FOR”
> >>>>>>>> keyword
> >>>>>>>>>> defines which existing field used to generate watermark,
> >> this
> >>>>> field
> >>>>>>>>> should
> >>>>>>>>>> already exist in the schema (we can use computed-column to
> >>>> derive
> >>>>>>> from
> >>>>>>>>>> other fields). The “AS” keyword defines watermark strategy,
> >>>> such
> >>>>> as
> >>>>>>>>> BOUNDED
> >>>>>>>>>> WITH OFFSET (covers almost all the requirements) and
> >>> ASCENDING.
> >>>>>>>>>> When the expected rowtime field does not exist in the
> >> schema,
> >>>> we
> >>>>>> can
> >>>>>>>> use
> >>>>>>>>>> computed-column syntax to derive it from other existing
> >>> fields
> >>>>>> using
> >>>>>>>>>> built-in functions or user defined functions. So the
>

Re: [DISCUSS] Flink SQL DDL Design

2018-12-05 Thread Timo Walther
rowTimeColumn ::=
 columnName

tableOption ::=
 property=value

offset ::=
 positive integer (unit: ms)
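
[Editor's sketch] The `offset` above is the second argument of
`withOffset(rowTimeColumn, offset)` in the WATERMARK clause of this grammar.
The thread does not spell out its semantics. The sketch below assumes the
common bounded-delay reading, where the watermark trails the largest rowtime
seen so far by `offset` milliseconds. `with_offset` is a hypothetical helper,
not a Flink function.

```python
from typing import Iterable, Iterator, Optional


def with_offset(row_times_ms: Iterable[int], offset_ms: int) -> Iterator[int]:
    """Yield the watermark after each row: max rowtime seen minus the offset."""
    if offset_ms <= 0:
        raise ValueError("offset must be a positive number of milliseconds")
    max_seen: Optional[int] = None
    for ts in row_times_ms:
        max_seen = ts if max_seen is None else max(max_seen, ts)
        yield max_seen - offset_ms


# The out-of-order row at 2000 ms does not move the watermark backwards.
print(list(with_offset([1000, 3000, 2000, 5000], offset_ms=1000)))
# prints [0, 2000, 2000, 4000]
```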

CREATE VIEW

CREATE VIEW viewName
   [
 ( columnName [, columnName]* )
   ]
 AS queryStatement;

CREATE FUNCTION

  CREATE FUNCTION functionName
   AS 'className';

  className ::=
 fully qualified name


Shuyi Chen wrote on Wed, Nov 28, 2018 at 3:28 AM:

Thanks a lot, Timo and Xuefu. Yes, I think we can finalize the design doc
first and start implementation w/o the unified connector API ready by
skipping some features.

Xuefu, I like the idea of making Flink specific properties into generic
key-value pairs, so that it will make integration with Hive DDL (or others,
e.g. Beam DDL) easier.

I'll run a final pass over the design doc and finalize the design in the
next few days. And we can start creating tasks and collaborate on the
implementation. Thanks a lot for all the comments and inputs.

Cheers!
Shuyi

On Tue, Nov 27, 2018 at 7:02 AM Zhang, Xuefu <xuef...@alibaba-inc.com>
wrote:

Yeah! I agree with Timo that DDL can actually proceed w/o being blocked by
connector API. We can leave the unknown out while defining the basic
syntax.

@Shuyi

As commented in the doc, I think we can probably stick with simple syntax
with general properties, without extending the syntax so much that it
mimics the descriptor API.

Part of our effort on Flink-Hive integration is also to make DDL syntax
compatible with Hive's. The one in the current proposal seems to make our
effort more challenging.

We can help and collaborate. At this moment, I think we can finalize the
proposal and then divide the tasks for better collaboration.

Please let me know if there are any questions or suggestions.

Thanks,
Xuefu






------------------
Sender:Timo Walther
Sent at:2018 Nov 27 (Tue) 16:21
Recipient:dev
Subject:Re: [DISCUSS] Flink SQL DDL Design

Thanks for offering your help here, Xuefu. It would be great to move these
efforts forward. I agree that the DDL is somehow related to the unified
connector API design but we can also start with the basic functionality now
and evolve the DDL during this release and next releases.

For example, we could identify the MVP DDL syntax that skips defining key
constraints and maybe even time attributes. This DDL could be used for
batch use cases, ETL, and materializing SQL queries (no time operations like
windows).

The unified connector API is high on our priority list for the 1.8 release.
I will try to update the document by the middle of next week.

Regards,

Timo


On 27.11.18 at 08:08, Shuyi Chen wrote:

Thanks a lot, Xuefu. I was busy with some other stuff for the last 2 weeks,
but we are definitely interested in moving this forward. I think once the
unified connector API design [1] is done, we can finalize the DDL design as
well and start creating concrete subtasks to collaborate on the
implementation with the community.

Shuyi

[1]
https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit?usp=sharing

On Mon, Nov 26, 2018 at 7:01 PM Zhang, Xuefu <xuef...@alibaba-inc.com>
wrote:

Hi Shuyi,

I'm wondering if you folks still have the bandwidth to work on this.

We have some dedicated resources and would like to move this forward. We can
collaborate.

Thanks,

Xuefu




--

From: wenlong.lwl
Date: 2018-11-05 11:15:35
To:
Subject: Re: [DISCUSS] Flink SQL DDL Design

Hi, Shuyi, thanks for the proposal.

I have two concerns about the table ddl:

1. How about removing the source/sink mark from the DDL? It is not
necessary: the framework can determine whether the referenced table is a
source or a sink from the context of the query using the table. This will be
more convenient for users defining a table that can be both a source and a
sink, and more convenient for the catalog to persist and manage the meta
info.

2. How about just keeping one pure string map as parameters for the table,
like

create table Kafka10SourceTable (
intField INTEGER,
stringField VARCHAR(128),
longField BIGINT,
rowTimeField TIMESTAMP
) with (
connector.type = 'kafka',
connector.property-version = '1',
connector.version = '0.10',
connector.properties.topic = 'test-kafka-topic',
connector.properties.startup-mode = 'latest-offset',
connector.properties.specific-offset = 'offset',
format.type = 'json',
format.properties.version = '1',
format.derive-schema = 'true'
);
Because:
1. In TableFactory, what the user works with is a string map of properties,
so defining parameters by string map is the closest match to how users use
the parameters.
2. The table descriptor can be extended by user, like what is done in Kafka
and Json, it means

Re: [DISCUSS] Flink SQL DDL Design

2018-12-05 Thread Jark Wu
I will share the watermark syntax used in Alibaba
> > (simply
> > > > > > > modified):
> > > > > > > > >
> > > > > > > > > watermarkDefinition:
> > > > > > > > > WATERMARK [watermarkName] FOR  AS
> wm_strategy
> > > > > > > > >
> > > > > > > > > wm_strategy:
> > > > > > > > >   BOUNDED WITH OFFSET 'string' timeUnit
> > > > > > > > > |
> > > > > > > > >   ASCENDING
> > > > > > > > >
> > > > > > > > > The “WATERMARK” keyword starts a watermark definition. The
> > > “FOR”
> > > > > > > keyword
> > > > > > > > > defines which existing field used to generate watermark,
> this
> > > > field
> > > > > > > > should
> > > > > > > > > already exist in the schema (we can use computed-column to
> > > derive
> > > > > > from
> > > > > > > > > other fields). The “AS” keyword defines watermark strategy,
> > > such
> > > > as
> > > > > > > > BOUNDED
> > > > > > > > > WITH OFFSET (covers almost all the requirements) and
> > ASCENDING.
> > > > > > > > >
> > > > > > > > > When the expected rowtime field does not exist in the
> schema,
> > > we
> > > > > can
> > > > > > > use
> > > > > > > > > computed-column syntax to derive it from other existing
> > fields
> > > > > using
> > > > > > > > > built-in functions or user defined functions. So the
> > > > > > rowtime/watermark
> > > > > > > > > definition doesn’t need to care about “field-change”
> strategy
> > > > > > > > > (replace/add/from-field). And the proctime field definition
> > can
> > > > > also
> > > > > > be
> > > > > > > > > defined using computed-column. Such as pt as PROCTIME()
> which
> > > > > > defines a
> > > > > > > > > proctime field named “pt” in the schema.
> > > > > > > > >
> > > > > > > > > Looking forward to working with you guys!
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > Jark Wu
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > > Lin Li wrote on Wed, Nov 28, 2018 at 6:33 PM:
> > > > > > > > >
> > > > > > > > > > @Shuyi
> > > > > > > > > > Thanks for the proposal!  We have a simple DDL
> > implementation
> > > > > > > (extends
> > > > > > > > > > Calcite's parser) which been running for almost two years
> > on
> > > > > > > production
> > > > > > > > > and
> > > > > > > > > > works well.
> > > > > > > > > > I think the most valued things we'd learned is keeping
> > > > simplicity
> > > > > > and
> > > > > > > > > > standard compliance.
> > > > > > > > > > Here's the approximate grammar, FYI
> > > > > > > > > > CREATE TABLE
> > > > > > > > > >
> > > > > > > > > > CREATE TABLE tableName(
> > > > > > > > > > columnDefinition [, columnDefinition]*
> > > > > > > > > > [ computedColumnDefinition [,
> > > > computedColumnDefinition]*
> > > > > ]
> > > > > > > > > > [ tableConstraint [, tableConstraint]* ]
> > > > > > > > > > [ tableIndex [, tableIndex]* ]
> > > > > > > > > > [ PERIOD FOR SYSTEM_TIME ]
> > > > > > > > > >     [ WATERMARK watermarkName FOR rowTimeColumn AS
> > > > > > > > > > withOffset(rowTimeColumn, offset) ] ) [ WITH (
> > > tableOption
> > > > [
> > > > > ,
> > > > > > > > > > tableOption]* ) ] [ ; ]
> > > > > > > > > >
> > > > > > > > > > columnD

Re: [DISCUSS] Flink SQL DDL Design

2018-12-05 Thread Shuyi Chen
 > and
> > > > > > > > > works well.
> > > > > > > > > I think the most valued things we'd learned is keeping
> > > simplicity
> > > > > and
> > > > > > > > > standard compliance.
> > > > > > > > > Here's the approximate grammar, FYI
> > > > > > > > > CREATE TABLE
> > > > > > > > >
> > > > > > > > > CREATE TABLE tableName(
> > > > > > > > > columnDefinition [, columnDefinition]*
> > > > > > > > > [ computedColumnDefinition [,
> > > computedColumnDefinition]*
> > > > ]
> > > > > > > > > [ tableConstraint [, tableConstraint]* ]
> > > > > > > > > [ tableIndex [, tableIndex]* ]
> > > > > > > > > [ PERIOD FOR SYSTEM_TIME ]
> > > > > > > > > [ WATERMARK watermarkName FOR rowTimeColumn AS
> > > > > > > > > withOffset(rowTimeColumn, offset) ] ) [ WITH (
> > tableOption
> > > [
> > > > ,
> > > > > > > > > tableOption]* ) ] [ ; ]
> > > > > > > > >
> > > > > > > > > columnDefinition ::=
> > > > > > > > > columnName dataType [ NOT NULL ]
> > > > > > > > >
> > > > > > > > > dataType  ::=
> > > > > > > > > {
> > > > > > > > >   [ VARCHAR ]
> > > > > > > > >   | [ BOOLEAN ]
> > > > > > > > >   | [ TINYINT ]
> > > > > > > > >   | [ SMALLINT ]
> > > > > > > > >   | [ INT ]
> > > > > > > > >   | [ BIGINT ]
> > > > > > > > >   | [ FLOAT ]
> > > > > > > > >   | [ DECIMAL ]
> > > > > > > > >   | [ DOUBLE ]
> > > > > > > > >   | [ DATE ]
> > > > > > > > >   | [ TIME ]
> > > > > > > > >   | [ TIMESTAMP ]
> > > > > > > > >   | [ VARBINARY ]
> > > > > > > > > }
> > > > > > > > >
> > > > > > > > > computedColumnDefinition ::=
> > > > > > > > > columnName AS computedColumnExpression
> > > > > > > > >
> > > > > > > > > tableConstraint ::=
> > > > > > > > > { PRIMARY KEY | UNIQUE }
> > > > > > > > > (columnName [, columnName]* )
> > > > > > > > >
> > > > > > > > > tableIndex ::=
> > > > > > > > > [ UNIQUE ] INDEX indexName
> > > > > > > > >  (columnName [, columnName]* )
> > > > > > > > >
> > > > > > > > > rowTimeColumn ::=
> > > > > > > > > columnName
> > > > > > > > >
> > > > > > > > > tableOption ::=
> > > > > > > > > property=value
> > > > > > > > > offset ::=
> > > > > > > > > positive integer (unit: ms)
> > > > > > > > >
> > > > > > > > > CREATE VIEW
> > > > > > > > >
> > > > > > > > > CREATE VIEW viewName
> > > > > > > > >   [
> > > > > > > > > ( columnName [, columnName]* )
> > > > > > > > >   ]
> > > > > > > > > AS queryStatement;
> > > > > > > > >
> > > > > > > > > CREATE FUNCTION
> > > > > > > > >
> > > > > > > > >  CREATE FUNCTION functionName
> > > > > > > > >   AS 'className';
> > > > > > > > >
> > > > > > > > >  className ::=
> > > > > > > > > fully qualified name
> > > > > > > > >
> > > > > > > > >
> > > > > &

Re: [DISCUSS] Flink SQL DDL Design

2018-12-04 Thread Jark Wu
timeUnit
> > > > > > > |
> > > > > > >   ASCENDING
> > > > > > >
> > > > > > > The “WATERMARK” keyword starts a watermark definition. The
> “FOR”
> > > > > keyword
> > > > > > > defines which existing field used to generate watermark, this
> > field
> > > > > > should
> > > > > > > already exist in the schema (we can use computed-column to
> derive
> > > > from
> > > > > > > other fields). The “AS” keyword defines watermark strategy,
> such
> > as
> > > > > > BOUNDED
> > > > > > > WITH OFFSET (covers almost all the requirements) and ASCENDING.
> > > > > > >
> > > > > > > When the expected rowtime field does not exist in the schema,
> we
> > > can
> > > > > use
> > > > > > > computed-column syntax to derive it from other existing fields
> > > using
> > > > > > > built-in functions or user defined functions. So the
> > > > rowtime/watermark
> > > > > > > definition doesn’t need to care about “field-change” strategy
> > > > > > > (replace/add/from-field). And the proctime field definition can
> > > also
> > > > be
> > > > > > > defined using computed-column. Such as pt as PROCTIME() which
> > > > defines a
> > > > > > > proctime field named “pt” in the schema.
> > > > > > >
> > > > > > > Looking forward to working with you guys!
> > > > > > >
> > > > > > > Best,
> > > > > > > Jark Wu
> > > > > > >
> > > > > > >
> > > > > > > > Lin Li wrote on Wed, Nov 28, 2018 at 6:33 PM:
> > > > > > >
> > > > > > > > @Shuyi
> > > > > > > > Thanks for the proposal!  We have a simple DDL implementation
> > > > > (extends
> > > > > > > > Calcite's parser) which been running for almost two years on
> > > > > production
> > > > > > > and
> > > > > > > > works well.
> > > > > > > > I think the most valued things we'd learned is keeping
> > simplicity
> > > > and
> > > > > > > > standard compliance.
> > > > > > > > Here's the approximate grammar, FYI
> > > > > > > > CREATE TABLE
> > > > > > > >
> > > > > > > > CREATE TABLE tableName(
> > > > > > > > columnDefinition [, columnDefinition]*
> > > > > > > > [ computedColumnDefinition [,
> > computedColumnDefinition]*
> > > ]
> > > > > > > > [ tableConstraint [, tableConstraint]* ]
> > > > > > > > [ tableIndex [, tableIndex]* ]
> > > > > > > > [ PERIOD FOR SYSTEM_TIME ]
> > > > > > > > [ WATERMARK watermarkName FOR rowTimeColumn AS
> > > > > > > > withOffset(rowTimeColumn, offset) ] ) [ WITH (
> tableOption
> > [
> > > ,
> > > > > > > > tableOption]* ) ] [ ; ]
> > > > > > > >
> > > > > > > > columnDefinition ::=
> > > > > > > > columnName dataType [ NOT NULL ]
> > > > > > > >
> > > > > > > > dataType  ::=
> > > > > > > > {
> > > > > > > >   [ VARCHAR ]
> > > > > > > >   | [ BOOLEAN ]
> > > > > > > >   | [ TINYINT ]
> > > > > > > >   | [ SMALLINT ]
> > > > > > > >   | [ INT ]
> > > > > > > >   | [ BIGINT ]
> > > > > > > >   | [ FLOAT ]
> > > > > > > >   | [ DECIMAL ]
> > > > > > > >   | [ DOUBLE ]
> > > > > > > >   | [ DATE ]
> > > > > > > >   | [ TIME ]
> > > > > > > >   | [ TIMESTAMP ]
> > > > > > > >   | [ VARBINARY ]
> > > > > > > > }
> > > > > > > >
> > > > > > > > computedColumnDefinition ::=

Re: [DISCUSS] Flink SQL DDL Design

2018-12-04 Thread Shaoxuan Wang
 > > > > proctime field named “pt” in the schema.
> > > > > >
> > > > > > Looking forward to working with you guys!
> > > > > >
> > > > > > Best,
> > > > > > Jark Wu
> > > > > >
> > > > > >
> > > > > > > Lin Li wrote on Wed, Nov 28, 2018 at 6:33 PM:
> > > > > >
> > > > > > > @Shuyi
> > > > > > > Thanks for the proposal!  We have a simple DDL implementation
> > > > (extends
> > > > > > > Calcite's parser) which been running for almost two years on
> > > > production
> > > > > > and
> > > > > > > works well.
> > > > > > > I think the most valued things we'd learned is keeping
> simplicity
> > > and
> > > > > > > standard compliance.
> > > > > > > Here's the approximate grammar, FYI
> > > > > > > CREATE TABLE
> > > > > > >
> > > > > > > CREATE TABLE tableName(
> > > > > > > columnDefinition [, columnDefinition]*
> > > > > > > [ computedColumnDefinition [,
> computedColumnDefinition]*
> > ]
> > > > > > > [ tableConstraint [, tableConstraint]* ]
> > > > > > > [ tableIndex [, tableIndex]* ]
> > > > > > > [ PERIOD FOR SYSTEM_TIME ]
> > > > > > > [ WATERMARK watermarkName FOR rowTimeColumn AS
> > > > > > > withOffset(rowTimeColumn, offset) ] ) [ WITH ( tableOption
> [
> > ,
> > > > > > > tableOption]* ) ] [ ; ]
> > > > > > >
> > > > > > > columnDefinition ::=
> > > > > > > columnName dataType [ NOT NULL ]
> > > > > > >
> > > > > > > dataType  ::=
> > > > > > > {
> > > > > > >   [ VARCHAR ]
> > > > > > >   | [ BOOLEAN ]
> > > > > > >   | [ TINYINT ]
> > > > > > >   | [ SMALLINT ]
> > > > > > >   | [ INT ]
> > > > > > >   | [ BIGINT ]
> > > > > > >   | [ FLOAT ]
> > > > > > >   | [ DECIMAL ]
> > > > > > >   | [ DOUBLE ]
> > > > > > >   | [ DATE ]
> > > > > > >   | [ TIME ]
> > > > > > >   | [ TIMESTAMP ]
> > > > > > >   | [ VARBINARY ]
> > > > > > > }
> > > > > > >
> > > > > > > computedColumnDefinition ::=
> > > > > > > columnName AS computedColumnExpression
> > > > > > >
> > > > > > > tableConstraint ::=
> > > > > > > { PRIMARY KEY | UNIQUE }
> > > > > > > (columnName [, columnName]* )
> > > > > > >
> > > > > > > tableIndex ::=
> > > > > > > [ UNIQUE ] INDEX indexName
> > > > > > >  (columnName [, columnName]* )
> > > > > > >
> > > > > > > rowTimeColumn ::=
> > > > > > > columnName
> > > > > > >
> > > > > > > tableOption ::=
> > > > > > > property=value
> > > > > > > offset ::=
> > > > > > > positive integer (unit: ms)
> > > > > > >
> > > > > > > CREATE VIEW
> > > > > > >
> > > > > > > CREATE VIEW viewName
> > > > > > >   [
> > > > > > > ( columnName [, columnName]* )
> > > > > > >   ]
> > > > > > > AS queryStatement;
> > > > > > >
> > > > > > > CREATE FUNCTION
> > > > > > >
> > > > > > >  CREATE FUNCTION functionName
> > > > > > >   AS 'className';
> > > > > > >
> > > > > > >  className ::=
> > > > > > > fully qualified name
> > > > > > >
> > > > > > >
> > > > > > > > Shuyi Chen wrote on Wed, Nov 28, 2018 at 3:28 AM:
> > > > > > 

Re: [DISCUSS] Flink SQL DDL Design

2018-12-04 Thread Jark Wu
> > > > > > withOffset(rowTimeColumn, offset) ] ) [ WITH ( tableOption [
> ,
> > > > > > tableOption]* ) ] [ ; ]
> > > > > >
> > > > > > columnDefinition ::=
> > > > > > columnName dataType [ NOT NULL ]
> > > > > >
> > > > > > dataType  ::=
> > > > > > {
> > > > > >   [ VARCHAR ]
> > > > > >   | [ BOOLEAN ]
> > > > > >   | [ TINYINT ]
> > > > > >   | [ SMALLINT ]
> > > > > >   | [ INT ]
> > > > > >   | [ BIGINT ]
> > > > > >   | [ FLOAT ]
> > > > > >   | [ DECIMAL ]
> > > > > >   | [ DOUBLE ]
> > > > > >   | [ DATE ]
> > > > > >   | [ TIME ]
> > > > > >   | [ TIMESTAMP ]
> > > > > >   | [ VARBINARY ]
> > > > > > }
> > > > > >
> > > > > > computedColumnDefinition ::=
> > > > > > columnName AS computedColumnExpression
> > > > > >
> > > > > > tableConstraint ::=
> > > > > > { PRIMARY KEY | UNIQUE }
> > > > > > (columnName [, columnName]* )
> > > > > >
> > > > > > tableIndex ::=
> > > > > > [ UNIQUE ] INDEX indexName
> > > > > >  (columnName [, columnName]* )
> > > > > >
> > > > > > rowTimeColumn ::=
> > > > > > columnName
> > > > > >
> > > > > > tableOption ::=
> > > > > > property=value
> > > > > > offset ::=
> > > > > > positive integer (unit: ms)
> > > > > >
> > > > > > CREATE VIEW
> > > > > >
> > > > > > CREATE VIEW viewName
> > > > > >   [
> > > > > > ( columnName [, columnName]* )
> > > > > >   ]
> > > > > > AS queryStatement;
> > > > > >
> > > > > > CREATE FUNCTION
> > > > > >
> > > > > >  CREATE FUNCTION functionName
> > > > > >   AS 'className';
> > > > > >
> > > > > >  className ::=
> > > > > > fully qualified name
> > > > > >
> > > > > >
> > > > > > > Shuyi Chen wrote on Wed, Nov 28, 2018 at 3:28 AM:
> > > > > >
> > > > > > > Thanks a lot, Timo and Xuefu. Yes, I think we can finalize the
> > > design
> > > > > doc
> > > > > > > first and start implementation w/o the unified connector API
> > ready
> > > by
> > > > > > > > skipping some features.
> > > > > > >
> > > > > > > Xuefu, I like the idea of making Flink specific properties into
> > > > generic
> > > > > > > key-value pairs, so that it will make integration with Hive DDL
> > (or
> > > > > > others,
> > > > > > > e.g. Beam DDL) easier.
> > > > > > >
> > > > > > > I'll run a final pass over the design doc and finalize the
> design
> > > in
> > > > > the
> > > > > > > next few days. And we can start creating tasks and collaborate
> on
> > > the
> > > > > > > implementation. Thanks a lot for all the comments and inputs.
> > > > > > >
> > > > > > > Cheers!
> > > > > > > Shuyi
> > > > > > >
> > > > > > > On Tue, Nov 27, 2018 at 7:02 AM Zhang, Xuefu <
> > > > xuef...@alibaba-inc.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Yeah! I agree with Timo that DDL can actually proceed w/o
> being
> > > > > blocked
> > > > > > > by
> > > > > > > > connector API. We can leave the unknown out while defining
> the
> > > > basic
> > > > > > > syntax.
> > > > > > > >
> > > > > > > > @Shuyi
> > > > > > > >
> > > > > > >

Re: [DISCUSS] Flink SQL DDL Design

2018-11-29 Thread Shaoxuan Wang
 > > > built-in functions or user defined functions. So the
> rowtime/watermark
> > > > definition doesn’t need to care about “field-change” strategy
> > > > (replace/add/from-field). And the proctime field definition can also
> be
> > > > defined using computed-column. Such as pt as PROCTIME() which
> defines a
> > > > proctime field named “pt” in the schema.
> > > >
> > > > Looking forward to working with you guys!
> > > >
> > > > Best,
> > > > Jark Wu
> > > >
> > > >
> > > > > Lin Li wrote on Wed, Nov 28, 2018 at 6:33 PM:
> > > >
> > > > > @Shuyi
> > > > > > Thanks for the proposal!  We have a simple DDL implementation
> > > > > > (extending Calcite's parser) which has been running in production
> > > > > > for almost two years and works well.
> > > > > > I think the most valuable thing we've learned is to keep simplicity
> > > > > > and standard compliance.
> > > > > Here's the approximate grammar, FYI
> > > > > CREATE TABLE
> > > > >
> > > > > CREATE TABLE tableName(
> > > > > columnDefinition [, columnDefinition]*
> > > > > [ computedColumnDefinition [, computedColumnDefinition]* ]
> > > > > [ tableConstraint [, tableConstraint]* ]
> > > > > [ tableIndex [, tableIndex]* ]
> > > > > [ PERIOD FOR SYSTEM_TIME ]
> > > > > [ WATERMARK watermarkName FOR rowTimeColumn AS
> > > > > withOffset(rowTimeColumn, offset) ] ) [ WITH ( tableOption [ ,
> > > > > tableOption]* ) ] [ ; ]
> > > > >
> > > > > columnDefinition ::=
> > > > > columnName dataType [ NOT NULL ]
> > > > >
> > > > > dataType  ::=
> > > > > {
> > > > >   [ VARCHAR ]
> > > > >   | [ BOOLEAN ]
> > > > >   | [ TINYINT ]
> > > > >   | [ SMALLINT ]
> > > > >   | [ INT ]
> > > > >   | [ BIGINT ]
> > > > >   | [ FLOAT ]
> > > > >   | [ DECIMAL ]
> > > > >   | [ DOUBLE ]
> > > > >   | [ DATE ]
> > > > >   | [ TIME ]
> > > > >   | [ TIMESTAMP ]
> > > > >   | [ VARBINARY ]
> > > > > }
> > > > >
> > > > > computedColumnDefinition ::=
> > > > > columnName AS computedColumnExpression
> > > > >
> > > > > tableConstraint ::=
> > > > > { PRIMARY KEY | UNIQUE }
> > > > > (columnName [, columnName]* )
> > > > >
> > > > > tableIndex ::=
> > > > > [ UNIQUE ] INDEX indexName
> > > > >  (columnName [, columnName]* )
> > > > >
> > > > > rowTimeColumn ::=
> > > > > columnName
> > > > >
> > > > > tableOption ::=
> > > > > property=value
> > > > > offset ::=
> > > > > positive integer (unit: ms)
> > > > >
> > > > > CREATE VIEW
> > > > >
> > > > > CREATE VIEW viewName
> > > > >   [
> > > > > ( columnName [, columnName]* )
> > > > >   ]
> > > > > AS queryStatement;
> > > > >
> > > > > CREATE FUNCTION
> > > > >
> > > > >  CREATE FUNCTION functionName
> > > > >   AS 'className';
> > > > >
> > > > >  className ::=
> > > > > fully qualified name
> > > > >
> > > > >
> > > > > > On Wed, Nov 28, 2018 at 3:28 AM Shuyi Chen wrote:
> > > > >
> > > > > > Thanks a lot, Timo and Xuefu. Yes, I think we can finalize the
> > design
> > > > doc
> > > > > > first and start implementation w/o the unified connector API
> ready
> > by
> > > > > > > skipping some features.
> > > > > >
> > > > > > Xuefu, I like the idea of making Flink specific properties into
> > > generic
> > > > > > key-value pairs, so that it will make integration with Hive DDL
> (or
> > > 

Re: [DISCUSS] Flink SQL DDL Design

2018-11-28 Thread Lin Li
> > > > [...] also be
> > > > defined using computed-column. Such as pt as PROCTIME() which
> defines a
> > > > proctime field named “pt” in the schema.
> > > >
> > > > Looking forward to working with you guys!
> > > >
> > > > Best,
> > > > Jark Wu
> > > >
> > > >
> > > > On Wed, Nov 28, 2018 at 6:33 PM Lin Li wrote:
> > > >
> > > > > @Shuyi
> > > > > Thanks for the proposal!  We have a simple DDL implementation
> > (extends
> > > > > Calcite's parser) which been running for almost two years on
> > production
> > > > and
> > > > > works well.
> > > > > I think the most valued things we'd learned is keeping simplicity
> and
> > > > > standard compliance.
> > > > > Here's the approximate grammar, FYI
> > > > > CREATE TABLE
> > > > >
> > > > > CREATE TABLE tableName(
> > > > > columnDefinition [, columnDefinition]*
> > > > > [ computedColumnDefinition [, computedColumnDefinition]* ]
> > > > > [ tableConstraint [, tableConstraint]* ]
> > > > > [ tableIndex [, tableIndex]* ]
> > > > > [ PERIOD FOR SYSTEM_TIME ]
> > > > > [ WATERMARK watermarkName FOR rowTimeColumn AS
> > > > > withOffset(rowTimeColumn, offset) ] ) [ WITH ( tableOption [ ,
> > > > > tableOption]* ) ] [ ; ]
> > > > >
> > > > > columnDefinition ::=
> > > > > columnName dataType [ NOT NULL ]
> > > > >
> > > > > dataType  ::=
> > > > > {
> > > > >   [ VARCHAR ]
> > > > >   | [ BOOLEAN ]
> > > > >   | [ TINYINT ]
> > > > >   | [ SMALLINT ]
> > > > >   | [ INT ]
> > > > >   | [ BIGINT ]
> > > > >   | [ FLOAT ]
> > > > >   | [ DECIMAL ]
> > > > >   | [ DOUBLE ]
> > > > >   | [ DATE ]
> > > > >   | [ TIME ]
> > > > >   | [ TIMESTAMP ]
> > > > >   | [ VARBINARY ]
> > > > > }
> > > > >
> > > > > computedColumnDefinition ::=
> > > > > columnName AS computedColumnExpression
> > > > >
> > > > > tableConstraint ::=
> > > > > { PRIMARY KEY | UNIQUE }
> > > > > (columnName [, columnName]* )
> > > > >
> > > > > tableIndex ::=
> > > > > [ UNIQUE ] INDEX indexName
> > > > >  (columnName [, columnName]* )
> > > > >
> > > > > rowTimeColumn ::=
> > > > > columnName
> > > > >
> > > > > tableOption ::=
> > > > > property=value
> > > > > offset ::=
> > > > > positive integer (unit: ms)
> > > > >
> > > > > CREATE VIEW
> > > > >
> > > > > CREATE VIEW viewName
> > > > >   [
> > > > > ( columnName [, columnName]* )
> > > > >   ]
> > > > > AS queryStatement;
> > > > >
> > > > > CREATE FUNCTION
> > > > >
> > > > >  CREATE FUNCTION functionName
> > > > >   AS 'className';
> > > > >
> > > > >  className ::=
> > > > > fully qualified name
> > > > >
> > > > >
> > > > > > On Wed, Nov 28, 2018 at 3:28 AM Shuyi Chen wrote:
> > > > >
> > > > > > Thanks a lot, Timo and Xuefu. Yes, I think we can finalize the
> > design
> > > > doc
> > > > > > first and start implementation w/o the unified connector API
> ready
> > by
> > > > > > > skipping some features.
> > > > > >
> > > > > > Xuefu, I like the idea of making Flink specific properties into
> > > generic
> > > > > > key-value pairs, so that it will make integration with Hive DDL
> (or
> > > > > others,
> > > > > > e.g. Beam DDL) easier.
> > > > > >
> > > > > > I'll run a final pass over the design doc and finalize the design
> > in
> > > > the
> > > > > > next fe

Re: [DISCUSS] Flink SQL DDL Design

2018-11-28 Thread Bowen Li
Hi Shuyi,

re: we should take into account the new external catalog effort in the
design

Definitely. We've had a much better understanding of catalogs over the past
few weeks, and will be more than happy to contribute to that part.

Thanks, Bowen

On Wed, Nov 28, 2018 at 4:12 PM Zhang, Xuefu 
wrote:

> +1 Sounds great!
>
>
> --
> Sender:Shuyi Chen 
> Sent at:2018 Nov 29 (Thu) 06:56
> Recipient:dev 
> Subject:Re: [DISCUSS] Flink SQL DDL Design
>
> Thanks a lot, Shaoxuan, Jark and Lin. We should definitely collaborate
> here; we also have our own DDL implementation, which has been running in
> production at Uber for almost 2 years. With the joint experience from both
> companies, we can definitely make the Flink SQL DDL better.
>
> As @shaoxuan suggested, Jark can come up with a doc that describes the
> current DDL design at Alibaba. We can then discuss the two designs, merge
> them into one, turn it into a FLIP, and plan the implementation tasks.
> Also, we should take into account the new external catalog effort in the
> design. What do you guys think?
>
> Shuyi
>
> On Wed, Nov 28, 2018 at 6:45 AM Jark Wu  wrote:
>
> > Hi Shaoxuan,
> >
> > I think summarizing it into a google doc is a good idea. We will prepare
> it
> > in the next few days.
> >
> > Thanks,
> > Jark
> >
> > On Wed, Nov 28, 2018 at 9:17 PM Shaoxuan Wang wrote:
> >
> > > Hi Lin and Jark,
> > > Thanks for sharing those details. Can you please consider summarizing
> > your
> > > DDL design into a google doc.
> > > We can still continue the discussions on Shuyi's proposal. But having a
> > > separate google doc will be easy for the DEV to
> > understand/comment/discuss
> > > on your proposed DDL implementation.
> > >
> > > Regards,
> > > Shaoxuan
> > >
> > >
> > > On Wed, Nov 28, 2018 at 7:39 PM Jark Wu  wrote:
> > >
> > > > Hi Shuyi,
> > > >
> > > > Thanks for bringing up this discussion and the awesome work! I have
> > left
> > > > some comments in the doc.
> > > >
> > > > I want to share something more about the watermark definition learned
> > > from
> > > > Alibaba.
> > > >
> > > >1.
> > > >
> > > >Table should be able to accept multiple watermark definition.
> > > >
> > > >Because a table may have more than one rowtime field. For example,
> > one
> > > >rowtime field is from existing field but missing in some records,
> > > > another
> > > >is the ingestion timestamp in Kafka but not very accurate. In this
> > > case,
> > > >user may define two rowtime fields with watermarks in the Table
> and
> > > > choose
> > > >one in different situation.
> > > >2.
> > > >
> > > >Watermark stragety always work with rowtime field together.
> > > >
> > > > > Based on the two points mentioned above, I think we should combine the
> > > > watermark strategy and rowtime field selection (i.e. which existing
> > field
> > > > used to generate watermark) in one clause, so that we can define
> > multiple
> > > > watermarks in one Table.
> > > >
> > > > Here I will share the watermark syntax used in Alibaba (simply
> > modified):
> > > >
> > > > watermarkDefinition:
> > > > > WATERMARK [watermarkName] FOR <rowtime_field> AS wm_strategy
> > > >
> > > > wm_strategy:
> > > >   BOUNDED WITH OFFSET 'string' timeUnit
> > > > |
> > > >   ASCENDING
> > > >
> > > > The “WATERMARK” keyword starts a watermark definition. The “FOR”
> > keyword
> > > > defines which existing field used to generate watermark, this field
> > > should
> > > > already exist in the schema (we can use computed-column to derive
> from
> > > > other fields). The “AS” keyword defines watermark strategy, such as
> > > BOUNDED
> > > > WITH OFFSET (covers almost all the requirements) and ASCENDING.
> > > >
> > > > When the expected rowtime field does not exist in the schema, we can
> > use
> > > > computed-column syntax to derive it from other existing fields using
> > > > built-in functions or user defined functions. So the
> rowtime/watermark
> > > > definition doesn’t need to care about “field-change” strategy
>

Re: [DISCUSS] Flink SQL DDL Design

2018-11-28 Thread Zhang, Xuefu
+1 Sounds great!


--
Sender:Shuyi Chen 
Sent at:2018 Nov 29 (Thu) 06:56
Recipient:dev 
Subject:Re: [DISCUSS] Flink SQL DDL Design

Thanks a lot, Shaoxuan, Jark and Lin. We should definitely collaborate
here; we also have our own DDL implementation, which has been running in
production at Uber for almost 2 years. With the joint experience from both
companies, we can definitely make the Flink SQL DDL better.

As @shaoxuan suggested, Jark can come up with a doc that describes the
current DDL design at Alibaba. We can then discuss the two designs, merge
them into one, turn it into a FLIP, and plan the implementation tasks.
Also, we should take into account the new external catalog effort in the
design. What do you guys think?

Shuyi

On Wed, Nov 28, 2018 at 6:45 AM Jark Wu  wrote:

> Hi Shaoxuan,
>
> I think summarizing it into a google doc is a good idea. We will prepare it
> in the next few days.
>
> Thanks,
> Jark
>
> On Wed, Nov 28, 2018 at 9:17 PM Shaoxuan Wang wrote:
>
> > Hi Lin and Jark,
> > Thanks for sharing those details. Could you please consider summarizing
> > your DDL design in a Google doc?
> > We can still continue the discussion on Shuyi's proposal, but a separate
> > Google doc will make it easier for the dev community to understand,
> > comment on, and discuss your proposed DDL implementation.
> >
> > Regards,
> > Shaoxuan
> >
> >
> > On Wed, Nov 28, 2018 at 7:39 PM Jark Wu  wrote:
> >
> > > Hi Shuyi,
> > >
> > > Thanks for bringing up this discussion and for the awesome work! I have
> > > left some comments in the doc.
> > >
> > > I want to share some more lessons about watermark definitions that we
> > > learned at Alibaba.
> > >
> > >    1. A table should be able to accept multiple watermark definitions,
> > >       because a table may have more than one rowtime field. For example,
> > >       one rowtime field comes from an existing field that is missing in
> > >       some records, while another is the Kafka ingestion timestamp, which
> > >       is not very accurate. In this case, the user may define two rowtime
> > >       fields with watermarks in the table and choose one of them
> > >       depending on the situation.
> > >
> > >    2. A watermark strategy always works together with a rowtime field.
> > >
> > > Based on the two points mentioned above, I think we should combine the
> > > watermark strategy and the rowtime field selection (i.e. which existing
> > > field is used to generate the watermark) in one clause, so that we can
> > > define multiple watermarks in one table.
> > >
> > > Here is the watermark syntax used at Alibaba (slightly modified):
> > >
> > > watermarkDefinition:
> > >   WATERMARK [watermarkName] FOR <rowtime_field> AS wm_strategy
> > >
> > > wm_strategy:
> > >   BOUNDED WITH OFFSET 'string' timeUnit
> > > |
> > >   ASCENDING
> > >
> > > The “WATERMARK” keyword starts a watermark definition. The “FOR” keyword
> > > defines which existing field is used to generate the watermark; this
> > > field must already exist in the schema (we can use a computed column to
> > > derive it from other fields). The “AS” keyword defines the watermark
> > > strategy, such as BOUNDED WITH OFFSET (which covers almost all
> > > requirements) and ASCENDING.
> > >
> > > When the expected rowtime field does not exist in the schema, we can use
> > > the computed-column syntax to derive it from other existing fields using
> > > built-in functions or user-defined functions. So the rowtime/watermark
> > > definition doesn’t need to care about a “field-change” strategy
> > > (replace/add/from-field). The proctime field can also be defined using a
> > > computed column, such as pt AS PROCTIME(), which defines a proctime field
> > > named “pt” in the schema.
> > >
> > > Looking forward to working with you guys!
> > >
> > > Best,
> > > Jark Wu
> > >
> > >
> > > On Wed, Nov 28, 2018 at 6:33 PM Lin Li wrote:
> > >
> > > > @Shuyi
> > > > Thanks for the proposal!  We have a simple DDL implementation
> (extends
> > > > Calcite's parser) which been running for almost two years on
> production
> > > and
> > > > works well.
> > > > I think the most valued things we'd learned is keeping simplicity and
> > > > standard complia

Re: [DISCUSS] Flink SQL DDL Design

2018-11-28 Thread Shuyi Chen
> > > > columnDefinition [, columnDefinition]*
> > > > [ computedColumnDefinition [, computedColumnDefinition]* ]
> > > > [ tableConstraint [, tableConstraint]* ]
> > > > [ tableIndex [, tableIndex]* ]
> > > > [ PERIOD FOR SYSTEM_TIME ]
> > > > [ WATERMARK watermarkName FOR rowTimeColumn AS
> > > > withOffset(rowTimeColumn, offset) ] ) [ WITH ( tableOption [ ,
> > > > tableOption]* ) ] [ ; ]
> > > >
> > > > columnDefinition ::=
> > > > columnName dataType [ NOT NULL ]
> > > >
> > > > dataType  ::=
> > > > {
> > > >   [ VARCHAR ]
> > > >   | [ BOOLEAN ]
> > > >   | [ TINYINT ]
> > > >   | [ SMALLINT ]
> > > >   | [ INT ]
> > > >   | [ BIGINT ]
> > > >   | [ FLOAT ]
> > > >   | [ DECIMAL ]
> > > >   | [ DOUBLE ]
> > > >   | [ DATE ]
> > > >   | [ TIME ]
> > > >   | [ TIMESTAMP ]
> > > >   | [ VARBINARY ]
> > > > }
> > > >
> > > > computedColumnDefinition ::=
> > > > columnName AS computedColumnExpression
> > > >
> > > > tableConstraint ::=
> > > > { PRIMARY KEY | UNIQUE }
> > > > (columnName [, columnName]* )
> > > >
> > > > tableIndex ::=
> > > > [ UNIQUE ] INDEX indexName
> > > >  (columnName [, columnName]* )
> > > >
> > > > rowTimeColumn ::=
> > > > columnName
> > > >
> > > > tableOption ::=
> > > > property=value
> > > > offset ::=
> > > > positive integer (unit: ms)
> > > >
> > > > CREATE VIEW
> > > >
> > > > CREATE VIEW viewName
> > > >   [
> > > > ( columnName [, columnName]* )
> > > >   ]
> > > > AS queryStatement;
> > > >
> > > > CREATE FUNCTION
> > > >
> > > >  CREATE FUNCTION functionName
> > > >   AS 'className';
> > > >
> > > >  className ::=
> > > > fully qualified name
> > > >
> > > >
> > > > On Wed, Nov 28, 2018 at 3:28 AM Shuyi Chen wrote:
> > > >
> > > > > Thanks a lot, Timo and Xuefu. Yes, I think we can finalize the
> design
> > > doc
> > > > > first and start implementation w/o the unified connector API ready
> by
> > > > > > skipping some features.
> > > > >
> > > > > Xuefu, I like the idea of making Flink specific properties into
> > generic
> > > > > key-value pairs, so that it will make integration with Hive DDL (or
> > > > others,
> > > > > e.g. Beam DDL) easier.
> > > > >
> > > > > I'll run a final pass over the design doc and finalize the design
> in
> > > the
> > > > > next few days. And we can start creating tasks and collaborate on
> the
> > > > > implementation. Thanks a lot for all the comments and inputs.
> > > > >
> > > > > Cheers!
> > > > > Shuyi
> > > > >
> > > > > On Tue, Nov 27, 2018 at 7:02 AM Zhang, Xuefu <
> > xuef...@alibaba-inc.com>
> > > > > wrote:
> > > > >
> > > > > > Yeah! I agree with Timo that DDL can actually proceed w/o being
> > > blocked
> > > > > by
> > > > > > connector API. We can leave the unknown out while defining the
> > basic
> > > > > syntax.
> > > > > >
> > > > > > @Shuyi
> > > > > >
> > > > > > As commented in the doc, I think we can probably stick with
> simple
> > > > syntax
> > > > > > with general properties, without extending the syntax too much
> that
> > > it
> > > > > > mimics the descriptor API.
> > > > > >
> > > > > > Part of our effort on Flink-Hive integration is also to make DDL
> > > syntax
> > > > > > compatible with Hive's. The one in the current proposal seems
> > making
> > > > our
> > > > > > effort more challenging.
> > > > > >
> > > > > > We can help and 

Re: [DISCUSS] Flink SQL DDL Design

2018-11-28 Thread Jark Wu
> > > computedColumnDefinition ::=
> > > columnName AS computedColumnExpression
> > >
> > > tableConstraint ::=
> > > { PRIMARY KEY | UNIQUE }
> > > (columnName [, columnName]* )
> > >
> > > tableIndex ::=
> > > [ UNIQUE ] INDEX indexName
> > >  (columnName [, columnName]* )
> > >
> > > rowTimeColumn ::=
> > > columnName
> > >
> > > tableOption ::=
> > > property=value
> > > offset ::=
> > > positive integer (unit: ms)
> > >
> > > CREATE VIEW
> > >
> > > CREATE VIEW viewName
> > >   [
> > > ( columnName [, columnName]* )
> > >   ]
> > > AS queryStatement;
> > >
> > > CREATE FUNCTION
> > >
> > >  CREATE FUNCTION functionName
> > >   AS 'className';
> > >
> > >  className ::=
> > > fully qualified name
> > >
> > >
> > > On Wed, Nov 28, 2018 at 3:28 AM Shuyi Chen wrote:
> > >
> > > > Thanks a lot, Timo and Xuefu. Yes, I think we can finalize the design
> > doc
> > > > first and start implementation w/o the unified connector API ready by
> > > > > skipping some features.
> > > >
> > > > Xuefu, I like the idea of making Flink specific properties into
> generic
> > > > key-value pairs, so that it will make integration with Hive DDL (or
> > > others,
> > > > e.g. Beam DDL) easier.
> > > >
> > > > I'll run a final pass over the design doc and finalize the design in
> > the
> > > > next few days. And we can start creating tasks and collaborate on the
> > > > implementation. Thanks a lot for all the comments and inputs.
> > > >
> > > > Cheers!
> > > > Shuyi
> > > >
> > > > On Tue, Nov 27, 2018 at 7:02 AM Zhang, Xuefu <
> xuef...@alibaba-inc.com>
> > > > wrote:
> > > >
> > > > > Yeah! I agree with Timo that DDL can actually proceed w/o being
> > blocked
> > > > by
> > > > > connector API. We can leave the unknown out while defining the
> basic
> > > > syntax.
> > > > >
> > > > > @Shuyi
> > > > >
> > > > > As commented in the doc, I think we can probably stick with simple
> > > syntax
> > > > > with general properties, without extending the syntax too much that
> > it
> > > > > mimics the descriptor API.
> > > > >
> > > > > Part of our effort on Flink-Hive integration is also to make DDL
> > syntax
> > > > > compatible with Hive's. The one in the current proposal seems
> making
> > > our
> > > > > effort more challenging.
> > > > >
> > > > > We can help and collaborate. At this moment, I think we can
> finalize
> > on
> > > > > the proposal and then we can divide the tasks for better
> > collaboration.
> > > > >
> > > > > Please let me know if there are  any questions or suggestions.
> > > > >
> > > > > Thanks,
> > > > > Xuefu
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > Sender:Timo Walther 
> > > > > Sent at:2018 Nov 27 (Tue) 16:21
> > > > > Recipient:dev 
> > > > > Subject:Re: [DISCUSS] Flink SQL DDL Design
> > > > >
> > > > > Thanks for offering your help here, Xuefu. It would be great to
> move
> > > > > these efforts forward. I agree that the DDL is somehow releated to
> > the
> > > > > unified connector API design but we can also start with the basic
> > > > > functionality now and evolve the DDL during this release and next
> > > > releases.
> > > > >
> > > > > For example, we could identify the MVP DDL syntax that skips
> defining
> > > > > key constraints and maybe even time attributes. This DDL could be
> > used
> > > > > for batch usecases, ETL, and materializing SQL queries (no time
> > > > > operations like windows).
> > > > >
> > > > > The unified connector API is high on our priority list for the 1.8
> > > > > re

Re: [DISCUSS] Flink SQL DDL Design

2018-11-28 Thread Shaoxuan Wang
> > CREATE FUNCTION
> >
> >  CREATE FUNCTION functionName
> >   AS 'className';
> >
> >  className ::=
> > fully qualified name
> >
> >
> > On Wed, Nov 28, 2018 at 3:28 AM Shuyi Chen wrote:
> >
> > > Thanks a lot, Timo and Xuefu. Yes, I think we can finalize the design
> doc
> > > first and start implementation w/o the unified connector API ready by
> > > > skipping some features.
> > >
> > > Xuefu, I like the idea of making Flink specific properties into generic
> > > key-value pairs, so that it will make integration with Hive DDL (or
> > others,
> > > e.g. Beam DDL) easier.
> > >
> > > I'll run a final pass over the design doc and finalize the design in
> the
> > > next few days. And we can start creating tasks and collaborate on the
> > > implementation. Thanks a lot for all the comments and inputs.
> > >
> > > Cheers!
> > > Shuyi
> > >
> > > On Tue, Nov 27, 2018 at 7:02 AM Zhang, Xuefu 
> > > wrote:
> > >
> > > > Yeah! I agree with Timo that DDL can actually proceed w/o being
> blocked
> > > by
> > > > connector API. We can leave the unknown out while defining the basic
> > > syntax.
> > > >
> > > > @Shuyi
> > > >
> > > > As commented in the doc, I think we can probably stick with simple
> > syntax
> > > > with general properties, without extending the syntax too much that
> it
> > > > mimics the descriptor API.
> > > >
> > > > Part of our effort on Flink-Hive integration is also to make DDL
> syntax
> > > > compatible with Hive's. The one in the current proposal seems making
> > our
> > > > effort more challenging.
> > > >
> > > > We can help and collaborate. At this moment, I think we can finalize
> on
> > > > the proposal and then we can divide the tasks for better
> collaboration.
> > > >
> > > > Please let me know if there are  any questions or suggestions.
> > > >
> > > > Thanks,
> > > > Xuefu
> > > >
> > > >
> > > >
> > > >
> > > > --
> > > > Sender:Timo Walther 
> > > > Sent at:2018 Nov 27 (Tue) 16:21
> > > > Recipient:dev 
> > > > Subject:Re: [DISCUSS] Flink SQL DDL Design
> > > >
> > > > Thanks for offering your help here, Xuefu. It would be great to move
> > > > these efforts forward. I agree that the DDL is somehow releated to
> the
> > > > unified connector API design but we can also start with the basic
> > > > functionality now and evolve the DDL during this release and next
> > > releases.
> > > >
> > > > For example, we could identify the MVP DDL syntax that skips defining
> > > > key constraints and maybe even time attributes. This DDL could be
> used
> > > > for batch usecases, ETL, and materializing SQL queries (no time
> > > > operations like windows).
> > > >
> > > > The unified connector API is high on our priority list for the 1.8
> > > > release. I will try to update the document until mid of next week.
> > > >
> > > >
> > > > Regards,
> > > >
> > > > Timo
> > > >
> > > >
> > > > Am 27.11.18 um 08:08 schrieb Shuyi Chen:
> > > > > Thanks a lot, Xuefu. I was busy for some other stuff for the last 2
> > > > weeks,
> > > > > but we are definitely interested in moving this forward. I think
> once
> > > the
> > > > > unified connector API design [1] is done, we can finalize the DDL
> > > design
> > > > as
> > > > > well and start creating concrete subtasks to collaborate on the
> > > > > implementation with the community.
> > > > >
> > > > > Shuyi
> > > > >
> > > > > [1] https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit?usp=sharing
> > > > >
> > > > > On Mon, Nov 26, 2018 at 7:01 PM Zhang, Xuefu <
> > xuef...@alibaba-inc.com>
> > > > > wrote:
> > > > >
> > > > >> Hi Shuyi,
> > > > >>
> > > > >> I'm wondering if you fo

Re: [DISCUSS] Flink SQL DDL Design

2018-11-28 Thread Jark Wu
> >
> > > Yeah! I agree with Timo that DDL can actually proceed w/o being blocked
> > by
> > > connector API. We can leave the unknown out while defining the basic
> > syntax.
> > >
> > > @Shuyi
> > >
> > > As commented in the doc, I think we can probably stick with simple
> syntax
> > > with general properties, without extending the syntax too much that it
> > > mimics the descriptor API.
> > >
> > > Part of our effort on Flink-Hive integration is also to make DDL syntax
> > > compatible with Hive's. The one in the current proposal seems making
> our
> > > effort more challenging.
> > >
> > > We can help and collaborate. At this moment, I think we can finalize on
> > > the proposal and then we can divide the tasks for better collaboration.
> > >
> > > Please let me know if there are  any questions or suggestions.
> > >
> > > Thanks,
> > > Xuefu
> > >
> > >
> > >
> > >
> > > --
> > > Sender:Timo Walther 
> > > Sent at:2018 Nov 27 (Tue) 16:21
> > > Recipient:dev 
> > > Subject:Re: [DISCUSS] Flink SQL DDL Design
> > >
> > > Thanks for offering your help here, Xuefu. It would be great to move
> > > these efforts forward. I agree that the DDL is somehow releated to the
> > > unified connector API design but we can also start with the basic
> > > functionality now and evolve the DDL during this release and next
> > releases.
> > >
> > > For example, we could identify the MVP DDL syntax that skips defining
> > > key constraints and maybe even time attributes. This DDL could be used
> > > for batch usecases, ETL, and materializing SQL queries (no time
> > > operations like windows).
> > >
> > > The unified connector API is high on our priority list for the 1.8
> > > release. I will try to update the document until mid of next week.
> > >
> > >
> > > Regards,
> > >
> > > Timo
> > >
> > >
> > > Am 27.11.18 um 08:08 schrieb Shuyi Chen:
> > > > Thanks a lot, Xuefu. I was busy with other things for the last 2 weeks,
> > > > but we are definitely interested in moving this forward. I think once
> > > > the unified connector API design [1] is done, we can finalize the DDL
> > > > design as well and start creating concrete subtasks to collaborate on
> > > > the implementation with the community.
> > > >
> > > > Shuyi
> > > >
> > > > [1] https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit?usp=sharing
> > > >
> > > > On Mon, Nov 26, 2018 at 7:01 PM Zhang, Xuefu <
> xuef...@alibaba-inc.com>
> > > > wrote:
> > > >
> > > >> Hi Shuyi,
> > > >>
> > > >> I'm wondering if you folks still have the bandwidth to work on this.
> > > >>
> > > >> We have some dedicated resources and would like to move this forward.
> > > >> We can collaborate.
> > > >>
> > > >> Thanks,
> > > >>
> > > >> Xuefu
> > > >>
> > > >>
> > > >> --
> > > >> Sender:wenlong.lwl
> > > >> Date:2018-11-05 11:15:35
> > > >> Recipient:
> > > >> Subject:Re: [DISCUSS] Flink SQL DDL Design
> > > >>
> > > >> Hi, Shuyi, thanks for the proposal.
> > > >>
> > > >> I have two concerns about the table DDL:
> > > >>
> > > >> 1. How about removing the source/sink mark from the DDL? It is not
> > > >> necessary: the framework can determine whether the referenced table is
> > > >> a source or a sink from the context of the query that uses it. That
> > > >> would make it more convenient for users to define a table that can be
> > > >> both a source and a sink, and more convenient for the catalog to
> > > >> persist and manage the meta information.
> > > >>
> > > >> 2. How about keeping just one plain string map as the parameters for a
> > > >> table, like
> > > >> create table Kafka10SourceTable (
> > > >

Re: [DISCUSS] Flink SQL DDL Design

2018-11-28 Thread Lin Li
@Shuyi
Thanks for the proposal! We have a simple DDL implementation (it extends
Calcite's parser) which has been running in production for almost two years
and works well.
I think the most valuable thing we've learned is to keep the DDL simple and
standard-compliant.
Here's the approximate grammar, FYI:

CREATE TABLE

CREATE TABLE tableName(
    columnDefinition [, columnDefinition]*
    [ computedColumnDefinition [, computedColumnDefinition]* ]
    [ tableConstraint [, tableConstraint]* ]
    [ tableIndex [, tableIndex]* ]
    [ PERIOD FOR SYSTEM_TIME ]
    [ WATERMARK watermarkName FOR rowTimeColumn AS
      withOffset(rowTimeColumn, offset) ]
) [ WITH ( tableOption [ , tableOption]* ) ] [ ; ]

columnDefinition ::=
    columnName dataType [ NOT NULL ]

dataType ::=
    {
      [ VARCHAR ]
      | [ BOOLEAN ]
      | [ TINYINT ]
      | [ SMALLINT ]
      | [ INT ]
      | [ BIGINT ]
      | [ FLOAT ]
      | [ DECIMAL ]
      | [ DOUBLE ]
      | [ DATE ]
      | [ TIME ]
      | [ TIMESTAMP ]
      | [ VARBINARY ]
    }

computedColumnDefinition ::=
    columnName AS computedColumnExpression

tableConstraint ::=
    { PRIMARY KEY | UNIQUE }
    (columnName [, columnName]* )

tableIndex ::=
    [ UNIQUE ] INDEX indexName
    (columnName [, columnName]* )

rowTimeColumn ::=
    columnName

tableOption ::=
    property=value

offset ::=
    positive integer (unit: ms)

CREATE VIEW

CREATE VIEW viewName
    [ ( columnName [, columnName]* ) ]
    AS queryStatement;

CREATE FUNCTION

CREATE FUNCTION functionName
    AS 'className';

 className ::=
fully qualified name
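
For illustration only, here is how a few statements might look under the
approximate grammar above. This is a sketch, not output of the actual
parser: the table, column, class, and property names are made up, the commas
between clause groups and the quoting of property keys and values are
guesses (the grammar does not spell them out), and PROCTIME() is borrowed
from the computed-column idea discussed elsewhere in this thread.

```sql
-- Hypothetical statements following the approximate grammar above;
-- every identifier and property key here is an illustrative assumption.
CREATE TABLE orders (
    -- columnDefinition: columnName dataType [ NOT NULL ]
    order_id   BIGINT NOT NULL,
    user_name  VARCHAR,
    amount     DECIMAL,
    order_time TIMESTAMP,
    -- computedColumnDefinition: columnName AS computedColumnExpression
    pt AS PROCTIME(),
    -- tableConstraint
    PRIMARY KEY (order_id),
    -- watermark on order_time with an offset of 5000 (ms, per the grammar)
    WATERMARK wm FOR order_time AS withOffset(order_time, 5000)
) WITH (
    -- tableOption: property=value
    'type' = 'kafka',
    'topic' = 'orders'
);

-- CREATE VIEW with an explicit column list
CREATE VIEW big_orders (order_id, amount) AS
SELECT order_id, amount FROM orders WHERE amount > 100;

-- CREATE FUNCTION bound to a fully qualified class name
CREATE FUNCTION parse_user AS 'com.example.udf.ParseUser';
```

Note that the connector details sit entirely in the WITH clause, so the
column, constraint, and watermark parts stay close to standard SQL.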


On Wed, Nov 28, 2018 at 3:28 AM Shuyi Chen wrote:

> Thanks a lot, Timo and Xuefu. Yes, I think we can finalize the design doc
> first and start the implementation before the unified connector API is
> ready, by skipping some features.
>
> Xuefu, I like the idea of making Flink specific properties into generic
> key-value pairs, so that it will make integration with Hive DDL (or others,
> e.g. Beam DDL) easier.
>
> I'll run a final pass over the design doc and finalize the design in the
> next few days. And we can start creating tasks and collaborate on the
> implementation. Thanks a lot for all the comments and inputs.
>
> Cheers!
> Shuyi
>
> On Tue, Nov 27, 2018 at 7:02 AM Zhang, Xuefu 
> wrote:
>
> > Yeah! I agree with Timo that DDL can actually proceed w/o being blocked
> by
> > connector API. We can leave the unknown out while defining the basic
> syntax.
> >
> > @Shuyi
> >
> > As commented in the doc, I think we can probably stick with simple syntax
> > with general properties, without extending the syntax too much that it
> > mimics the descriptor API.
> >
> > Part of our effort on Flink-Hive integration is also to make DDL syntax
> > compatible with Hive's. The one in the current proposal seems making our
> > effort more challenging.
> >
> > We can help and collaborate. At this moment, I think we can finalize on
> > the proposal and then we can divide the tasks for better collaboration.
> >
> > Please let me know if there are  any questions or suggestions.
> >
> > Thanks,
> > Xuefu
> >
> >
> >
> >
> > --
> > Sender:Timo Walther 
> > Sent at:2018 Nov 27 (Tue) 16:21
> > Recipient:dev 
> > Subject:Re: [DISCUSS] Flink SQL DDL Design
> >
> > Thanks for offering your help here, Xuefu. It would be great to move
> > these efforts forward. I agree that the DDL is somewhat related to the
> > unified connector API design, but we can also start with the basic
> > functionality now and evolve the DDL over this and the following
> > releases.
> >
> > For example, we could identify an MVP DDL syntax that skips defining
> > key constraints and maybe even time attributes. This DDL could be used
> > for batch use cases, ETL, and materializing SQL queries (no time
> > operations like windows).
> >
> > The unified connector API is high on our priority list for the 1.8
> > release. I will try to update the document by the middle of next week.
> >
> >
> > Regards,
> >
> > Timo
> >
> >
> > Am 27.11.18 um 08:08 schrieb Shuyi Chen:
> > > Thanks a lot, Xuefu. I was busy for some other stuff for the last 2
> > weeks,
> > > but we are definitely interested in moving this forward. I think once
> the
> > > unified connector API design [1] is done, we can finalize the DDL
> design
> > as
> > > well and start creating concrete subtasks to collaborate

Re: [DISCUSS] Flink SQL DDL Design

2018-11-27 Thread Shuyi Chen
Thanks a lot, Timo and Xuefu. Yes, I think we can finalize the design doc
first and start the implementation before the unified connector API is ready,
by skipping some features.

Xuefu, I like the idea of turning Flink-specific properties into generic
key-value pairs, since that will make integration with Hive DDL (or others,
e.g. Beam DDL) easier.
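
As a rough illustration of why generic key-value pairs help here, the sketch below (Python, with hypothetical helper names that are not part of Flink or Hive) renders one property map either in the `with (...)` style proposed in this thread or as a Hive-style TBLPROPERTIES clause. It assumes all values are plain strings.

```python
from typing import Mapping


def quote(value: str) -> str:
    """Quote a value as a SQL string literal, doubling embedded single quotes."""
    return "'" + value.replace("'", "''") + "'"


def render_with_clause(props: Mapping[str, str]) -> str:
    """Render properties in the with (...) style discussed in this thread."""
    body = ", ".join(f"{key} = {quote(value)}" for key, value in props.items())
    return f"WITH ({body})"


def render_tblproperties(props: Mapping[str, str]) -> str:
    """Render the same properties as a Hive-style TBLPROPERTIES clause."""
    body = ", ".join(
        f"{quote(key)} = {quote(value)}" for key, value in props.items()
    )
    return f"TBLPROPERTIES ({body})"


# Hypothetical sample properties, taken from the keys used in this thread.
props = {"connector.type": "kafka", "format.type": "json"}
with_clause = render_with_clause(props)
hive_clause = render_tblproperties(props)
print(with_clause)
print(hive_clause)
```

Because both renderings start from the same flat map, nothing Flink-specific has to be expressed in the grammar itself.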

I'll run a final pass over the design doc and finalize the design in the
next few days. And we can start creating tasks and collaborate on the
implementation. Thanks a lot for all the comments and inputs.

Cheers!
Shuyi


Re: [DISCUSS] Flink SQL DDL Design

2018-11-27 Thread Zhang, Xuefu
Yeah! I agree with Timo that the DDL can actually proceed without being blocked by
the connector API. We can leave the unknowns out while defining the basic syntax.

@Shuyi

As commented in the doc, I think we can probably stick with a simple syntax with
general properties, without extending the syntax so much that it mimics the
descriptor API.

Part of our effort on Flink-Hive integration is also to make the DDL syntax
compatible with Hive's. The one in the current proposal seems to make our effort
more challenging.

We can help and collaborate. At this moment, I think we can finalize the
proposal and then divide the tasks for better collaboration.

Please let me know if there are any questions or suggestions.

Thanks,
Xuefu

Re: [DISCUSS] Flink SQL DDL Design

2018-11-27 Thread Timo Walther
Thanks for offering your help here, Xuefu. It would be great to move
these efforts forward. I agree that the DDL is somewhat related to the
unified connector API design, but we can also start with the basic
functionality now and evolve the DDL during this release and the next releases.

For example, we could identify an MVP DDL syntax that skips defining
key constraints and maybe even time attributes. This DDL could be used
for batch use cases, ETL, and materializing SQL queries (no time
operations like windows).

The unified connector API is high on our priority list for the 1.8
release. I will try to update the document by the middle of next week.



Regards,

Timo



Re: [DISCUSS] Flink SQL DDL Design

2018-11-26 Thread Shuyi Chen
Thanks a lot, Xuefu. I was busy with other things for the last two weeks,
but we are definitely interested in moving this forward. I think once the
unified connector API design [1] is done, we can finalize the DDL design as
well and start creating concrete subtasks to collaborate on the
implementation with the community.

Shuyi

[1]
https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit?usp=sharing


Re: [DISCUSS] Flink SQL DDL Design

2018-11-26 Thread Shuyi Chen
Hi Wenlong, thanks a lot for the comments.

1) I agree we can infer the table type from the queries if the Flink job is
static. However, for SQL Client cases, the queries are ad hoc, dynamic, and not
known beforehand. In such cases, we might want to enforce the table open
mode at startup time, so users won't accidentally write to a Kafka topic
that is supposed to be written only by producers outside of the Flink world.
2) As in [1], format and connector are currently first-class concepts in
Flink Table, and they are required by most table creations, so I think adding
specific keywords for them makes the DDL more organized and readable. But I do
agree that flattened key-value pairs make it simpler for the parser and easier
to extend. So maybe something like the following makes more sense:

CREATE SOURCE TABLE Kafka10SourceTable (
  intField INTEGER,
  stringField VARCHAR(128) COMMENT 'User IP address',
  longField BIGINT,
  rowTimeField TIMESTAMP
    TIMESTAMPS FROM 'longField'
    WATERMARKS PERIODIC-BOUNDED WITH DELAY '60'
)
COMMENT 'Kafka Source Table of topic user_ip_address'
CONNECTOR (
  type = 'kafka',
  property-version = '1',
  version = '0.10',
  properties.topic = 'test-kafka-topic',
  properties.startup-mode = 'latest-offset',
  properties.specific-offset = 'offset'
)
FORMAT (
  format.type = 'json',
  format.properties.version = '1',
  format.derive-schema = 'true'
)
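
To make the two points concrete, here is a minimal Python sketch under stated assumptions. It assumes keys inside a clause are written without the clause prefix (the FORMAT example above still carries a `format.` prefix, so that is a simplification), and that the declared open mode is available as a plain string. The names `flatten_clauses`, `check_write_allowed` and `TableModeError` are hypothetical and not part of Flink.

```python
from typing import Dict, Mapping


def flatten_clauses(clauses: Mapping[str, Mapping[str, str]]) -> Dict[str, str]:
    """Prefix each key with its clause name, e.g. CONNECTOR + type -> connector.type."""
    flat: Dict[str, str] = {}
    for clause, props in clauses.items():
        for key, value in props.items():
            flat[f"{clause.lower()}.{key}"] = value
    return flat


class TableModeError(Exception):
    """Raised when a statement uses a table against its declared open mode."""


def check_write_allowed(table_name: str, declared_mode: str) -> None:
    """Reject writes to a table that was declared as a SOURCE table."""
    if declared_mode.upper() == "SOURCE":
        raise TableModeError(
            f"{table_name} is declared as SOURCE and cannot be written to"
        )


# Keyword clauses in the DDL still end up as one flat string map.
flat = flatten_clauses({
    "CONNECTOR": {"type": "kafka", "version": "0.10"},
    "FORMAT": {"type": "json", "derive-schema": "true"},
})
print(flat)

# An ad hoc INSERT into a SOURCE table is rejected before anything is written.
try:
    check_write_allowed("Kafka10SourceTable", "SOURCE")
    rejected = False
except TableModeError:
    rejected = True
print(rejected)
```

The first function shows that separate clauses are only a surface syntax over the same flat map, and the second shows the kind of startup-time check that a declared open mode would allow.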

Shuyi

[1]
https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit#heading=h.41fd6rs7b3cf


Re: [DISCUSS] Flink SQL DDL Design

2018-11-26 Thread Zhang, Xuefu
Hi Shuyi, 

I'm wondering if you folks still have the bandwidth to work on this.

We have some dedicated resources and would like to move this forward. We can
collaborate.

Thanks, 

Xuefu 



Re: [DISCUSS] Flink SQL DDL Design

2018-11-04 Thread wenlong.lwl
Hi, Shuyi, thanks for the proposal.

I have two concerns about the table DDL:

1. How about removing the source/sink mark from the DDL? It is not
necessary: the framework can determine whether a referenced table is a source
or a sink from the context of the query that uses it. This would be more
convenient for users defining a table that can be both a source and a sink,
and more convenient for the catalog to persist and manage the meta information.

2. How about keeping just one pure string map as the parameters for a table, like
create table Kafka10SourceTable (
  intField INTEGER,
  stringField VARCHAR(128),
  longField BIGINT,
  rowTimeField TIMESTAMP
) with (
  connector.type = 'kafka',
  connector.property-version = '1',
  connector.version = '0.10',
  connector.properties.topic = 'test-kafka-topic',
  connector.properties.startup-mode = 'latest-offset',
  connector.properties.specific-offset = 'offset',
  format.type = 'json',
  format.properties.version = '1',
  format.derive-schema = 'true'
);
Because:
1. In TableFactory, what users work with is a string map of properties, so
defining parameters as a string map is the closest match to how users
actually use the parameters.
2. The table descriptor can be extended by users, as is done for Kafka
and Json. This means that the parameter keys in a connector or format can
differ between implementations, and we cannot restrict the keys to a
specified set, so we would need a map in the connector scope and a map in the
connector.properties scope. Why not just give users a single map and let them
put parameters in whatever format they like? That is also the simplest way to
implement the DDL parser.
3. Whether we can define a format clause or not depends on the
implementation of the connector. Using separate clauses in the DDL may give the
misleading impression that connectors can be combined with arbitrary formats,
which may not actually work.
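
The single-map idea can be sketched as follows. This is a minimal Python illustration, not Flink's parser or factory lookup: it assumes every entry in the with (...) list has the form key = 'value' with single-quoted values, and the factory names and their required keys are hypothetical.

```python
import re
from typing import Dict, Mapping, Optional

# Matches entries such as: connector.type = 'kafka'
PAIR = re.compile(r"([A-Za-z0-9_.\-]+)\s*=\s*'([^']*)'")


def parse_properties(with_body: str) -> Dict[str, str]:
    """Turn the body of a with (...) clause into one flat string map."""
    return {key: value for key, value in PAIR.findall(with_body)}


def find_factory(
    props: Mapping[str, str],
    factories: Mapping[str, Mapping[str, str]],
) -> Optional[str]:
    """Return the first factory whose required key-value pairs all match."""
    for name, required in factories.items():
        if all(props.get(key) == value for key, value in required.items()):
            return name
    return None


with_body = """
    connector.type = 'kafka',
    connector.version = '0.10',
    connector.properties.topic = 'test-kafka-topic',
    format.type = 'json',
    format.derive-schema = 'true'
"""
props = parse_properties(with_body)

# Hypothetical registry: each factory declares the pairs it requires.
factories = {
    "kafka-0.10": {"connector.type": "kafka", "connector.version": "0.10"},
    "filesystem": {"connector.type": "filesystem"},
}
chosen = find_factory(props, factories)
print(chosen)
```

With this shape the parser never needs to know which keys a connector or format understands; each factory only checks the entries it cares about.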

On Sun, 4 Nov 2018 at 18:25, Dominik Wosiński  wrote:

> +1,  Thanks for the proposal.
>
> I guess this is a long-awaited change. This can vastly increase the
> functionalities of the SQL Client as it will be possible to use complex
> extensions like for example those provided by Apache Bahir[1].
>
> Best Regards,
> Dom.
>
> [1]
> https://github.com/apache/bahir-flink
>
> On Sat, 3 Nov 2018 at 17:17, Rong Rong wrote:
>
> > +1. Thanks for putting the proposal together Shuyi.
> >
> > DDL has been brought up in a couple of times previously [1,2]. Utilizing
> > DDL will definitely be a great extension to the current Flink SQL to
> > systematically support some of the previously brought up features such as
> > [3]. And it will also be beneficial to see the document closely aligned
> > with the previous discussion for unified SQL connector API [4].
> >
> > I also left a few comments on the doc. Looking forward to the alignment
> > with the other couple of efforts and contributing to them!
> >
> > Best,
> > Rong
> >
> > [1]
> >
> >
> http://mail-archives.apache.org/mod_mbox/flink-dev/201805.mbox/%3CCAMZk55ZTJA7MkCK1Qu4gLPu1P9neqCfHZtTcgLfrFjfO4Xv5YQ%40mail.gmail.com%3E
> > [2]
> >
> >
> http://mail-archives.apache.org/mod_mbox/flink-dev/201810.mbox/%3CDC070534-0782-4AFD-8A85-8A82B384B8F7%40gmail.com%3E
> >
> > [3] https://issues.apache.org/jira/browse/FLINK-8003
> > [4]
> >
> >
> http://mail-archives.apache.org/mod_mbox/flink-dev/201810.mbox/%3c6676cb66-6f31-23e1-eff5-2e9c19f88...@apache.org%3E
> >
> >
> > On Fri, Nov 2, 2018 at 10:22 AM Bowen Li  wrote:
> >
> > > Thanks Shuyi!
> > >
> > > I left some comments there. I think the design of SQL DDL and Flink-Hive
> > > integration/External catalog enhancements will work closely with each
> > > other. Hope we are well aligned on the directions of the two designs, and I
> > > look forward to working with you guys on both!
> > >
> > > Bowen
> > >
> > >
> > > On Thu, Nov 1, 2018 at 10:57 PM Shuyi Chen  wrote:
> > >
> > > > Hi everyone,
> > > >
> > > > SQL DDL support has been a long-time ask from the community. Current Flink
> > > > SQL supports only DML (e.g. SELECT and INSERT statements). In its current
> > > > form, Flink SQL users still need to define/create table sources and sinks
> > > > programmatically in Java/Scala. Also, in SQL Client, without DDL support,
> > > > the current implementation does not allow dynamic creation of tables, types
> > > > or functions with SQL, which adds friction for its adoption.
> > > >
> > > > I drafted a design doc [1] with a few other community members that proposes
> > > > the design and implementation for adding DDL support in Flink. The initial
> > > > design considers DDL for table, view, type, library and function. It will
> > > > be great to get feedback on the design from the community, and align with
> > > > the latest effort in unified SQL connector API [2] and Flink Hive integration
> > > > [3].
> > > >
> > > > Any feedback is highly appreciated.
> > > >
> > > > Thanks
> > > > Shuyi Chen
> > > > [1]
> > > >
> > > >
> > >
> >
> https://docs.google.com/docum

Re: [DISCUSS] Flink SQL DDL Design

2018-11-04 Thread Dominik Wosiński
+1, thanks for the proposal.

I guess this is a long-awaited change. It can greatly extend the
functionality of the SQL Client, since it will be possible to use complex
extensions such as those provided by Apache Bahir [1].

Best Regards,
Dom.

[1]
https://github.com/apache/bahir-flink

On Sat, 3 Nov 2018 at 17:17, Rong Rong wrote:

> +1. Thanks for putting the proposal together Shuyi.
>
> DDL has been brought up a couple of times previously [1,2]. Utilizing
> DDL will definitely be a great extension to the current Flink SQL to
> systematically support some of the previously brought up features such as
> [3]. And it will also be beneficial to see the document closely aligned
> with the previous discussion for unified SQL connector API [4].
>
> I also left a few comments on the doc. Looking forward to the alignment
> with the other couple of efforts and contributing to them!
>
> Best,
> Rong
>
> [1]
>
> http://mail-archives.apache.org/mod_mbox/flink-dev/201805.mbox/%3CCAMZk55ZTJA7MkCK1Qu4gLPu1P9neqCfHZtTcgLfrFjfO4Xv5YQ%40mail.gmail.com%3E
> [2]
>
> http://mail-archives.apache.org/mod_mbox/flink-dev/201810.mbox/%3CDC070534-0782-4AFD-8A85-8A82B384B8F7%40gmail.com%3E
>
> [3] https://issues.apache.org/jira/browse/FLINK-8003
> [4]
>
> http://mail-archives.apache.org/mod_mbox/flink-dev/201810.mbox/%3c6676cb66-6f31-23e1-eff5-2e9c19f88...@apache.org%3E
>
>
> On Fri, Nov 2, 2018 at 10:22 AM Bowen Li  wrote:
>
> > Thanks Shuyi!
> >
> > I left some comments there. I think the design of SQL DDL and Flink-Hive
> > integration/External catalog enhancements will work closely with each
> > other. Hope we are well aligned on the directions of the two designs, and I
> > look forward to working with you guys on both!
> >
> > Bowen
> >
> >
> > On Thu, Nov 1, 2018 at 10:57 PM Shuyi Chen  wrote:
> >
> > > Hi everyone,
> > >
> > > SQL DDL support has been a long-time ask from the community. Currently,
> > > Flink SQL supports only DML (e.g. SELECT and INSERT statements). In its
> > > current form, Flink SQL still requires users to define/create table sources
> > > and sinks programmatically in Java/Scala. Also, without DDL support, the
> > > current SQL Client implementation does not allow dynamic creation of tables,
> > > types, or functions with SQL, which adds friction to its adoption.
> > >
> > > I drafted a design doc [1] with a few other community members that proposes
> > > the design and implementation for adding DDL support in Flink. The initial
> > > design considers DDL for table, view, type, library and function. It would
> > > be great to get feedback on the design from the community, and to align with
> > > the latest efforts on the unified SQL connector API [2] and Flink Hive
> > > integration [3].
> > >
> > > Any feedback is highly appreciated.
> > >
> > > Thanks
> > > Shuyi Chen
> > >
> > > [1]
> > > https://docs.google.com/document/d/1TTP-GCC8wSsibJaSUyFZ_5NBAHYEB1FVmPpP7RgDGBA/edit?usp=sharing
> > > [2]
> > > https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit?usp=sharing
> > > [3]
> > > https://docs.google.com/document/d/1SkppRD_rE3uOKSN-LuZCqn4f7dz0zW5aa6T_hBZq5_o/edit?usp=sharing
> > > --
> > > "So you have to trust that the dots will somehow connect in your future."
> > >
> >
>


Re: [DISCUSS] Flink SQL DDL Design

2018-11-03 Thread Rong Rong
+1. Thanks for putting the proposal together Shuyi.

DDL has been brought up a couple of times previously [1,2]. Utilizing
DDL will definitely be a great extension to the current Flink SQL to
systematically support some of the previously brought up features such as
[3]. And it will also be beneficial to see the document closely aligned
with the previous discussion for unified SQL connector API [4].

I also left a few comments on the doc. Looking forward to the alignment
with the other couple of efforts and contributing to them!

Best,
Rong

[1]
http://mail-archives.apache.org/mod_mbox/flink-dev/201805.mbox/%3CCAMZk55ZTJA7MkCK1Qu4gLPu1P9neqCfHZtTcgLfrFjfO4Xv5YQ%40mail.gmail.com%3E
[2]
http://mail-archives.apache.org/mod_mbox/flink-dev/201810.mbox/%3CDC070534-0782-4AFD-8A85-8A82B384B8F7%40gmail.com%3E

[3] https://issues.apache.org/jira/browse/FLINK-8003
[4]
http://mail-archives.apache.org/mod_mbox/flink-dev/201810.mbox/%3c6676cb66-6f31-23e1-eff5-2e9c19f88...@apache.org%3E


On Fri, Nov 2, 2018 at 10:22 AM Bowen Li  wrote:

> Thanks Shuyi!
>
> I left some comments there. I think the design of SQL DDL and Flink-Hive
> integration/External catalog enhancements will work closely with each
> other. Hope we are well aligned on the directions of the two designs, and I
> look forward to working with you guys on both!
>
> Bowen
>
>
> On Thu, Nov 1, 2018 at 10:57 PM Shuyi Chen  wrote:
>
> > Hi everyone,
> >
> > SQL DDL support has been a long-time ask from the community. Currently,
> > Flink SQL supports only DML (e.g. SELECT and INSERT statements). In its
> > current form, Flink SQL still requires users to define/create table sources
> > and sinks programmatically in Java/Scala. Also, without DDL support, the
> > current SQL Client implementation does not allow dynamic creation of tables,
> > types, or functions with SQL, which adds friction to its adoption.
> >
> > I drafted a design doc [1] with a few other community members that proposes
> > the design and implementation for adding DDL support in Flink. The initial
> > design considers DDL for table, view, type, library and function. It would
> > be great to get feedback on the design from the community, and to align with
> > the latest efforts on the unified SQL connector API [2] and Flink Hive
> > integration [3].
> >
> > Any feedback is highly appreciated.
> >
> > Thanks
> > Shuyi Chen
> >
> > [1]
> > https://docs.google.com/document/d/1TTP-GCC8wSsibJaSUyFZ_5NBAHYEB1FVmPpP7RgDGBA/edit?usp=sharing
> > [2]
> > https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit?usp=sharing
> > [3]
> > https://docs.google.com/document/d/1SkppRD_rE3uOKSN-LuZCqn4f7dz0zW5aa6T_hBZq5_o/edit?usp=sharing
> > --
> > "So you have to trust that the dots will somehow connect in your future."
> >
>


Re: [DISCUSS] Flink SQL DDL Design

2018-11-02 Thread Bowen Li
Thanks Shuyi!

I left some comments there. I think the design of SQL DDL and Flink-Hive
integration/External catalog enhancements will work closely with each
other. Hope we are well aligned on the directions of the two designs, and I
look forward to working with you guys on both!

Bowen


On Thu, Nov 1, 2018 at 10:57 PM Shuyi Chen  wrote:

> Hi everyone,
>
> SQL DDL support has been a long-time ask from the community. Currently,
> Flink SQL supports only DML (e.g. SELECT and INSERT statements). In its
> current form, Flink SQL still requires users to define/create table sources
> and sinks programmatically in Java/Scala. Also, without DDL support, the
> current SQL Client implementation does not allow dynamic creation of tables,
> types, or functions with SQL, which adds friction to its adoption.
>
> I drafted a design doc [1] with a few other community members that proposes
> the design and implementation for adding DDL support in Flink. The initial
> design considers DDL for table, view, type, library and function. It would
> be great to get feedback on the design from the community, and to align with
> the latest efforts on the unified SQL connector API [2] and Flink Hive
> integration [3].
>
> Any feedback is highly appreciated.
>
> Thanks
> Shuyi Chen
>
> [1]
>
> https://docs.google.com/document/d/1TTP-GCC8wSsibJaSUyFZ_5NBAHYEB1FVmPpP7RgDGBA/edit?usp=sharing
> [2]
>
> https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit?usp=sharing
> [3]
>
> https://docs.google.com/document/d/1SkppRD_rE3uOKSN-LuZCqn4f7dz0zW5aa6T_hBZq5_o/edit?usp=sharing
> --
> "So you have to trust that the dots will somehow connect in your future."
>


[DISCUSS] Flink SQL DDL Design

2018-11-01 Thread Shuyi Chen
Hi everyone,

SQL DDL support has been a long-time ask from the community. Currently,
Flink SQL supports only DML (e.g. SELECT and INSERT statements). In its
current form, Flink SQL still requires users to define/create table sources
and sinks programmatically in Java/Scala. Also, without DDL support, the
current SQL Client implementation does not allow dynamic creation of tables,
types, or functions with SQL, which adds friction to its adoption.

I drafted a design doc [1] with a few other community members that proposes
the design and implementation for adding DDL support in Flink. The initial
design considers DDL for table, view, type, library and function. It would
be great to get feedback on the design from the community, and to align with
the latest efforts on the unified SQL connector API [2] and Flink Hive
integration [3].

Any feedback is highly appreciated.

Thanks
Shuyi Chen

[1]
https://docs.google.com/document/d/1TTP-GCC8wSsibJaSUyFZ_5NBAHYEB1FVmPpP7RgDGBA/edit?usp=sharing
[2]
https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit?usp=sharing
[3]
https://docs.google.com/document/d/1SkppRD_rE3uOKSN-LuZCqn4f7dz0zW5aa6T_hBZq5_o/edit?usp=sharing
-- 
"So you have to trust that the dots will somehow connect in your future."