hi all,
Thanks for your valuable input!

4) Event-Time Attributes and Watermarks:
4.b) @Fabian As you mentioned using a computed columns `ts AS
SYSTEMROWTIME()`
for writing out to kafka table sink will violate the rule that computed
fields are not emitted.
Since the timestamp column in kafka's header area is a specific
materialization protocol,
why don't we treat it as an connector property? For an example:
```
CREATE TABLE output_kafka_t1(
  id bigint,
  ts timestamp,
  msg varchar
) WITH (
  type=kafka,
  header.timestamp=ts
  ,...
);
```

4d) For custom watermark strategies
@Fabian Agree with you that opening another topic about this feature later.

3) SOURCE / SINK / BOTH
I think the permissions and availabilities are two separately things,
permissions
can be managed well by using GRANT/INVOKE(you can call it DCL) solutions
which
commonly used in different DBs. The permission part can be an new topic for
later discussion, what do you think?

For the availabilities, @Fabian @Timo  I've another question,
does instantiate a TableSource/Sink cost much or has some other downsides?
IMO, create a new source/sink object via the construct seems not costly.
When receiving a DDL we should associate it with the catalog object
(reusing an existence or create a new one).
Am I lost something important?

5. Schema declaration:
@Timo  yes, your concern about the user convenience is very important. But
I haven't seen a clear way to solve this so far.
Do we put it later and wait for more inputs from the community?

Shuyi Chen <suez1...@gmail.com> 于2018年12月8日周六 下午4:27写道:

> Hi all,
>
> Thanks a lot for the great discussion. I think we can continue the
> discussion here while carving out a MVP so that the community can start
> working on. Based on the discussion so far, I try to summarize what we will
> do for the MVP:
>
> MVP
>
>    1. support CREATE TABLE
>    2. support exisiting data type in Flink SQL, ignore nullability and
>    precision
>    3. support table comments and column comments
>    4. support table constraint PRIMARY KEY and UNIQUE
>    5. support table properties using key-value pairs
>    6. support partitioned by
>    7. support computed column
>    8. support from-field and from-source timestamp extractors
>    9. support PERIODIC-ASCENDING, PERIODIC-BOUNDED, FROM-SOURCE watermark
>    strategies.
>    10. support a table property to allow explicit enforcement of
>    read/write(source/sink) permission of a table
>
> I try to put up the DDL grammar (
>
> https://docs.google.com/document/d/1ug1-aVBSCxZQk58kR-yaK2ETCgL3zg0eDUVGCnW2V9E/edit?usp=sharing
> )
> based on the MVP features above and the previous design docs. Please take a
> look and comment on it.
>
>
> Also, I summarize the future Improvement on CREATE TABLE as the followings:
>
>    1. support table update mode
>    2. support data type nullability and precision
>    3. support row/map/array data type
>    4. support custom timestamp extractor and watermark strategy
>    5. support schema derivation
>    6. support system versioned temporal table
>    7. support table index
>
> I suggest we first agree on the MVP feature list and the MVP grammar. And
> then we can either continue the discussion of the future improvements here,
> or create separate JIRAs for each item and discuss further in the JIRA.
> What do you guys think?
>
> Shuyi
>
> On Fri, Dec 7, 2018 at 7:54 AM Timo Walther <twal...@apache.org> wrote:
>
> > Hi all,
> >
> > I think we are making good progress. Thanks for all the feedback so far.
> >
> > 3. Sources/Sinks:
> > It seems that I can not find supporters for explicit SOURCE/SINK
> > declaration so I'm fine with not using those keywords.
> > @Fabian: Maybe we don't haven have to change the TableFactory interface
> > but just provide some helper functions in the TableFactoryService. This
> > would solve the availability problem, but the permission problem would
> > still not be solved. If you are fine with it, we could introduce a
> > property instead?
> >
> > 5. Schema declaration:
> > @Lin: We should find an agreement on this as it requires changes to the
> > TableFactory interface. We should minimize changes to this interface
> > because it is user-facing. Especially, if format schema and table schema
> > differ, the need for such a functionality is very important. Our goal is
> > to connect to existing infrastructure. For example, if we are using Avro
> > and the existing Avro format has enums but Flink SQL does not support
> > enums, it would be helpful to let the Avro format derive a table schema.
> > Otherwise your need to declare both schemas which leads to CREATE TABLE
> > statements of 400 lines+.
> > I think the mentioned query:
> > CREATE TABLE (PRIMARY_KEY(a, c)) WITH (format.type = avro,
> > format.schema-file = "/my/avrofile.avsc")
> > is fine and should only be valid if the schema contains no non-computed
> > columns.
> >
> > 7. Table Update Mode:
> > After thinking about it again, I agree. The mode of the sinks can be
> > derived from the query and the existence of a PRIMARY KEY declaration.
> > But Fabian raised a very good point. How do we deal with sources? Shall
> > we introduce a new keywords similar to WATERMARKS such that a
> > upsert/retract flag is not part of the visible schema?
> >
> > 4a. How to mark a field as attribute?
> > @Jark: Thanks for the explanation of the WATERMARK clause semantics.
> > This is a nice way of marking existing fields. This sounds good to me.
> >
> > 4c) WATERMARK as constraint
> > I'm fine with leaving the WATERMARK clause in the schema definition.
> >
> > 4d) Custom watermark strategies:
> > I would already think about custom watermark strategies as the current
> > descriptor design already supports this. ScalarFunction's don't work as
> > a PeriodicWatermarkAssigner has different semantics. Why not simply
> > entering the a full class name here as it is done in the current design?
> >
> > 4.b) Ingesting and writing timestamps to systems (like Kafka)
> > @Fabian: Yes, your suggestion sounds good to me. This behavior would be
> > similar to our current `timestamps: from-source` design.
> >
> > Once our discussion has found a conclusion, I would like to volunteer
> > and summarize the outcome of this mailing thread. It nicely aligns with
> > the update work on the connector improvements document (that I wanted to
> > do anyway) and the ongoing external catalog discussion. Furthermore, I
> > would also want to propose how to change existing interfaces by keeping
> > the DDL, connector improvements, and external catalog support in mind.
> > Would that be ok for you?
> >
> > Thanks,
> > Timo
> >
> >
> >
> > Am 07.12.18 um 14:48 schrieb Fabian Hueske:
> > > Hi all,
> > >
> > > Thanks for the discussion.
> > > I'd like to share my point of view as well.
> > >
> > > 4) Event-Time Attributes and Watermarks:
> > > 4.a) I agree with Lin and Jark's proposal. Declaring a watermark on an
> > > attribute declares it as an event-time attribute.
> > > 4.b) Ingesting and writing timestamps to systems (like Kafka). We could
> > use
> > > a special function like (ts AS SYSTEMROWTIME()). This function will
> > > indicate that we read the timestamp directly from the system (and not
> the
> > > data). We can also write the field back to the system when emitting the
> > > table (violating the rule that computed fields are not emitted).
> > > 4c) I would treat WATERMARK similar to a PRIMARY KEY or UNIQUE KEY
> > > constraint and therefore keep it in the schema definition.
> > > 4d) For custom watermark strategies, a simple expressions or
> > > ScalarFunctions won't be sufficient. Sophisticated approaches could
> > collect
> > > histograms, etc. But I think we can leave that out for later.
> > >
> > > 3) SOURCE / SINK / BOTH
> > > As you said, there are two things to consider here: permission and
> > > availability of a TableSource/TableSink.
> > > I think that neither should be a reason to add a keyword at such a
> > > sensitive position.
> > > However, I also see Timo's point that it would be good to know up-front
> > how
> > > a table can be used without trying to instantiate a TableSource/Sink
> for
> > a
> > > query.
> > > Maybe we can extend the TableFactory such that it provides information
> > > about which sources/sinks it can provide.
> > >
> > > 7. Table Update Mode
> > > Something that we definitely need to consider is how tables are
> ingested,
> > > i.e., append, retract or upsert.
> > > Especially, since upsert and retraction need a meta-data column that
> > > indicates whether an event is an insert (or upsert) or a delete change.
> > > This column needs to be identified somehow, most likely as part of the
> > > input format. Ideally, this column should not be part of the table
> schema
> > > (as it would be always true).
> > > Emitting tables is not so much of an issue as the properties of the
> table
> > > tell use what to do (append-only/update, unique key y/n).
> > >
> > > Best,
> > > Fabian
> > >
> > >
> > > Am Fr., 7. Dez. 2018 um 10:39 Uhr schrieb Jark Wu <imj...@gmail.com>:
> > >
> > >> Hi Timo,
> > >>
> > >> Thanks for your quickly feedback! Here are some of my thoughts:
> > >>
> > >> Append, upserts, retract mode on sinks is also a very complex
> problem. I
> > >> think append/upserts/retract is the ability of a table, user do not
> > need to
> > >> specify a table is used for append or retraction or upsert. The query
> > can
> > >> choose which mode the sink is. If an unbounded groupby is inserted
> into
> > an
> > >> append sink (the sink only implements/supports append), an exception
> > can be
> > >> thrown. A more complex problem is, if we want to write
> > retractions/upserts
> > >> to Kafka, how to encode the change flag (add or retract/delete) on the
> > >> table? Maybe we should propose some protocal for the change flag
> > encoding,
> > >> but I don't have a clear idea about this right now.
> > >>
> > >> 3. Sources/Sinks: The source/sink tag is similar to the
> > >> append/upsert/retract problem. Besides source/sink, actully we have
> > stream
> > >> source, stream sink, batch source, batch sink, and the stream sink
> also
> > >> include append/upsert/retract three modes. Should we put all the tags
> on
> > >> the CREATE TABLE? IMO, the table's ability is defined by the table
> > itself,
> > >> user do not need to specify it. If it is only a readable table, an
> > >> exception can be thrown when write to it. As the source/sink tag can
> be
> > >> omitted in CREATE TABLE, could we skip it and only support CREATE
> TABLE
> > in
> > >> the first version, and add it back in the future when we really need
> > it? It
> > >> keeps API compatible and make sure the MVP is what we consider
> clearly.
> > >>
> > >> 4a. How to mark a field as attribute?
> > >> The watermark definition includes two parts: use which field as time
> > >> attribute and use what generate strategy.
> > >> When we want to mark `ts` field as attribute: WATERMARK FOR `ts` AS
> > OFFSET
> > >> '5' SECOND.
> > >> If we have a POJO{id, user, ts} field named "pojo", we can mark it
> like
> > >> this: WATERMARK FOR pojo.ts AS OFFSET '5' SECOND
> > >>
> > >> 4b. timestamp write to Kafka message header
> > >> Even though we can define multiple time attribute on a table, only one
> > time
> > >> attribute can be actived/used in a query (in a stream). When we enable
> > >> `writeTiemstamp`, the only attribute actived in the stream will be
> > write to
> > >> Kafka message header. What I mean the timestmap in StreamRecord is the
> > time
> > >> attribute in the stream.
> > >>
> > >> 4c. Yes. We introduced the WATERMARK keyword similar to the INDEX,
> > PRIMARY
> > >> KEY keywords.
> > >>
> > >> @Timo, Do you have any other advice or questions on the watermark
> > syntax ?
> > >> For example, the builtin strategy name: "BOUNDED WITH OFFSET" VS
> > "OFFSET"
> > >> VS ...
> > >>
> > >>
> > >> Cheers,
> > >> Jark
> > >>
> > >> On Fri, 7 Dec 2018 at 17:13, Lin Li <lincoln.8...@gmail.com> wrote:
> > >>
> > >>> Hi Timo,
> > >>> Thanks for your feedback, here's some thoughts of mine:
> > >>>
> > >>> 3. Sources/Sinks:
> > >>> "Let's assume an interactive CLI session, people should be able to
> list
> > >> all
> > >>> source table and sink tables to know upfront if they can use an
> INSERT
> > >> INTO
> > >>> here or not."
> > >>> This requirement can be simply resolved by a document that list all
> > >>> supported source/sink/both connectors and the sql-client can perform
> a
> > >>> quick check. It's only an implementation choice, not necessary for
> the
> > >>> syntax.
> > >>> For connector implementation, a connector may implement one or some
> or
> > >> all
> > >>> of the [Stream|Batch]Source/[Stream|Batch]Sink traits, we can derive
> > the
> > >>> availability for any give query without the SOURCE/SINk keywords or
> > >>> specific table properties in WITH clause.
> > >>> Since there's still indeterminacy, shall we skip these two keywords
> for
> > >> the
> > >>> MVP DDL? We can make further discussion after users' feedback.
> > >>>
> > >>> 6. Partitioning and keys
> > >>> Agree with you that raise the priority of table constraint and
> > >> partitioned
> > >>> table support for better connectivity to Hive and Kafka. I'll add
> > >>> partitioned table syntax(compatible to hive) into the DDL Draft doc
> > >>> later[1].
> > >>>
> > >>> 5. Schema declaration
> > >>> "if users want to declare computed columns they have a "schema"
> > >> constraints
> > >>> but without columns
> > >>> CREATE TABLE (PRIMARY_KEY(a, c)) WITH (format.type = avro,
> > >>> format.schema-file = "/my/avrofile.avsc") "
> > >>>
> > >>>  From the point of my view, this ddl is invalid because the primary
> key
> > >>> constraint already references two columns but types unseen.
> > >>> And Xuefu pointed a important matching problem, so let's put schema
> > >>> derivation as a follow-up extension ?
> > >>>
> > >>>
> > >>>
> > >>>
> > >>> Timo Walther <twal...@apache.org> 于2018年12月6日周四 下午6:05写道:
> > >>>
> > >>>> Hi everyone,
> > >>>>
> > >>>> great to have such a lively discussion. My next batch of feedback:
> > >>>>
> > >>>> @Jark: We don't need to align the descriptor approach with SQL. I'm
> > >> open
> > >>>> for different approaches as long as we can serve a broad set of use
> > >>>> cases and systems. The descriptor approach was a first attempt to
> > cover
> > >>>> all aspects and connector/format characteristics. Just another
> > example,
> > >>>> that is missing in the DDL design: How can a user decide if append,
> > >>>> retraction, or upserts should be used to sink data into the target
> > >>>> system? Do we want to define all these improtant properties in the
> big
> > >>>> WITH property map? If yes, we are already close to the descriptor
> > >>>> approach. Regarding the "standard way", most DDL languages have very
> > >>>> custom syntax so there is not a real "standard".
> > >>>>
> > >>>> 3. Sources/Sinks: @Lin: If a table has both read/write access it can
> > be
> > >>>> created using a regular CREATE TABLE (omitting a specific
> source/sink)
> > >>>> declaration. Regarding the transition from source/sink to both, yes
> we
> > >>>> would need to update the a DDL and catalogs. But is this a problem?
> > One
> > >>>> also needs to add new queries that use the tables. @Xuefu: It is not
> > >>>> only about security aspects. Especially for streaming use cases, not
> > >>>> every connector can be used as a source easily. For example, a JDBC
> > >> sink
> > >>>> is easier than a JDBC source. Let's assume an interactive CLI
> session,
> > >>>> people should be able to list all source table and sink tables to
> know
> > >>>> upfront if they can use an INSERT INTO here or not.
> > >>>>
> > >>>> 6. Partitioning and keys: @Lin: I would like to include this in the
> > >>>> design given that Hive integration and Kafka key support are in the
> > >>>> making/are on our roadmap for this release.
> > >>>>
> > >>>> 5. Schema declaration: @Lin: You are right it is not conflicting. I
> > >> just
> > >>>> wanted to raise the point because if users want to declare computed
> > >>>> columns they have a "schema" constraints but without columns. Are we
> > ok
> > >>>> with a syntax like ...
> > >>>> CREATE TABLE (PRIMARY_KEY(a, c)) WITH (format.type = avro,
> > >>>> format.schema-file = "/my/avrofile.avsc") ?
> > >>>> @Xuefu: Yes, you are right that an external schema might not excatly
> > >>>> match but this is true for both directions:
> > >>>> table schema "derives" format schema and format schema "derives"
> table
> > >>>> schema.
> > >>>>
> > >>>> 7. Hive compatibility: @Xuefu: I agree that Hive is popular but we
> > >>>> should not just adopt everything from Hive as there syntax is very
> > >>>> batch-specific. We should come up with a superset of historical and
> > >>>> future requirements. Supporting Hive queries can be an intermediate
> > >>>> layer on top of Flink's DDL.
> > >>>>
> > >>>> 4. Time attributes: @Lin: I'm fine with changing the
> > TimestampExtractor
> > >>>> interface as this is also important for better separation of
> connector
> > >>>> and table module [1]. However, I'm wondering about watermark
> > >> generation.
> > >>>> 4a. timestamps are in the schema twice:
> > >>>> @Jark: "existing field is Long/Timestamp, we can just use it as
> > >>>> rowtime": yes, but we need to mark a field as such an attribute. How
> > >>>> does the syntax for marking look like? Also in case of timestamps
> that
> > >>>> are nested in the schema?
> > >>>>
> > >>>> 4b. how can we write out a timestamp into the message header?:
> > >>>> I agree to simply ignore computed columns when writing out. This is
> > >> like
> > >>>> 'field-change: add' that I mentioned in the improvements document.
> > >>>> @Jark: "then the timestmap in StreamRecord will be write to Kafka
> > >>>> message header": Unfortunately, there is no timestamp in the stream
> > >>>> record. Additionally, multiple time attributes can be in a schema.
> So
> > >> we
> > >>>> need a constraint that tells the sink which column to use (possibly
> > >>>> computed as well)?
> > >>>>
> > >>>> 4c. separate all time attribute concerns into a special clause next
> to
> > >>>> the regular schema?
> > >>>> @Jark: I don't have a strong opinion on this. I just have the
> feeling
> > >>>> that the "schema part" becomes quite messy because the actual schema
> > >>>> with types and fields is accompanied by so much metadata about
> > >>>> timestamps, watermarks, keys,... and we would need to introduce a
> new
> > >>>> WATERMARK keyword within a schema that was close to standard up to
> > this
> > >>>> point.
> > >>>>
> > >>>> Thanks everyone,
> > >>>> Timo
> > >>>>
> > >>>> [1] https://issues.apache.org/jira/browse/FLINK-9461
> > >>>>
> > >>>>
> > >>>>
> > >>>> Am 06.12.18 um 07:08 schrieb Jark Wu:
> > >>>>> Hi Timo,
> > >>>>>
> > >>>>> Thank you for the valuable feedbacks.
> > >>>>>
> > >>>>> First of all, I think we don't need to align the SQL functionality
> to
> > >>>>> Descriptor. Because SQL is a more standard API, we should be as
> > >>> cautious
> > >>>> as
> > >>>>> possible to extend the SQL syntax. If something can be done in a
> > >>> standard
> > >>>>> way, we shouldn't introduce something new.
> > >>>>>
> > >>>>> Here are some of my thoughts:
> > >>>>>
> > >>>>> 1. Scope: Agree.
> > >>>>> 2. Constraints: Agree.
> > >>>>> 4. Time attributes:
> > >>>>>     4a. timestamps are in the schema twice.
> > >>>>>      If an existing field is Long/Timestamp, we can just use it as
> > >>>> rowtime,
> > >>>>> no twice defined. If it is not a Long/Timestamp, we use computed
> > >> column
> > >>>> to
> > >>>>> get an expected timestamp column to be rowtime, is this what you
> mean
> > >>>>> defined twice?  But I don't think it is a problem, but an
> advantages,
> > >>>>> because it is easy to use, user do not need to consider whether to
> > >>>> "replace
> > >>>>> the existing column" or "add a new column", he will not be confused
> > >>>> what's
> > >>>>> the real schema is, what's the index of rowtime in the schema?
> > >>> Regarding
> > >>>> to
> > >>>>> the optimization, even if timestamps are in schema twice, when the
> > >>>> original
> > >>>>> timestamp is never used in query, then the projection pushdown
> > >>>> optimization
> > >>>>> can cut this field as early as possible, which is exactly the same
> as
> > >>>>> "replacing the existing column" in runtime.
> > >>>>>
> > >>>>>      4b. how can we write out a timestamp into the message header?
> > >>>>>       That's a good point. I think computed column is just a
> virtual
> > >>>> column
> > >>>>> on table which is only relative to reading. If we want to write to
> a
> > >>>> table
> > >>>>> with computed column defined, we only need to provide the columns
> > >>> except
> > >>>>> computed columns (see SQL Server [1]). The computed column is
> ignored
> > >>> in
> > >>>>> the insert statement. Get back to the question, how can we write
> out
> > >> a
> > >>>>> timestamp into the message header? IMO, we can provide a
> > >> configuration
> > >>> to
> > >>>>> support this, such as `kafka.writeTimestamp=true`, then the
> timestmap
> > >>> in
> > >>>>> StreamRecord will be write to Kafka message header. What do you
> > >> think?
> > >>>>>       4c. separate all time attribute concerns into a special
> clause
> > >>> next
> > >>>> to
> > >>>>> the regular schema?
> > >>>>>       Separating watermark into a special clause similar to
> > >> PARTITIONED
> > >>>> BY is
> > >>>>> also a good idea. Conceptually, it's fine to put watermark in
> schema
> > >>> part
> > >>>>> or out schema part. But if we want to support multiple watermark
> > >>>>> definition, maybe it would be better to put it in schema part. It
> is
> > >>>>> similar to Index Definition that we can define several indexes on a
> > >>> table
> > >>>>> in schema part.
> > >>>>>
> > >>>>>       4d. How can people come up with a custom watermark strategy?
> > >>>>>       In most cases, the built-in strategy can works good. If we
> need
> > >> a
> > >>>>> custom one, we can use a scalar function which restrict to only
> > >> return
> > >>> a
> > >>>>> nullable Long, and use it in SQL like: WATERMARK for rowtime AS
> > >>>>> watermarkUdf(a, b, c). The `watermarkUdf` is a user-defined scalar
> > >>>> function
> > >>>>> accepts 3 parameters and return a nullable Long which can be used
> as
> > >>>>> punctuated watermark assigner. Another choice is implementing a
> class
> > >>>>> extending the
> > >>>>> `org.apache.flink.table.sources.wmstrategies.WatermarkStrategy` and
> > >> use
> > >>>> it
> > >>>>> in SQL: WATERMARK for rowtime AS 'com.my.MyWatermarkStrategy'. But
> if
> > >>>>> scalar function can cover the requirements here, I would prefer it
> > >>> here,
> > >>>>> because it keeps standard compliant. BTW, this feature is not in
> MVP,
> > >>> we
> > >>>>> can discuss it more depth in the future when we need it.
> > >>>>>
> > >>>>> 5. Schema declaration:
> > >>>>> I like the proposal to omit the schema if we can get the schema
> from
> > >>>>> external storage or something schema file. Actually, we have
> already
> > >>>>> encountered this requirement in out company.
> > >>>>>
> > >>>>>
> > >>>>> +1 to @Xuefu that we should be as close as possible to Hive syntax
> > >>> while
> > >>>>> keeping SQL ANSI standard. This will make it more acceptable and
> > >> reduce
> > >>>> the
> > >>>>> learning cost for user.
> > >>>>>
> > >>>>> [1]:
> > >>>>>
> > >>
> >
> https://docs.microsoft.com/en-us/sql/relational-databases/partitions/create-partitioned-tables-and-indexes?view=sql-server-2017
> > >>>>> Best,
> > >>>>> Jark
> > >>>>>
> > >>>>> On Thu, 6 Dec 2018 at 12:09, Zhang, Xuefu <xuef...@alibaba-inc.com
> >
> > >>>> wrote:
> > >>>>>> Hi Timo/Shuyi/Lin,
> > >>>>>>
> > >>>>>> Thanks for the discussions. It seems that we are converging to
> > >>> something
> > >>>>>> meaningful. Here are some of my thoughts:
> > >>>>>>
> > >>>>>> 1. +1 on MVP DDL
> > >>>>>> 3. Markers for source or sink seem more about permissions on
> tables
> > >>> that
> > >>>>>> belong to a security component. Unless the table is created
> > >>> differently
> > >>>>>> based on source, sink, or both, it doesn't seem necessary to use
> > >> these
> > >>>>>> keywords to enforce permissions.
> > >>>>>> 5. It might be okay if schema declaration is always needed. While
> > >>> there
> > >>>>>> might be some duplication sometimes, it's not always true. For
> > >>> example,
> > >>>>>> external schema may not be exactly matching Flink schema. For
> > >>> instance,
> > >>>>>> data types. Even if so, perfect match is not required. For
> instance,
> > >>> the
> > >>>>>> external schema file may evolve while table schema in Flink may
> stay
> > >>>>>> unchanged. A responsible reader should be able to scan the file
> > >> based
> > >>> on
> > >>>>>> file schema and return the data based on table schema.
> > >>>>>>
> > >>>>>> Other aspects:
> > >>>>>>
> > >>>>>> 7. Hive compatibility. Since Flink SQL will soon be able to
> operate
> > >> on
> > >>>>>> Hive metadata and data, it's an add-on benefit if we can be
> > >> compatible
> > >>>> with
> > >>>>>> Hive syntax/semantics while following ANSI standard. At least we
> > >>> should
> > >>>> be
> > >>>>>> as close as possible. Hive DDL can found at
> > >>>>>>
> https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
> > >>>>>>
> > >>>>>> Thanks,
> > >>>>>> Xuefu
> > >>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>> ------------------------------------------------------------------
> > >>>>>> Sender:Lin Li <lincoln.8...@gmail.com>
> > >>>>>> Sent at:2018 Dec 6 (Thu) 10:49
> > >>>>>> Recipient:dev <dev@flink.apache.org>
> > >>>>>> Subject:Re: [DISCUSS] Flink SQL DDL Design
> > >>>>>>
> > >>>>>> Hi Timo and Shuyi,
> > >>>>>>     thanks for your feedback.
> > >>>>>>
> > >>>>>> 1. Scope
> > >>>>>> agree with you we should focus on the MVP DDL first.
> > >>>>>>
> > >>>>>> 2. Constraints
> > >>>>>> yes, this can be a follow-up issue.
> > >>>>>>
> > >>>>>> 3. Sources/Sinks
> > >>>>>> If a TABLE has both read/write access requirements, should we
> > >> declare
> > >>> it
> > >>>>>> using
> > >>>>>> `CREATE [SOURCE_SINK|BOTH] TABLE tableName ...` ? A further
> > >> question,
> > >>>> if a
> > >>>>>> TABLE
> > >>>>>> t1 firstly declared as read only (as a source table), then for
> some
> > >>> new
> > >>>>>> requirements
> > >>>>>> t1 will change to a sink table,  in this case we need updating
> both
> > >>> the
> > >>>> DDL
> > >>>>>> and catalogs.
> > >>>>>> Further more, let's think about the BATCH query, update one table
> > >>>> in-place
> > >>>>>> can be a common case.
> > >>>>>> e.g.,
> > >>>>>> ```
> > >>>>>> CREATE TABLE t1 (
> > >>>>>>     col1 varchar,
> > >>>>>>     col2 int,
> > >>>>>>     col3 varchar
> > >>>>>>     ...
> > >>>>>> );
> > >>>>>>
> > >>>>>> INSERT [OVERWRITE] TABLE t1
> > >>>>>> AS
> > >>>>>> SELECT
> > >>>>>>     (some computing ...)
> > >>>>>> FROM t1;
> > >>>>>> ```
> > >>>>>> So, let's forget these SOURCE/SINK keywords in DDL. For the
> > >> validation
> > >>>>>> purpose, we can find out other ways.
> > >>>>>>
> > >>>>>> 4. Time attributes
> > >>>>>> As Shuyi mentioned before, there exists an
> > >>>>>> `org.apache.flink.table.sources.tsextractors.TimestampExtractor`
> for
> > >>>> custom
> > >>>>>> defined time attributes usage, but this expression based class is
> > >> more
> > >>>>>> friendly for table api not the SQL.
> > >>>>>> ```
> > >>>>>> /**
> > >>>>>>     * Provides the an expression to extract the timestamp for a
> > >> rowtime
> > >>>>>> attribute.
> > >>>>>>     */
> > >>>>>> abstract class TimestampExtractor extends FieldComputer[Long] with
> > >>>>>> Serializable {
> > >>>>>>
> > >>>>>>     /** Timestamp extractors compute the timestamp as Long. */
> > >>>>>>     override def getReturnType: TypeInformation[Long] =
> > >>>>>> Types.LONG.asInstanceOf[TypeInformation[Long]]
> > >>>>>> }
> > >>>>>> ```
> > >>>>>> BTW, I think both the Scalar function and the TimestampExtractor
> are
> > >>>>>> expressing computing logic, the TimestampExtractor has no more
> > >>>> advantage in
> > >>>>>> SQL scenarios.
> > >>>>>>
> > >>>>>>
> > >>>>>> 6. Partitioning and keys
> > >>>>>> Primary Key is included in Constraint part, and partitioned table
> > >>>> support
> > >>>>>> can be another topic later.
> > >>>>>>
> > >>>>>> 5. Schema declaration
> > >>>>>> Agree with you that we can do better schema derivation for user
> > >>>>>> convenience, but this is not conflict with the syntax.
> > >>>>>> Table properties can carry any useful informations both for the
> > >> users
> > >>>> and
> > >>>>>> the framework, I like your `contract name` proposal,
> > >>>>>> e.g., `WITH (format.type = avro)`, the framework can recognize
> some
> > >>>>>> `contract name` like `format.type`, `connector.type` and etc.
> > >>>>>> And also derive the table schema from an existing schema file can
> be
> > >>>> handy
> > >>>>>> especially one with too many table columns.
> > >>>>>>
> > >>>>>> Regards,
> > >>>>>> Lin
> > >>>>>>
> > >>>>>>
> > >>>>>> Timo Walther <twal...@apache.org> 于2018年12月5日周三 下午10:40写道:
> > >>>>>>
> > >>>>>>> Hi Jark and Shuyi,
> > >>>>>>>
> > >>>>>>> thanks for pushing the DDL efforts forward. I agree that we
> should
> > >>> aim
> > >>>>>>> to combine both Shuyi's design and your design.
> > >>>>>>>
> > >>>>>>> Here are a couple of concerns that I think we should address in
> the
> > >>>>>> design:
> > >>>>>>> 1. Scope: Let's focuses on a MVP DDL for CREATE TABLE statements
> > >>> first.
> > >>>>>>> I think this topic has already enough potential for long
> > >> discussions
> > >>>> and
> > >>>>>>> is very helpful for users. We can discuss CREATE VIEW and CREATE
> > >>>>>>> FUNCTION afterwards as they are not related to each other.
> > >>>>>>>
> > >>>>>>> 2. Constraints: I think we should consider things like
> nullability,
> > >>>>>>> VARCHAR length, and decimal scale and precision in the future as
> > >> they
> > >>>>>>> allow for nice optimizations. However, since both the translation
> > >> and
> > >>>>>>> runtime operators do not support those features. I would not
> > >>> introduce
> > >>>> a
> > >>>>>>> arbitrary default value but omit those parameters for now. This
> can
> > >>> be
> > >>>> a
> > >>>>>>> follow-up issue once the basic DDL has been merged.
> > >>>>>>>
> > >>>>>>> 3. Sources/Sinks: We had a discussion about CREATE TABLE vs
> CREATE
> > >>>>>>> [SOURCE|SINK|] TABLE before. In my opinion we should allow for
> > >> these
> > >>>>>>> explicit declaration because in most production scenarios, teams
> > >> have
> > >>>>>>> strict read/write access requirements. For example, a data
> science
> > >>> team
> > >>>>>>> should only consume from a event Kafka topic but should not
> > >>> accidently
> > >>>>>>> write back to the single source of truth.
> > >>>>>>>
> > >>>>>>> 4. Time attributes: In general, I like your computed columns
> > >> approach
> > >>>>>>> because it makes defining a rowtime attributes transparent and
> > >>> simple.
> > >>>>>>> However, there are downsides that we should discuss.
> > >>>>>>> 4a. Jarks current design means that timestamps are in the schema
> > >>> twice.
> > >>>>>>> The design that is mentioned in [1] makes this more flexible as
> it
> > >>>>>>> either allows to replace an existing column or add a computed
> > >> column.
> > >>>>>>> 4b. We need to consider the zoo of storage systems that is out
> > >> there
> > >>>>>>> right now. Take Kafka as an example, how can we write out a
> > >> timestamp
> > >>>>>>> into the message header? We need to think of a reverse operation
> > >> to a
> > >>>>>>> computed column.
> > >>>>>>> 4c. Does defining a watermark really fit into the schema part of
> a
> > >>>>>>> table? Shouldn't we separate all time attribute concerns into a
> > >>> special
> > >>>>>>> clause next to the regular schema, similar how PARTITIONED BY
> does
> > >> it
> > >>>> in
> > >>>>>>> Hive?
> > >>>>>>> 4d. How can people come up with a custom watermark strategy? I
> > >> guess
> > >>>>>>> this can not be implemented in a scalar function and would
> require
> > >>> some
> > >>>>>>> new type of UDF?
> > >>>>>>>
> > >>>>>>> 6. Partitioning and keys: Another question that the DDL design
> > >> should
> > >>>>>>> answer is how do we express primary keys (for upserts),
> > >> partitioning
> > >>>>>>> keys (for Hive, Kafka message keys). All part of the table
> schema?
> > >>>>>>>
> > >>>>>>> 5. Schema declaration: I find it very annoying that we want to
> > >> force
> > >>>>>>> people to declare all columns and types again even though this is
> > >>>>>>> usually already defined in some company-wide format. I know that
> > >>>> catalog
> > >>>>>>> support will greatly improve this. But if no catalog is used,
> > >> people
> > >>>>>>> need to manually define a schema with 50+ fields in a Flink DDL.
> > >>> What I
> > >>>>>>> actually promoted having two ways of reading data:
> > >>>>>>>
> > >>>>>>> 1. Either the format derives its schema from the table schema.
> > >>>>>>> CREATE TABLE (col INT) WITH (format.type = avro)
> > >>>>>>>
> > >>>>>>> 2. Or the table schema can be omitted and the format schema
> defines
> > >>> the
> > >>>>>>> table schema (+ time attributes).
> > >>>>>>> CREATE TABLE WITH (format.type = avro, format.schema-file =
> > >>>>>>> "/my/avrofile.avsc")
> > >>>>>>>
> > >>>>>>> Please let me know what you think about each item. I will try to
> > >>>>>>> incorporate your feedback in [1] this week.
> > >>>>>>>
> > >>>>>>> Regards,
> > >>>>>>> Timo
> > >>>>>>>
> > >>>>>>> [1]
> > >>>>>>>
> > >>>>>>>
> > >>
> >
> https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit#heading=h.41fd6rs7b3cf
> > >>>>>>> Am 05.12.18 um 13:01 schrieb Jark Wu:
> > >>>>>>>> Hi Shuyi,
> > >>>>>>>>
> > >>>>>>>> It's exciting to see we can make such a great progress here.
> > >>>>>>>>
> > >>>>>>>> Regarding to the watermark:
> > >>>>>>>>
> > >>>>>>>> Watermarks can be defined on any columns (including
> > >> computed-column)
> > >>>> in
> > >>>>>>>> table schema.
> > >>>>>>>> The computed column can be computed from existing columns using
> > >>>> builtin
> > >>>>>>>> functions and *UserDefinedFunctions* (ScalarFunction).
> > >>>>>>>> So IMO, it can work out for almost all the scenarios not only
> > >> common
> > >>>>>>>> scenarios.
> > >>>>>>>>
> > >>>>>>>> I don't think using a `TimestampExtractor` to support custom
> > >>> timestamp
> > >>>>>>>> extractor in SQL is a good idea. Because `TimestampExtractor`
> > >>>>>>>> is not a SQL standard function. If we support
> `TimestampExtractor`
> > >>> in
> > >>>>>>> SQL,
> > >>>>>>>> do we need to support CREATE FUNCTION for `TimestampExtractor`?
> > >>>>>>>> I think `ScalarFunction` can do the same thing with
> > >>>>>> `TimestampExtractor`
> > >>>>>>>> but more powerful and standard.
> > >>>>>>>>
> > >>>>>>>> The core idea of the watermark definition syntax is that the
> > >> schema
> > >>>>>> part
> > >>>>>>>> defines all the columns of the table, it is exactly what the
> query
> > >>>>>> sees.
> > >>>>>>>> The watermark part is something like a primary key definition or
> > >>>>>>> constraint
> > >>>>>>>> on SQL Table, it has no side effect on the schema, only defines
> > >> what
> > >>>>>>>> watermark strategy is and makes which field as the rowtime
> > >> attribute
> > >>>>>>> field.
> > >>>>>>>> If the rowtime field is not in the existing fields, we can use
> > >>>> computed
> > >>>>>>>> column
> > >>>>>>>> to generate it from other existing fields. The Descriptor
> Pattern
> > >>> API
> > >>>>>> [1]
> > >>>>>>>> is very useful when writing a Table API job, but is not
> > >>> contradictory
> > >>>>>> to
> > >>>>>>>> the
> > >>>>>>>> Watermark DDL from my perspective.
> > >>>>>>>>
> > >>>>>>>> [1]:
> > >>>>>>>>
> > >>
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#rowtime-attributes
> > >>>>>>>> .
> > >>>>>>>>
> > >>>>>>>> Best,
> > >>>>>>>> Jark
> > >>>>>>>>
> > >>>>>>>> On Wed, 5 Dec 2018 at 17:58, Shuyi Chen <suez1...@gmail.com>
> > >> wrote:
> > >>>>>>>>> Hi Jark and Shaoxuan,
> > >>>>>>>>>
> > >>>>>>>>> Thanks a lot for the summary. I think we are making great
> > >> progress
> > >>>>>> here.
> > >>>>>>>>> Below are my thoughts.
> > >>>>>>>>>
> > >>>>>>>>> *(1) watermark definition
> > >>>>>>>>> IMO, it's better to keep it consistent with the rowtime
> > >> extractors
> > >>>> and
> > >>>>>>>>> watermark strategies defined in
> > >>>>>>>>>
> > >>>>>>>>>
> > >>
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#rowtime-attributes
> > >>>>>>>>> .
> > >>>>>>>>> Using built-in functions seems to be too much for most of the
> > >>> common
> > >>>>>>>>> scenarios.
> > >>>>>>>>> *(2) CREATE SOURCE/SINK TABLE or CREATE TABLE
> > >>>>>>>>> Actually, I think we can put the source/sink type info into the
> > >>> table
> > >>>>>>>>> properties, so we can use CREATE TABLE.
> > >>>>>>>>> (3) View DDL with properties
> > >>>>>>>>> We can remove the view properties section now for the MVP and
> add
> > >>> it
> > >>>>>>> back
> > >>>>>>>>> later if needed.
> > >>>>>>>>> (4) Type Definition
> > >>>>>>>>> I agree we can put the type length or precision into future
> > >>> versions.
> > >>>>>> As
> > >>>>>>>>> for the grammar difference, currently, I am using the grammar
> in
> > >>>>>> Calcite
> > >>>>>>>>> type DDL, but since we'll extend the parser in Flink, so we can
> > >>>>>>> definitely
> > >>>>>>>>> change if needed.
> > >>>>>>>>>
> > >>>>>>>>> Shuyi
> > >>>>>>>>>
> > >>>>>>>>> On Tue, Dec 4, 2018 at 10:48 PM Jark Wu <imj...@gmail.com>
> > >> wrote:
> > >>>>>>>>>> Hi Shaoxuan,
> > >>>>>>>>>>
> > >>>>>>>>>> Thanks for pointing that out. Yes, the source/sink tag on
> create
> > >>>>>> table
> > >>>>>>> is
> > >>>>>>>>>> the another major difference.
> > >>>>>>>>>>
> > >>>>>>>>>> Summarize the main differences again:
> > >>>>>>>>>>
> > >>>>>>>>>> *(1) watermark definition
> > >>>>>>>>>> *(2) CREATE SOURCE/SINK TABLE or CREATE TABLE
> > >>>>>>>>>> (3) View DDL with properties
> > >>>>>>>>>> (4) Type Definition
> > >>>>>>>>>>
> > >>>>>>>>>> Best,
> > >>>>>>>>>> Jark
> > >>>>>>>>>>
> > >>>>>>>>>> On Wed, 5 Dec 2018 at 14:08, Shaoxuan Wang <
> wshaox...@gmail.com
> > >>>>>>> wrote:
> > >>>>>>>>>>> Hi Jark,
> > >>>>>>>>>>> Thanks for the summary. Your plan for the 1st round
> > >>> implementation
> > >>>>>> of
> > >>>>>>>>> DDL
> > >>>>>>>>>>> looks good to me.
> > >>>>>>>>>>> Have we reached the agreement on simplifying/unifying "create
> > >>>>>>>>>> [source/sink]
> > >>>>>>>>>>> table" to "create table"? "Watermark definition" and "create
> > >>> table"
> > >>>>>>> are
> > >>>>>>>>>> the
> > >>>>>>>>>>> major obstacles on the way to merge two design proposals
> FMPOV.
> > >>>>>>> @Shuyi,
> > >>>>>>>>>> It
> > >>>>>>>>>>> would be great if you can spend time and respond to these two
> > >>> parts
> > >>>>>>>>>> first.
> > >>>>>>>>>>> Regards,
> > >>>>>>>>>>> Shaoxuan
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> On Wed, Dec 5, 2018 at 12:20 PM Jark Wu <imj...@gmail.com>
> > >>> wrote:
> > >>>>>>>>>>>> Hi Shuyi,
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> It seems that you have reviewed the DDL doc [1] that Lin
> and I
> > >>>>>>>>> drafted.
> > >>>>>>>>>>>> This doc covers all the features running in Alibaba.
> > >>>>>>>>>>>> But some of features might be not needed in the first
> version
> > >> of
> > >>>>>>>>> Flink
> > >>>>>>>>>>> SQL
> > >>>>>>>>>>>> DDL.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> So my suggestion would be to focus on the MVP DDLs and reach
> > >>>>>>>>> agreement
> > >>>>>>>>>>> ASAP
> > >>>>>>>>>>>> based on the DDL draft [1] and the DDL design [2] Shuyi
> > >>> proposed.
> > >>>>>>>>>>>> And we can discuss on the main differences one by one.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> The following is the MVP DDLs should be included in the
> first
> > >>>>>> version
> > >>>>>>>>>> in
> > >>>>>>>>>>> my
> > >>>>>>>>>>>> opinion (feedbacks are welcome):
> > >>>>>>>>>>>> (1) Table DDL:
> > >>>>>>>>>>>>        (1.1) Type definition
> > >>>>>>>>>>>>        (1.2) computed column definition
> > >>>>>>>>>>>>        (1.3) watermark definition
> > >>>>>>>>>>>>        (1.4) with properties
> > >>>>>>>>>>>>        (1.5) table constraint (primary key/unique)
> > >>>>>>>>>>>>        (1.6) column nullability (nice to have)
> > >>>>>>>>>>>> (2) View DDL
> > >>>>>>>>>>>> (3) Function DDL
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> The main differences from two DDL docs (sth maybe missed,
> > >>> welcome
> > >>>>>> to
> > >>>>>>>>>>> point
> > >>>>>>>>>>>> out):
> > >>>>>>>>>>>> *(1.3) watermark*: this is the main and the most important
> > >>>>>>>>> difference,
> > >>>>>>>>>> it
> > >>>>>>>>>>>> would be great if @Timo Walther <twal...@apache.org>
> @Fabian
> > >>>>>> Hueske
> > >>>>>>>>>>>> <fhue...@gmail.com>  give some feedbacks.
> > >>>>>>>>>>>>     (1.1) Type definition:
> > >>>>>>>>>>>>          (a) Should VARCHAR carry a length, e.g.
> VARCHAR(128)
> > ?
> > >>>>>>>>>>>>               In most cases, the varchar length is not used
> > >>> because
> > >>>>>>> they
> > >>>>>>>>>> are
> > >>>>>>>>>>>> stored as String in Flink. But it can be used to optimize in
> > >> the
> > >>>>>>>>> future
> > >>>>>>>>>>> if
> > >>>>>>>>>>>> we know the column is a fixed length VARCHAR.
> > >>>>>>>>>>>>               So IMO, we can support VARCHAR with length in
> > the
> > >>>>>> future,
> > >>>>>>>>>> and
> > >>>>>>>>>>>> just VARCHAR in this version.
> > >>>>>>>>>>>>          (b) Should DECIMAL support custom scale and
> > precision,
> > >>>> e.g.
> > >>>>>>>>>>>> DECIMAL(12, 5)?
> > >>>>>>>>>>>>               If we clearly know the scale and precision of
> > the
> > >>>>>>> Decimal,
> > >>>>>>>>>> we
> > >>>>>>>>>>>> can have some optimization on serialization/deserialization.
> > >>> IMO,
> > >>>>>> we
> > >>>>>>>>>> can
> > >>>>>>>>>>>> support just support DECIMAL in this version,
> > >>>>>>>>>>>>               which means DECIMAL(38, 18) as default. And
> > >> support
> > >>>>>>> custom
> > >>>>>>>>>>> scale
> > >>>>>>>>>>>> and precision in the future.
> > >>>>>>>>>>>>     (2) View DDL: Do we need WITH properties in View DDL
> > >>> (proposed
> > >>>> in
> > >>>>>>>>>>> doc[2])?
> > >>>>>>>>>>>> What are the properties on the view used for?
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> The features could be supported and discussed in the future:
> > >>>>>>>>>>>> (1) period definition on table
> > >>>>>>>>>>>> (2) Type DDL
> > >>>>>>>>>>>> (3) Index DDL
> > >>>>>>>>>>>> (4) Library DDL
> > >>>>>>>>>>>> (5) Drop statement
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> [1] Flink DDL draft by Lin and Jark:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>
> >
> https://docs.google.com/document/d/1o16jC-AxnZoxMfHQptkKQkSC6ZDDBRhKg6gm8VGnY-k/edit#
> > >>>>>>>>>>>> [2] Flink SQL DDL design by Shuyi:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>
> >
> https://docs.google.com/document/d/1TTP-GCC8wSsibJaSUyFZ_5NBAHYEB1FVmPpP7RgDGBA/edit#
> > >>>>>>>>>>>> Cheers,
> > >>>>>>>>>>>> Jark
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> On Thu, 29 Nov 2018 at 16:13, Shaoxuan Wang <
> > >>> wshaox...@gmail.com>
> > >>>>>>>>>> wrote:
> > >>>>>>>>>>>>> Sure Shuyu,
> > >>>>>>>>>>>>> What I hope is that we can reach an agreement on DDL gramma
> > >> as
> > >>>>>> soon
> > >>>>>>>>>> as
> > >>>>>>>>>>>>> possible. There are a few differences between your proposal
> > >> and
> > >>>>>>>>> ours.
> > >>>>>>>>>>>> Once
> > >>>>>>>>>>>>> Lin and Jark propose our design, we can quickly discuss on
> > >> the
> > >>>>>>>>> those
> > >>>>>>>>>>>>> differences, and see how far away towards a unified design.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> WRT the external catalog, I think it is an orthogonal
> topic,
> > >> we
> > >>>>>> can
> > >>>>>>>>>>>> design
> > >>>>>>>>>>>>> it in parallel. I believe @Xuefu, @Bowen are already
> working
> > >>> on.
> > >>>>>> We
> > >>>>>>>>>>>>> should/will definitely involve them to review the final
> > >> design
> > >>> of
> > >>>>>>>>> DDL
> > >>>>>>>>>>>>> implementation. I would suggest that we should give it a
> > >> higher
> > >>>>>>>>>>> priority
> > >>>>>>>>>>>> on
> > >>>>>>>>>>>>> the DDL implementation, as it is a crucial component for
> the
> > >>> user
> > >>>>>>>>>>>>> experience of SQL_CLI.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Regards,
> > >>>>>>>>>>>>> Shaoxuan
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> On Thu, Nov 29, 2018 at 6:56 AM Shuyi Chen <
> > >> suez1...@gmail.com
> > >>>>>>>>>> wrote:
> > >>>>>>>>>>>>>> Thanks a lot, Shaoxuan, Jack and Lin. We should definitely
> > >>>>>>>>>>> collaborate
> > >>>>>>>>>>>>>> here, we have also our own DDL implementation running in
> > >>>>>>>>> production
> > >>>>>>>>>>> for
> > >>>>>>>>>>>>>> almost 2 years at Uber. With the joint experience from
> both
> > >>>>>>>>>>> companies,
> > >>>>>>>>>>>> we
> > >>>>>>>>>>>>>> can definitely make the Flink SQL DDL better.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> As @shaoxuan suggest, Jark can come up with a doc that
> talks
> > >>>>>>>>> about
> > >>>>>>>>>>> the
> > >>>>>>>>>>>>>> current DDL design in Alibaba, and we can discuss and
> merge
> > >>> them
> > >>>>>>>>>> into
> > >>>>>>>>>>>>> one,
> > >>>>>>>>>>>>>> make it as a FLIP, and plan the tasks for implementation.
> > >>> Also,
> > >>>>>>>>> we
> > >>>>>>>>>>>> should
> > >>>>>>>>>>>>>> take into account the new external catalog effort in the
> > >>> design.
> > >>>>>>>>>> What
> > >>>>>>>>>>>> do
> > >>>>>>>>>>>>>> you guys think?
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Shuyi
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> On Wed, Nov 28, 2018 at 6:45 AM Jark Wu <imj...@gmail.com
> >
> > >>>>>>>>> wrote:
> > >>>>>>>>>>>>>>> Hi Shaoxuan,
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> I think summarizing it into a google doc is a good idea.
> We
> > >>>>>>>>> will
> > >>>>>>>>>>>>> prepare
> > >>>>>>>>>>>>>> it
> > >>>>>>>>>>>>>>> in the next few days.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Thanks,
> > >>>>>>>>>>>>>>> Jark
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Shaoxuan Wang <wshaox...@gmail.com> 于2018年11月28日周三
> > >> 下午9:17写道:
> > >>>>>>>>>>>>>>>> Hi Lin and Jark,
> > >>>>>>>>>>>>>>>> Thanks for sharing those details. Can you please
> consider
> > >>>>>>>>>>>> summarizing
> > >>>>>>>>>>>>>>> your
> > >>>>>>>>>>>>>>>> DDL design into a google doc.
> > >>>>>>>>>>>>>>>> We can still continue the discussions on Shuyi's
> proposal.
> > >>>>>>>>> But
> > >>>>>>>>>>>>> having a
> > >>>>>>>>>>>>>>>> separate google doc will be easy for the DEV to
> > >>>>>>>>>>>>>>> understand/comment/discuss
> > >>>>>>>>>>>>>>>> on your proposed DDL implementation.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Regards,
> > >>>>>>>>>>>>>>>> Shaoxuan
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> On Wed, Nov 28, 2018 at 7:39 PM Jark Wu <
> imj...@gmail.com
> > >>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>> Hi Shuyi,
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Thanks for bringing up this discussion and the awesome
> > >>>>>>>>> work!
> > >>>>>>>>>> I
> > >>>>>>>>>>>> have
> > >>>>>>>>>>>>>>> left
> > >>>>>>>>>>>>>>>>> some comments in the doc.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> I want to share something more about the watermark
> > >>>>>>>>> definition
> > >>>>>>>>>>>>> learned
> > >>>>>>>>>>>>>>>> from
> > >>>>>>>>>>>>>>>>> Alibaba.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>       1.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>       Table should be able to accept multiple watermark
> > >>>>>>>>>>> definition.
> > >>>>>>>>>>>>>>>>>       Because a table may have more than one rowtime
> > >> field.
> > >>>>>>>>> For
> > >>>>>>>>>>>>> example,
> > >>>>>>>>>>>>>>> one
> > >>>>>>>>>>>>>>>>>       rowtime field is from existing field but missing
> in
> > >>> some
> > >>>>>>>>>>>>> records,
> > >>>>>>>>>>>>>>>>> another
> > >>>>>>>>>>>>>>>>>       is the ingestion timestamp in Kafka but not very
> > >>>>>>>>> accurate.
> > >>>>>>>>>>> In
> > >>>>>>>>>>>>> this
> > >>>>>>>>>>>>>>>> case,
> > >>>>>>>>>>>>>>>>>       user may define two rowtime fields with
> watermarks
> > >> in
> > >>>>>>>>> the
> > >>>>>>>>>>>> Table
> > >>>>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>>> choose
> > >>>>>>>>>>>>>>>>>       one in different situation.
> > >>>>>>>>>>>>>>>>>       2.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>       Watermark stragety always work with rowtime field
> > >>>>>>>>>> together.
> > >>>>>>>>>>>>>>>>> Based on the two points metioned above, I think we
> should
> > >>>>>>>>>>> combine
> > >>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>> watermark strategy and rowtime field selection (i.e.
> > >> which
> > >>>>>>>>>>>> existing
> > >>>>>>>>>>>>>>> field
> > >>>>>>>>>>>>>>>>> used to generate watermark) in one clause, so that we
> can
> > >>>>>>>>>>> define
> > >>>>>>>>>>>>>>> multiple
> > >>>>>>>>>>>>>>>>> watermarks in one Table.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Here I will share the watermark syntax used in Alibaba
> > >>>>>>>>>> (simply
> > >>>>>>>>>>>>>>> modified):
> > >>>>>>>>>>>>>>>>> watermarkDefinition:
> > >>>>>>>>>>>>>>>>> WATERMARK [watermarkName] FOR <rowtime_field> AS
> > >>>>>>>>> wm_strategy
> > >>>>>>>>>>>>>>>>> wm_strategy:
> > >>>>>>>>>>>>>>>>>      BOUNDED WITH OFFSET 'string' timeUnit
> > >>>>>>>>>>>>>>>>> |
> > >>>>>>>>>>>>>>>>>      ASCENDING
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> The “WATERMARK” keyword starts a watermark definition.
> > >> The
> > >>>>>>>>>>> “FOR”
> > >>>>>>>>>>>>>>> keyword
> > >>>>>>>>>>>>>>>>> defines which existing field used to generate
> watermark,
> > >>>>>>>>> this
> > >>>>>>>>>>>> field
> > >>>>>>>>>>>>>>>> should
> > >>>>>>>>>>>>>>>>> already exist in the schema (we can use computed-column
> > >> to
> > >>>>>>>>>>> derive
> > >>>>>>>>>>>>>> from
> > >>>>>>>>>>>>>>>>> other fields). The “AS” keyword defines watermark
> > >> strategy,
> > >>>>>>>>>>> such
> > >>>>>>>>>>>> as
> > >>>>>>>>>>>>>>>> BOUNDED
> > >>>>>>>>>>>>>>>>> WITH OFFSET (covers almost all the requirements) and
> > >>>>>>>>>> ASCENDING.
> > >>>>>>>>>>>>>>>>> When the expected rowtime field does not exist in the
> > >>>>>>>>> schema,
> > >>>>>>>>>>> we
> > >>>>>>>>>>>>> can
> > >>>>>>>>>>>>>>> use
> > >>>>>>>>>>>>>>>>> computed-column syntax to derive it from other existing
> > >>>>>>>>>> fields
> > >>>>>>>>>>>>> using
> > >>>>>>>>>>>>>>>>> built-in functions or user defined functions. So the
> > >>>>>>>>>>>>>> rowtime/watermark
> > >>>>>>>>>>>>>>>>> definition doesn’t need to care about “field-change”
> > >>>>>>>>> strategy
> > >>>>>>>>>>>>>>>>> (replace/add/from-field). And the proctime field
> > >> definition
> > >>>>>>>>>> can
> > >>>>>>>>>>>>> also
> > >>>>>>>>>>>>>> be
> > >>>>>>>>>>>>>>>>> defined using computed-column. Such as pt as PROCTIME()
> > >>>>>>>>> which
> > >>>>>>>>>>>>>> defines a
> > >>>>>>>>>>>>>>>>> proctime field named “pt” in the schema.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Looking forward to working with you guys!
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Best,
> > >>>>>>>>>>>>>>>>> Jark Wu
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Lin Li <lincoln.8...@gmail.com> 于2018年11月28日周三
> 下午6:33写道:
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> @Shuyi
> > >>>>>>>>>>>>>>>>>> Thanks for the proposal!  We have a simple DDL
> > >>>>>>>>>> implementation
> > >>>>>>>>>>>>>>> (extends
> > >>>>>>>>>>>>>>>>>> Calcite's parser) which been running for almost two
> > >> years
> > >>>>>>>>>> on
> > >>>>>>>>>>>>>>> production
> > >>>>>>>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>>>> works well.
> > >>>>>>>>>>>>>>>>>> I think the most valued things we'd learned is keeping
> > >>>>>>>>>>>> simplicity
> > >>>>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>>>> standard compliance.
> > >>>>>>>>>>>>>>>>>> Here's the approximate grammar, FYI
> > >>>>>>>>>>>>>>>>>> CREATE TABLE
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> CREATE TABLE tableName(
> > >>>>>>>>>>>>>>>>>>            columnDefinition [, columnDefinition]*
> > >>>>>>>>>>>>>>>>>>            [ computedColumnDefinition [,
> > >>>>>>>>>>>> computedColumnDefinition]*
> > >>>>>>>>>>>>> ]
> > >>>>>>>>>>>>>>>>>>            [ tableConstraint [, tableConstraint]* ]
> > >>>>>>>>>>>>>>>>>>            [ tableIndex [, tableIndex]* ]
> > >>>>>>>>>>>>>>>>>>        [ PERIOD FOR SYSTEM_TIME ]
> > >>>>>>>>>>>>>>>>>>            [ WATERMARK watermarkName FOR rowTimeColumn
> > AS
> > >>>>>>>>>>>>>>>>>> withOffset(rowTimeColumn, offset) ]     ) [ WITH (
> > >>>>>>>>>>> tableOption
> > >>>>>>>>>>>> [
> > >>>>>>>>>>>>> ,
> > >>>>>>>>>>>>>>>>>> tableOption]* ) ] [ ; ]
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> columnDefinition ::=
> > >>>>>>>>>>>>>>>>>>            columnName dataType [ NOT NULL ]
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> dataType  ::=
> > >>>>>>>>>>>>>>>>>>            {
> > >>>>>>>>>>>>>>>>>>              [ VARCHAR ]
> > >>>>>>>>>>>>>>>>>>              | [ BOOLEAN ]
> > >>>>>>>>>>>>>>>>>>              | [ TINYINT ]
> > >>>>>>>>>>>>>>>>>>              | [ SMALLINT ]
> > >>>>>>>>>>>>>>>>>>              | [ INT ]
> > >>>>>>>>>>>>>>>>>>              | [ BIGINT ]
> > >>>>>>>>>>>>>>>>>>              | [ FLOAT ]
> > >>>>>>>>>>>>>>>>>>              | [ DECIMAL ]
> > >>>>>>>>>>>>>>>>>>              | [ DOUBLE ]
> > >>>>>>>>>>>>>>>>>>              | [ DATE ]
> > >>>>>>>>>>>>>>>>>>              | [ TIME ]
> > >>>>>>>>>>>>>>>>>>              | [ TIMESTAMP ]
> > >>>>>>>>>>>>>>>>>>              | [ VARBINARY ]
> > >>>>>>>>>>>>>>>>>>            }
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> computedColumnDefinition ::=
> > >>>>>>>>>>>>>>>>>>            columnName AS computedColumnExpression
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> tableConstraint ::=
> > >>>>>>>>>>>>>>>>>>        { PRIMARY KEY | UNIQUE }
> > >>>>>>>>>>>>>>>>>>            (columnName [, columnName]* )
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> tableIndex ::=
> > >>>>>>>>>>>>>>>>>>            [ UNIQUE ] INDEX indexName
> > >>>>>>>>>>>>>>>>>>             (columnName [, columnName]* )
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> rowTimeColumn ::=
> > >>>>>>>>>>>>>>>>>>            columnName
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> tableOption ::=
> > >>>>>>>>>>>>>>>>>>            property=value
> > >>>>>>>>>>>>>>>>>>            offset ::=
> > >>>>>>>>>>>>>>>>>>            positive integer (unit: ms)
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> CREATE VIEW
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> CREATE VIEW viewName
> > >>>>>>>>>>>>>>>>>>      [
> > >>>>>>>>>>>>>>>>>>            ( columnName [, columnName]* )
> > >>>>>>>>>>>>>>>>>>      ]
> > >>>>>>>>>>>>>>>>>>            AS queryStatement;
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> CREATE FUNCTION
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>     CREATE FUNCTION functionName
> > >>>>>>>>>>>>>>>>>>      AS 'className';
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>     className ::=
> > >>>>>>>>>>>>>>>>>>            fully qualified name
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Shuyi Chen <suez1...@gmail.com> 于2018年11月28日周三
> > >> 上午3:28写道:
> > >>>>>>>>>>>>>>>>>>> Thanks a lot, Timo and Xuefu. Yes, I think we can
> > >>>>>>>>>> finalize
> > >>>>>>>>>>>> the
> > >>>>>>>>>>>>>>> design
> > >>>>>>>>>>>>>>>>> doc
> > >>>>>>>>>>>>>>>>>>> first and start implementation w/o the unified
> > >>>>>>>>> connector
> > >>>>>>>>>>> API
> > >>>>>>>>>>>>>> ready
> > >>>>>>>>>>>>>>> by
> > >>>>>>>>>>>>>>>>>>> skipping some featue.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Xuefu, I like the idea of making Flink specific
> > >>>>>>>>>> properties
> > >>>>>>>>>>>> into
> > >>>>>>>>>>>>>>>> generic
> > >>>>>>>>>>>>>>>>>>> key-value pairs, so that it will make integration
> with
> > >>>>>>>>>> Hive
> > >>>>>>>>>>>> DDL
> > >>>>>>>>>>>>>> (or
> > >>>>>>>>>>>>>>>>>> others,
> > >>>>>>>>>>>>>>>>>>> e.g. Beam DDL) easier.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> I'll run a final pass over the design doc and
> finalize
> > >>>>>>>>>> the
> > >>>>>>>>>>>>> design
> > >>>>>>>>>>>>>>> in
> > >>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>> next few days. And we can start creating tasks and
> > >>>>>>>>>>>> collaborate
> > >>>>>>>>>>>>> on
> > >>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>> implementation. Thanks a lot for all the comments and
> > >>>>>>>>>>> inputs.
> > >>>>>>>>>>>>>>>>>>> Cheers!
> > >>>>>>>>>>>>>>>>>>> Shuyi
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> On Tue, Nov 27, 2018 at 7:02 AM Zhang, Xuefu <
> > >>>>>>>>>>>>>>>> xuef...@alibaba-inc.com>
> > >>>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Yeah! I agree with Timo that DDL can actually
> proceed
> > >>>>>>>>>> w/o
> > >>>>>>>>>>>>> being
> > >>>>>>>>>>>>>>>>> blocked
> > >>>>>>>>>>>>>>>>>>> by
> > >>>>>>>>>>>>>>>>>>>> connector API. We can leave the unknown out while
> > >>>>>>>>>>> defining
> > >>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>> basic
> > >>>>>>>>>>>>>>>>>>> syntax.
> > >>>>>>>>>>>>>>>>>>>> @Shuyi
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> As commented in the doc, I think we can probably
> > >>>>>>>>> stick
> > >>>>>>>>>>> with
> > >>>>>>>>>>>>>>> simple
> > >>>>>>>>>>>>>>>>>> syntax
> > >>>>>>>>>>>>>>>>>>>> with general properties, without extending the
> syntax
> > >>>>>>>>>> too
> > >>>>>>>>>>>>> much
> > >>>>>>>>>>>>>>> that
> > >>>>>>>>>>>>>>>>> it
> > >>>>>>>>>>>>>>>>>>>> mimics the descriptor API.
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Part of our effort on Flink-Hive integration is also
> > >>>>>>>>> to
> > >>>>>>>>>>>> make
> > >>>>>>>>>>>>>> DDL
> > >>>>>>>>>>>>>>>>> syntax
> > >>>>>>>>>>>>>>>>>>>> compatible with Hive's. The one in the current
> > >>>>>>>>> proposal
> > >>>>>>>>>>>> seems
> > >>>>>>>>>>>>>>>> making
> > >>>>>>>>>>>>>>>>>> our
> > >>>>>>>>>>>>>>>>>>>> effort more challenging.
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> We can help and collaborate. At this moment, I think
> > >>>>>>>>> we
> > >>>>>>>>>>> can
> > >>>>>>>>>>>>>>>> finalize
> > >>>>>>>>>>>>>>>>> on
> > >>>>>>>>>>>>>>>>>>>> the proposal and then we can divide the tasks for
> > >>>>>>>>>> better
> > >>>>>>>>>>>>>>>>> collaboration.
> > >>>>>>>>>>>>>>>>>>>> Please let me know if there are  any questions or
> > >>>>>>>>>>>>> suggestions.
> > >>>>>>>>>>>>>>>>>>>> Thanks,
> > >>>>>>>>>>>>>>>>>>>> Xuefu
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>
> > >> ------------------------------------------------------------------
> > >>>>>>>>>>>>>>>>>>>> Sender:Timo Walther <twal...@apache.org>
> > >>>>>>>>>>>>>>>>>>>> Sent at:2018 Nov 27 (Tue) 16:21
> > >>>>>>>>>>>>>>>>>>>> Recipient:dev <dev@flink.apache.org>
> > >>>>>>>>>>>>>>>>>>>> Subject:Re: [DISCUSS] Flink SQL DDL Design
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Thanks for offering your help here, Xuefu. It would
> > >>>>>>>>> be
> > >>>>>>>>>>>> great
> > >>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>> move
> > >>>>>>>>>>>>>>>>>>>> these efforts forward. I agree that the DDL is
> > >>>>>>>>> somehow
> > >>>>>>>>>>>>> releated
> > >>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>> unified connector API design but we can also start
> > >>>>>>>>> with
> > >>>>>>>>>>> the
> > >>>>>>>>>>>>>> basic
> > >>>>>>>>>>>>>>>>>>>> functionality now and evolve the DDL during this
> > >>>>>>>>>> release
> > >>>>>>>>>>>> and
> > >>>>>>>>>>>>>> next
> > >>>>>>>>>>>>>>>>>>> releases.
> > >>>>>>>>>>>>>>>>>>>> For example, we could identify the MVP DDL syntax
> > >>>>>>>>> that
> > >>>>>>>>>>>> skips
> > >>>>>>>>>>>>>>>> defining
> > >>>>>>>>>>>>>>>>>>>> key constraints and maybe even time attributes. This
> > >>>>>>>>>> DDL
> > >>>>>>>>>>>>> could
> > >>>>>>>>>>>>>> be
> > >>>>>>>>>>>>>>>>> used
> > >>>>>>>>>>>>>>>>>>>> for batch usecases, ETL, and materializing SQL
> > >>>>>>>>> queries
> > >>>>>>>>>>> (no
> > >>>>>>>>>>>>> time
> > >>>>>>>>>>>>>>>>>>>> operations like windows).
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> The unified connector API is high on our priority
> > >>>>>>>>> list
> > >>>>>>>>>>> for
> > >>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>> 1.8
> > >>>>>>>>>>>>>>>>>>>> release. I will try to update the document until mid
> > >>>>>>>>> of
> > >>>>>>>>>>>> next
> > >>>>>>>>>>>>>>> week.
> > >>>>>>>>>>>>>>>>>>>> Regards,
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Timo
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Am 27.11.18 um 08:08 schrieb Shuyi Chen:
> > >>>>>>>>>>>>>>>>>>>>> Thanks a lot, Xuefu. I was busy for some other
> > >>>>>>>>> stuff
> > >>>>>>>>>>> for
> > >>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>> last 2
> > >>>>>>>>>>>>>>>>>>>> weeks,
> > >>>>>>>>>>>>>>>>>>>>> but we are definitely interested in moving this
> > >>>>>>>>>>> forward.
> > >>>>>>>>>>>> I
> > >>>>>>>>>>>>>>> think
> > >>>>>>>>>>>>>>>>> once
> > >>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>> unified connector API design [1] is done, we can
> > >>>>>>>>>>> finalize
> > >>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>> DDL
> > >>>>>>>>>>>>>>>>>>> design
> > >>>>>>>>>>>>>>>>>>>> as
> > >>>>>>>>>>>>>>>>>>>>> well and start creating concrete subtasks to
> > >>>>>>>>>>> collaborate
> > >>>>>>>>>>>> on
> > >>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>> implementation with the community.
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> Shuyi
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> [1]
> > >>>>>>>>>>>>>>>>>>>>>
> > >>
> >
> https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit?usp=sharing
> > >>>>>>>>>>>>>>>>>>>>> On Mon, Nov 26, 2018 at 7:01 PM Zhang, Xuefu <
> > >>>>>>>>>>>>>>>>>> xuef...@alibaba-inc.com>
> > >>>>>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> Hi Shuyi,
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> I'm wondering if you folks still have the
> > >>>>>>>>> bandwidth
> > >>>>>>>>>>>>> working
> > >>>>>>>>>>>>>> on
> > >>>>>>>>>>>>>>>>> this.
> > >>>>>>>>>>>>>>>>>>>>>> We have some dedicated resource and like to move
> > >>>>>>>>>> this
> > >>>>>>>>>>>>>> forward.
> > >>>>>>>>>>>>>>>> We
> > >>>>>>>>>>>>>>>>>> can
> > >>>>>>>>>>>>>>>>>>>>>> collaborate.
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> Thanks,
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> Xuefu
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>> ------------------------------------------------------------------
> > >>>>>>>>>>>>>>>>>>>>>> 发件人:wenlong.lwl<wenlong88....@gmail.com>
> > >>>>>>>>>>>>>>>>>>>>>> 日 期:2018年11月05日 11:15:35
> > >>>>>>>>>>>>>>>>>>>>>> 收件人:<dev@flink.apache.org>
> > >>>>>>>>>>>>>>>>>>>>>> 主 题:Re: [DISCUSS] Flink SQL DDL Design
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> Hi, Shuyi, thanks for the proposal.
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> I have two concerns about the table ddl:
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> 1. how about remove the source/sink mark from the
> > >>>>>>>>>> ddl,
> > >>>>>>>>>>>>>> because
> > >>>>>>>>>>>>>>>> it
> > >>>>>>>>>>>>>>>>> is
> > >>>>>>>>>>>>>>>>>>> not
> > >>>>>>>>>>>>>>>>>>>>>> necessary, the framework determine the table
> > >>>>>>>>>> referred
> > >>>>>>>>>>>> is a
> > >>>>>>>>>>>>>>>> source
> > >>>>>>>>>>>>>>>>>> or a
> > >>>>>>>>>>>>>>>>>>>> sink
> > >>>>>>>>>>>>>>>>>>>>>> according to the context of the query using the
> > >>>>>>>>>> table.
> > >>>>>>>>>>>> it
> > >>>>>>>>>>>>>> will
> > >>>>>>>>>>>>>>>> be
> > >>>>>>>>>>>>>>>>>> more
> > >>>>>>>>>>>>>>>>>>>>>> convenient for use defining a table which can be
> > >>>>>>>>>> both
> > >>>>>>>>>>> a
> > >>>>>>>>>>>>>> source
> > >>>>>>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>>>>> sink,
> > >>>>>>>>>>>>>>>>>>>>>> and more convenient for catalog to persistent and
> > >>>>>>>>>>> manage
> > >>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>> meta
> > >>>>>>>>>>>>>>>>>>> infos.
> > >>>>>>>>>>>>>>>>>>>>>> 2. how about just keeping one pure string map as
> > >>>>>>>>>>>>> parameters
> > >>>>>>>>>>>>>>> for
> > >>>>>>>>>>>>>>>>>> table,
> > >>>>>>>>>>>>>>>>>>>> like
> > >>>>>>>>>>>>>>>>>>>>>> create tabe Kafka10SourceTable (
> > >>>>>>>>>>>>>>>>>>>>>> intField INTEGER,
> > >>>>>>>>>>>>>>>>>>>>>> stringField VARCHAR(128),
> > >>>>>>>>>>>>>>>>>>>>>> longField BIGINT,
> > >>>>>>>>>>>>>>>>>>>>>> rowTimeField TIMESTAMP
> > >>>>>>>>>>>>>>>>>>>>>> ) with (
> > >>>>>>>>>>>>>>>>>>>>>> connector.type = ’kafka’,
> > >>>>>>>>>>>>>>>>>>>>>> connector.property-version = ’1’,
> > >>>>>>>>>>>>>>>>>>>>>> connector.version = ’0.10’,
> > >>>>>>>>>>>>>>>>>>>>>> connector.properties.topic = ‘test-kafka-topic’,
> > >>>>>>>>>>>>>>>>>>>>>> connector.properties.startup-mode =
> > >>>>>>>>> ‘latest-offset’,
> > >>>>>>>>>>>>>>>>>>>>>> connector.properties.specific-offset = ‘offset’,
> > >>>>>>>>>>>>>>>>>>>>>> format.type = 'json'
> > >>>>>>>>>>>>>>>>>>>>>> format.prperties.version=’1’,
> > >>>>>>>>>>>>>>>>>>>>>> format.derive-schema = 'true'
> > >>>>>>>>>>>>>>>>>>>>>> );
> > >>>>>>>>>>>>>>>>>>>>>> Because:
> > >>>>>>>>>>>>>>>>>>>>>> 1. in TableFactory, what user use is a string map
> > >>>>>>>>>>>>>> properties,
> > >>>>>>>>>>>>>>>>>> defining
> > >>>>>>>>>>>>>>>>>>>>>> parameters by string-map can be the closest way to
> > >>>>>>>>>>>> mapping
> > >>>>>>>>>>>>>> how
> > >>>>>>>>>>>>>>>>> user
> > >>>>>>>>>>>>>>>>>>> use
> > >>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>> parameters.
> > >>>>>>>>>>>>>>>>>>>>>> 2. The table descriptor can be extended by user,
> > >>>>>>>>>> like
> > >>>>>>>>>>>> what
> > >>>>>>>>>>>>>> is
> > >>>>>>>>>>>>>>>> done
> > >>>>>>>>>>>>>>>>>> in
> > >>>>>>>>>>>>>>>>>>>> Kafka
> > >>>>>>>>>>>>>>>>>>>>>> and Json, it means that the parameter keys in
> > >>>>>>>>>>> connector
> > >>>>>>>>>>>> or
> > >>>>>>>>>>>>>>>> format
> > >>>>>>>>>>>>>>>>>> can
> > >>>>>>>>>>>>>>>>>>> be
> > >>>>>>>>>>>>>>>>>>>>>> different in different implementation, we can not
> > >>>>>>>>>>>> restrict
> > >>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>> key
> > >>>>>>>>>>>>>>>>>> in
> > >>>>>>>>>>>>>>>>>>> a
> > >>>>>>>>>>>>>>>>>>>>>> specified set, so we need a map in connector scope
> > >>>>>>>>>>> and a
> > >>>>>>>>>>>>> map
> > >>>>>>>>>>>>>>> in
> > >>>>>>>>>>>>>>>>>>>>>> connector.properties scope. why not just give
> > >>>>>>>>> user a
> > >>>>>>>>>>>>> single
> > >>>>>>>>>>>>>>> map,
> > >>>>>>>>>>>>>>>>> let
> > >>>>>>>>>>>>>>>>>>>> them
> > >>>>>>>>>>>>>>>>>>>>>> put parameters in a format they like, which is
> > >>>>>>>>> also
> > >>>>>>>>>>> the
> > >>>>>>>>>>>>>>> simplest
> > >>>>>>>>>>>>>>>>> way
> > >>>>>>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>>>> implement DDL parser.
> > >>>>>>>>>>>>>>>>>>>>>> 3. whether we can define a format clause or not,
> > >>>>>>>>>>> depends
> > >>>>>>>>>>>>> on
> > >>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>> implementation of the connector, using different
> > >>>>>>>>>>> clause
> > >>>>>>>>>>>> in
> > >>>>>>>>>>>>>> DDL
> > >>>>>>>>>>>>>>>> may
> > >>>>>>>>>>>>>>>>>>> make
> > >>>>>>>>>>>>>>>>>>>> a
> > >>>>>>>>>>>>>>>>>>>>>> misunderstanding that we can combine the
> > >>>>>>>>> connectors
> > >>>>>>>>>>> with
> > >>>>>>>>>>>>>>>> arbitrary
> > >>>>>>>>>>>>>>>>>>>> formats,
> > >>>>>>>>>>>>>>>>>>>>>> which may not work actually.
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> On Sun, 4 Nov 2018 at 18:25, Dominik Wosiński <
> > >>>>>>>>>>>>>>> wos...@gmail.com
> > >>>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>> +1, Thanks for the proposal.
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> I guess this is a long-awaited change. This can
> > >>>>>>>>>>> vastly
> > >>>>>>>>>>>>>>> increase
> > >>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>> functionalities of the SQL Client as it will be
> > >>>>>>>>>>>> possible
> > >>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>> use
> > >>>>>>>>>>>>>>>>>>> complex
> > >>>>>>>>>>>>>>>>>>>>>>> extensions like for example those provided by
> > >>>>>>>>>> Apache
> > >>>>>>>>>>>>>>> Bahir[1].
> > >>>>>>>>>>>>>>>>>>>>>>> Best Regards,
> > >>>>>>>>>>>>>>>>>>>>>>> Dom.
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> [1]
> > >>>>>>>>>>>>>>>>>>>>>>> https://github.com/apache/bahir-flink
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> sob., 3 lis 2018 o 17:17 Rong Rong <
> > >>>>>>>>>>>> walter...@gmail.com>
> > >>>>>>>>>>>>>>>>>> napisał(a):
> > >>>>>>>>>>>>>>>>>>>>>>>> +1. Thanks for putting the proposal together
> > >>>>>>>>>> Shuyi.
> > >>>>>>>>>>>>>>>>>>>>>>>> DDL has been brought up in a couple of times
> > >>>>>>>>>>>> previously
> > >>>>>>>>>>>>>>> [1,2].
> > >>>>>>>>>>>>>>>>>>>>>> Utilizing
> > >>>>>>>>>>>>>>>>>>>>>>>> DDL will definitely be a great extension to the
> > >>>>>>>>>>>> current
> > >>>>>>>>>>>>>>> Flink
> > >>>>>>>>>>>>>>>>> SQL
> > >>>>>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>>>>>> systematically support some of the previously
> > >>>>>>>>>>> brought
> > >>>>>>>>>>>> up
> > >>>>>>>>>>>>>>>>> features
> > >>>>>>>>>>>>>>>>>>> such
> > >>>>>>>>>>>>>>>>>>>>>> as
> > >>>>>>>>>>>>>>>>>>>>>>>> [3]. And it will also be beneficial to see the
> > >>>>>>>>>>>> document
> > >>>>>>>>>>>>>>>> closely
> > >>>>>>>>>>>>>>>>>>>> aligned
> > >>>>>>>>>>>>>>>>>>>>>>>> with the previous discussion for unified SQL
> > >>>>>>>>>>> connector
> > >>>>>>>>>>>>> API
> > >>>>>>>>>>>>>>>> [4].
> > >>>>>>>>>>>>>>>>>>>>>>>> I also left a few comments on the doc. Looking
> > >>>>>>>>>>> forward
> > >>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>> alignment
> > >>>>>>>>>>>>>>>>>>>>>>>> with the other couple of efforts and
> > >>>>>>>>> contributing
> > >>>>>>>>>> to
> > >>>>>>>>>>>>> them!
> > >>>>>>>>>>>>>>>>>>>>>>>> Best,
> > >>>>>>>>>>>>>>>>>>>>>>>> Rong
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> [1]
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>
> >
> http://mail-archives.apache.org/mod_mbox/flink-dev/201805.mbox/%3CCAMZk55ZTJA7MkCK1Qu4gLPu1P9neqCfHZtTcgLfrFjfO4Xv5YQ%40mail.gmail.com%3E
> > >>>>>>>>>>>>>>>>>>>>>>>> [2]
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>
> >
> http://mail-archives.apache.org/mod_mbox/flink-dev/201810.mbox/%3CDC070534-0782-4AFD-8A85-8A82B384B8F7%40gmail.com%3E
> > >>>>>>>>>>>>>>>>>>>>>>>> [3]
> > >>>>>>>>>>> https://issues.apache.org/jira/browse/FLINK-8003
> > >>>>>>>>>>>>>>>>>>>>>>>> [4]
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>
> >
> http://mail-archives.apache.org/mod_mbox/flink-dev/201810.mbox/%3c6676cb66-6f31-23e1-eff5-2e9c19f88...@apache.org%3E
> > >>>>>>>>>>>>>>>>>>>>>>>> On Fri, Nov 2, 2018 at 10:22 AM Bowen Li <
> > >>>>>>>>>>>>>>> bowenl...@gmail.com
> > >>>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>>> Thanks Shuyi!
> > >>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>> I left some comments there. I think the design
> > >>>>>>>>> of
> > >>>>>>>>>>> SQL
> > >>>>>>>>>>>>> DDL
> > >>>>>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>>>>>>>>> Flink-Hive
> > >>>>>>>>>>>>>>>>>>>>>>>>> integration/External catalog enhancements will
> > >>>>>>>>>> work
> > >>>>>>>>>>>>>> closely
> > >>>>>>>>>>>>>>>>> with
> > >>>>>>>>>>>>>>>>>>> each
> > >>>>>>>>>>>>>>>>>>>>>>>>> other. Hope we are well aligned on the
> > >>>>>>>>> directions
> > >>>>>>>>>>> of
> > >>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>> two
> > >>>>>>>>>>>>>>>>>>> designs,
> > >>>>>>>>>>>>>>>>>>>>>>>> and I
> > >>>>>>>>>>>>>>>>>>>>>>>>> look forward to working with you guys on both!
> > >>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>> Bowen
> > >>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>> On Thu, Nov 1, 2018 at 10:57 PM Shuyi Chen <
> > >>>>>>>>>>>>>>>> suez1...@gmail.com
> > >>>>>>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>>>> Hi everyone,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>> SQL DDL support has been a long-time ask from
> > >>>>>>>>>> the
> > >>>>>>>>>>>>>>> community.
> > >>>>>>>>>>>>>>>>>>>>>> Current
> > >>>>>>>>>>>>>>>>>>>>>>>>> Flink
> > >>>>>>>>>>>>>>>>>>>>>>>>>> SQL support only DML (e.g. SELECT and INSERT
> > >>>>>>>>>>>>>> statements).
> > >>>>>>>>>>>>>>> In
> > >>>>>>>>>>>>>>>>> its
> > >>>>>>>>>>>>>>>>>>>>>>>> current
> > >>>>>>>>>>>>>>>>>>>>>>>>>> form, Flink SQL users still need to
> > >>>>>>>>>> define/create
> > >>>>>>>>>>>>> table
> > >>>>>>>>>>>>>>>>> sources
> > >>>>>>>>>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>>>>>>>>>> sinks
> > >>>>>>>>>>>>>>>>>>>>>>>>>> programmatically in Java/Scala. Also, in SQL
> > >>>>>>>>>>> Client,
> > >>>>>>>>>>>>>>> without
> > >>>>>>>>>>>>>>>>> DDL
> > >>>>>>>>>>>>>>>>>>>>>>>> support,
> > >>>>>>>>>>>>>>>>>>>>>>>>>> the current implementation does not allow
> > >>>>>>>>>>> dynamical
> > >>>>>>>>>>>>>>> creation
> > >>>>>>>>>>>>>>>>> of
> > >>>>>>>>>>>>>>>>>>>>>>> table,
> > >>>>>>>>>>>>>>>>>>>>>>>>> type
> > >>>>>>>>>>>>>>>>>>>>>>>>>> or functions with SQL, this adds friction for
> > >>>>>>>>>> its
> > >>>>>>>>>>>>>>> adoption.
> > >>>>>>>>>>>>>>>>>>>>>>>>>> I drafted a design doc [1] with a few other
> > >>>>>>>>>>>> community
> > >>>>>>>>>>>>>>>> members
> > >>>>>>>>>>>>>>>>>> that
> > >>>>>>>>>>>>>>>>>>>>>>>>> proposes
> > >>>>>>>>>>>>>>>>>>>>>>>>>> the design and implementation for adding DDL
> > >>>>>>>>>>> support
> > >>>>>>>>>>>>> in
> > >>>>>>>>>>>>>>>> Flink.
> > >>>>>>>>>>>>>>>>>> The
> > >>>>>>>>>>>>>>>>>>>>>>>>> initial
> > >>>>>>>>>>>>>>>>>>>>>>>>>> design considers DDL for table, view, type,
> > >>>>>>>>>>> library
> > >>>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>>>> function.
> > >>>>>>>>>>>>>>>>>>>>>> It
> > >>>>>>>>>>>>>>>>>>>>>>>> will
> > >>>>>>>>>>>>>>>>>>>>>>>>>> be great to get feedback on the design from
> > >>>>>>>>> the
> > >>>>>>>>>>>>>> community,
> > >>>>>>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>>>>>>>> align
> > >>>>>>>>>>>>>>>>>>>>>>>> with
> > >>>>>>>>>>>>>>>>>>>>>>>>>> latest effort in unified SQL connector API [2]
> > >>>>>>>>>> and
> > >>>>>>>>>>>>> Flink
> > >>>>>>>>>>>>>>>> Hive
> > >>>>>>>>>>>>>>>>>>>>>>>>> integration
> > >>>>>>>>>>>>>>>>>>>>>>>>>> [3].
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>> Any feedback is highly appreciated.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>> Thanks
> > >>>>>>>>>>>>>>>>>>>>>>>>>> Shuyi Chen
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>> [1]
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>
> >
> https://docs.google.com/document/d/1TTP-GCC8wSsibJaSUyFZ_5NBAHYEB1FVmPpP7RgDGBA/edit?usp=sharing
> > >>>>>>>>>>>>>>>>>>>>>>>>>> [2]
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>
> >
> https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit?usp=sharing
> > >>>>>>>>>>>>>>>>>>>>>>>>>> [3]
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>
> >
> https://docs.google.com/document/d/1SkppRD_rE3uOKSN-LuZCqn4f7dz0zW5aa6T_hBZq5_o/edit?usp=sharing
> > >>>>>>>>>>>>>>>>>>>>>>>>>> --
> > >>>>>>>>>>>>>>>>>>>>>>>>>> "So you have to trust that the dots will
> > >>>>>>>>> somehow
> > >>>>>>>>>>>>> connect
> > >>>>>>>>>>>>>>> in
> > >>>>>>>>>>>>>>>>> your
> > >>>>>>>>>>>>>>>>>>>>>>>> future."
> > >>>>>>>>>>>>>>>>>>> --
> > >>>>>>>>>>>>>>>>>>> "So you have to trust that the dots will somehow
> > >>>>>>>>> connect
> > >>>>>>>>>> in
> > >>>>>>>>>>>>> your
> > >>>>>>>>>>>>>>>>> future."
> > >>>>>>>>>>>>>> --
> > >>>>>>>>>>>>>> "So you have to trust that the dots will somehow connect
> in
> > >>> your
> > >>>>>>>>>>>> future."
> > >>>>>>>>> --
> > >>>>>>>>> "So you have to trust that the dots will somehow connect in
> your
> > >>>>>>> future."
> > >>>>>>>
> > >>>>
> >
> >
>
> --
> "So you have to trust that the dots will somehow connect in your future."
>

Reply via email to