Hi Timo,

I think I get your point about why it would be better to put Table Update Mode into the MVP. But because this is a sophisticated problem, we need to think about it carefully and have some discussions offline. We will reach out here when we have a clear design.

8). Support row/map/array data type
Do you mean how to distinguish int[] and Integer[]? Yes, maybe we need to support NULL/NOT NULL just for array elements, such as: ARRAY<INT NOT NULL> is int[], ARRAY<INT> is Integer[].
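For illustration, a sketch of how that distinction could look in a DDL (hypothetical, not final syntax):

```
CREATE TABLE t (
  a ARRAY<INT NOT NULL>,  -- non-null elements: maps to int[]
  b ARRAY<INT>            -- nullable elements: maps to Integer[]
) WITH (
  ...
);
```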
Cheers,
Jark

On Fri, 14 Dec 2018 at 19:46, Timo Walther <[email protected]> wrote:

> Hi all,
>
> I think we should discuss what we consider an MVP DDL. For me, an MVP DDL was to just focus on a CREATE TABLE statement. It would be great to come up with a solution that finally solves the issue of connecting different kinds of systems. One reason why we postponed DDL statements for quite some time is that we cannot change them easily once released.
>
> However, the current state of the discussion can be summarized by the following functionality:
>
> 1. Only support append source tables (because the distinction between update/retract tables is not clear).
> 2. Only support append and update sink tables (because a change flag is missing).
> 3. Don't support outputting to Kafka with time attributes (because we cannot set a timestamp).
>
> Personally, I would like to have more use cases enabled by solving the header timestamps and change flag discussion. And I don't see a reason why we have to rush here.
>
> 8). Support row/map/array data type
> How do we want to support object arrays vs. primitive arrays? Currently, we need to make this clear distinction between external systems and Java [1] (e.g. byte[] arrays vs. object arrays), and users can choose between Types.PRIMITIVE_ARRAY and Types.OBJECT_ARRAY. Otherwise we need to support NULL/NOT NULL for array elements.
>
> 4) Event-Time Attributes and Watermarks
> I completely agree with Rong here. `ts AS SYSTEMROWTIME()` indicates that the system takes care of this column, and for unification this would mean both for sources and sinks. It is still a computed column but gives hints to connectors. Implementing connectors can choose whether they want to use this hint or not. The Flink Kafka connector would make use of it. @Jark: I think a PERSISTED keyword would confuse users (as shown by your Stack Overflow question) and would only make sense for SYSTEMROWTIME and no other computed column.
>
> 3) SOURCE / SINK / BOTH
> @Jark: My initial suggestion was to make SOURCE/SINK optional such that users can use plain CREATE TABLE depending on the use case. But as I said before, since I cannot find support here, we can drop the keywords.
>
> 7) Table Update Mode
> @Jark: The questions that you posted are exactly the ones that we should find an answer for, because a DDL should just be the front end to the characteristics of the engine. After thinking about it again, a change flag is actually more similar to a PARTITION BY clause, because it defines a field that is not in the table's schema but in the schema of the physical format. However, the columns defined by a PARTITION BY are shown when describing/projecting a table, whereas a change flag column must not be shown.
>
> If a table source supports append, upserts, and retractions, we need a way to express how we want to connect to the system.
>
> hasPrimaryKey() && !hasChangeFlag() -> append mode
> hasPrimaryKey() && hasChangeFlag() -> upsert mode
> !hasPrimaryKey() && hasChangeFlag() -> retract mode
>
> Are we fine with this?
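> Spelled out with the CHANGE_FLAG syntax sketched later in this thread, the three combinations could look as follows (a sketch only, not committed syntax):
>
> ```
> -- append mode: primary key, no change flag
> CREATE TABLE t1 (
>   id bigint,
>   msg varchar,
>   PRIMARY_KEY(id)
> ) WITH (type=kafka, ...);
>
> -- upsert mode: primary key plus change flag
> CREATE TABLE t2 (
>   id bigint,
>   msg varchar,
>   CHANGE_FLAG FOR isUpsert,
>   PRIMARY_KEY(id)
> ) WITH (type=kafka, ...);
>
> -- retract mode: change flag without primary key
> CREATE TABLE t3 (
>   id bigint,
>   msg varchar,
>   CHANGE_FLAG FOR isRetraction
> ) WITH (type=kafka, ...);
> ```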
> Regarding reading `topic`, `partition`, `offset` or custom properties from message headers: I already discussed this in my unified connector document. We don't need built-in functions for all these properties. Those things depend on the connector and format; it is their responsibility to extend the table schema in order to expose those properties (e.g. by providing a Map<String, String> for all these kinds of properties).
>
> Example:
>
> CREATE TABLE myTopic (
>   col1 INT,
>   col2 VARCHAR,
>   col3 MAP<VARCHAR, VARCHAR>,
>   col4 AS SYSTEMROWTIME()
> )
> PARTITION BY (col0 LONG)
> WITH (
>   connector.type = kafka
>   format.type = key-value-metadata
>   format.key-format.type = avro
>   format.value-format.type = json
> )
>
> The format declares the use of a KeyedDeserializationSchema that extends the schema by a metadata column. The PARTITION BY declares the columns for Kafka's key in Avro format. col1 to col2 are Kafka's JSON columns.
>
> Thanks for your feedback,
> Timo
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/connect.html#type-strings
>
> On 13.12.18 at 09:50, Jark Wu wrote:
> > Hi all,
> >
> > Here are a bunch of my thoughts:
> >
> > 8). support row/map/array data type
> > That's fine with me if we want to support them in the MVP. In my mind, we can have the field type syntax like this:
> >
> > ```
> > fieldType ::=
> >   {
> >     simpleType
> >     | MAP<simpleType, fieldType>
> >     | ARRAY<fieldType>
> >     | ROW<columnDefinition [, columnDefinition]*>
> >   }
> > ```
> >
> > I have included this in @Shuyi's summary doc [1]. Please leave feedback there!
> >
> > [1] https://docs.google.com/document/d/1ug1-aVBSCxZQk58kR-yaK2ETCgL3zg0eDUVGCnW2V9E/edit
> >
> > 3) SOURCE / SINK / BOTH
> > @Timo, a CREATE TABLE statement registers a virtual table in the session or catalog. I don't think it is immutable, as we might also want to support CREATE INDEX statements in the future. On the other hand, ACL is not a part of the table definition; it should belong to the permission system, which is usually stored somewhere else. So GRANT/REVOKE sounds like a more standard option.
> >
> > 7) Table Update Mode
> > I agree with @Shuyi that table update mode can be left out of the MVP, because IMO the update mode will not break the current MVP design. It should be something to add, like the CHANGE_FLAG you proposed. We can continue this discussion when we finalize the MVP.
> >
> > Meanwhile, the update mode is a big topic which may involve several weeks to discuss. For example: (a) do we support CHANGE_FLAG when the table supports upsert (or when the table defines a primary key)? (b) the CHANGE_FLAG should support both write and read. (c) currently we only support true (add) and false (retract) flag types; are they enough? (d) how do we connect an external storage which also supports insert/delete flags, like the MySQL binlog?
> >
> > Regarding the CHANGE_FLAG @Timo proposed, I think this is a good direction. But should isRetraction be a physical field, with CHANGE_FLAG a constraint on it? If yes, then what is the type of isRetraction?
> >
> > 4.b) Ingesting and writing timestamps to systems.
> > @Shuyi, PERSISTED can solve the problem that the field is not physically stored. However, it doesn't solve the problem of how to write a field back to the computed column, because "A computed column cannot be the target of an INSERT or UPDATE statement" even if the computed column is persisted. If we want to write a rowtime back to the external system, the DML should look like this: "INSERT INTO sink SELECT a, rowtime FROM source". The point is that the `rowtime` must be specified in the INSERT statement; that's why I hope the `rowtime` field in Table is not a computed column. See more information about PERSISTED [2] [3].
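> > A sketch of that shape (table and column names made up): because `rowtime` is a physical column rather than a computed one, it can appear as a regular field in the INSERT statement and be written back:
> >
> > ```
> > CREATE TABLE source (
> >   a varchar,
> >   rowtime timestamp   -- physical column, not computed
> > ) WITH (...);
> >
> > -- rowtime is specified explicitly in the INSERT statement
> > INSERT INTO sink SELECT a, rowtime FROM source;
> > ```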
> > Another point to consider is that SYSTEMROWTIME() only solves reading the timestamp from the message header. There are many similar requirements here, such as reading `topic`, `partition`, `offset` or custom properties from message headers. Do we plan to support a bunch of built-in functions like SYSTEMROWTIME()? Do we have some clean and easy way for this?
> >
> > [2]: https://docs.microsoft.com/en-us/sql/t-sql/statements/alter-table-computed-column-definition-transact-sql?view=sql-server-2017
> > [3]: https://stackoverflow.com/questions/51390531/sql-server-persisted-computed-columns-versus-actual-normal-column
> >
> > Looking forward to collaborating with you guys!
> >
> > Best,
> > Jark
> >
> > On Thu, 13 Dec 2018 at 01:38, Rong Rong <[email protected]> wrote:
> >
> >> Thanks for the summary effort @shuyi. Sorry for jumping into the discussion so late.
> >>
> >> As for the scope of the MVP, I think we might want to consider adding the "table update mode" problem to it. I agree with @timo that it might not be easily changed in the future if the flag has to be part of the schema/column definition.
> >>
> >> Regarding the components under discussion:
> >> 4) Event-Time Attributes and Watermarks
> >> b, c) I actually like the special-indicator way @fabian suggested to hint Flink to read time attributes directly from the system, not the data (`ts AS SYSTEMROWTIME()`). It should also address the "computed field not emitted" problem by carrying the "virtual column" concept like @shuyi suggested. However, if I understand correctly, this is also required to be defined as part of the schema/column definition.
> >>
> >> 3) SOURCE / SINK / BOTH
> >> +1 on not adding properties to `CREATE TABLE` to manage ACL/permissions.
> >>
> >> On a higher level, one question I have is whether we can definitively come to an agreement that the features under discussion (and potential solutions) can be cleanly adjusted/added on top of what we are providing in the MVP (e.g. the schema/column definition might be hard to achieve, but if we all agree that ACL/permission should not be part of `CREATE TABLE`, a decision can be made later). @shuyi I can also help in drafting the FLIP doc by summarizing the features under discussion and the concerns about whether to include them in the MVP, so that we can carry on the discussions alongside the MVP implementation effort. I think each one of these features deserves a subsection dedicated to it.
> >>
> >> Many thanks,
> >> Rong
> >>
> >> On Wed, Dec 12, 2018 at 1:14 AM Shuyi Chen <[email protected]> wrote:
> >>
> >>> Hi all,
> >>>
> >>> I summarized the MVP based on the features that we agreed upon. For table update mode and custom watermark strategy and ts extractor, I found there are some ongoing discussions, so I decided to leave them out of the MVP. For row/map/array data types, I think we can add them as well if everyone agrees.
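> >>> For illustration, a column list exercising the proposed nested types could look like this (a hypothetical sketch following the fieldType grammar above):
> >>>
> >>> ```
> >>> CREATE TABLE t (
> >>>   col1 INT,
> >>>   col2 MAP<VARCHAR, INT>,
> >>>   col3 ARRAY<ROW<a INT, b VARCHAR>>,
> >>>   col4 ROW<x INT, y ARRAY<INT>>
> >>> ) WITH (...);
> >>> ```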
> >>>
> >>> 4) Event-Time Attributes and Watermarks
> >>> Cited from the SQL Server 2017 documentation (https://docs.microsoft.com/en-us/sql/relational-databases/tables/specify-computed-columns-in-a-table?view=sql-server-2017): "A computed column is a virtual column that is not physically stored in the table, unless the column is marked PERSISTED. A computed column expression can use data from other columns to calculate a value for the column to which it belongs." I think we can also introduce the PERSISTED keyword for computed columns to indicate that the field can be stored back to the table, i.e. ts AS SYSTEMROWTIME() PERSISTED.
> >>>
> >>> 3) SOURCE / SINK / BOTH
> >>> GRANT/REVOKE sounds like a more standard option than adding a property to CREATE TABLE to manage the ACL/permissions. The ACL can be stored somewhere in a database, and access to a dynamic table can be allowed/disallowed depending on whether it's an "INSERT INTO" or a "SELECT".
> >>>
> >>> I can volunteer to put the discussion together as a FLIP. I can try to summarize the current discussion and share edit permission with you to collaborate on the documents. After we finalize the doc, we can publish it as a FLIP. What do you think?
> >>>
> >>> Shuyi
> >>>
> >>> On Tue, Dec 11, 2018 at 9:13 AM Timo Walther <[email protected]> wrote:
> >>>
> >>>> Hi all,
> >>>>
> >>>> thanks for summarizing the discussion @Shuyi. I think we need to include the "table update mode" problem, as it might not be changed easily in the future. Regarding "support row/map/array data type", I don't see a problem why we should not support them now, as the data types are already included in the runtime. The "support custom timestamp extractor" is solved by the computed columns approach. The "custom watermark strategy" can be added by supplying a class name as a parameter, in my opinion.
> >>>>
> >>>> Regarding the comments of Lin and Jark:
> >>>>
> >>>> @Lin: Instantiating a TableSource/Sink should not cost much, but we should not mix the catalog discussion and DDL at this point.
> >>>>
> >>>> 4) Event-Time Attributes and Watermarks
> >>>> 4.b) Regarding `ts AS SYSTEMROWTIME()` and Lin's comment about "will violate the rule": there is no explicit rule for doing so. Computed columns are also not standard-compliant; if we can use information that is encoded in constraints, we should use it. Adding more and more top-level properties makes the interaction with connectors more difficult. An additional HEADER keyword sounds too connector-specific and also not SQL-compliant to me.
> >>>>
> >>>> 3) SOURCE / SINK / BOTH
> >>>> GRANT/REVOKE mutate an existing table, right? In my opinion, independent of SQL databases but focusing on Flink user requirements, a CREATE TABLE statement should be an immutable definition of a connection to an external system.
> >>>>
> >>>> 7) Table Update Mode
> >>>> As far as I can see, the only thing missing for enabling all table modes is the declaration of a change flag. We could introduce a new keyword here, similar to WATERMARK:
> >>>>
> >>>> CREATE TABLE output_kafka_t1(
> >>>>   id bigint,
> >>>>   msg varchar,
> >>>>   CHANGE_FLAG FOR isRetraction
> >>>> ) WITH (
> >>>>   type=kafka
> >>>>   ,...
> >>>> );
> >>>>
> >>>> CREATE TABLE output_kafka_t1(
> >>>>   CHANGE_FLAG FOR isUpsert,
> >>>>   id bigint,
> >>>>   msg varchar,
> >>>>   PRIMARY_KEY(id)
> >>>> ) WITH (
> >>>>   type=kafka
> >>>>   ,...
> >>>> );
> >>>>
> >>>> What do you think?
> >>>>
> >>>> @Jark: We should definitely stage the discussions and mention the opinions and advantages/disadvantages that have already been proposed in the FLIP.
> >>>>
> >>>> Regards,
> >>>> Timo
> >>>>
> >>>> On 10.12.18 at 08:10, Jark Wu wrote:
> >>>>> Hi all,
> >>>>>
> >>>>> It's great to see we have an agreement on the MVP.
> >>>>>
> >>>>> 4.b) Ingesting and writing timestamps to systems.
> >>>>> I would treat the field as a physical column, not a virtual column. If we treat it as a computed column, it will be confusing that the behavior differs depending on whether it is a source or a sink. When it is a physical column, the behavior can be unified. Then the problem is how to map the field to the Kafka message timestamp. One option is what Lin proposed above, which is also used in KSQL [1]. Another idea is introducing a HEADER column which strictly maps by name to the fields in the message header. For example:
> >>>>>
> >>>>> CREATE TABLE output_kafka_t1(
> >>>>>   id bigint,
> >>>>>   ts timestamp HEADER,
> >>>>>   msg varchar
> >>>>> ) WITH (
> >>>>>   type=kafka
> >>>>>   ,...
> >>>>> );
> >>>>>
> >>>>> This is used at Alibaba but not included in the DDL draft. It will further extend the SQL syntax, which we should be cautious about. What do you think about these two solutions?
> >>>>>
> >>>>> 4.d) Custom watermark strategies:
> >>>>> @Timo, I don't have a strong opinion on this.
> >>>>>
> >>>>> 3) SOURCE/SINK/BOTH
> >>>>> Agree with Lin: GRANT/REVOKE [SELECT|UPDATE] ON TABLE is a clean and standard way to manage permissions, which is also adopted by Hive [2] and many databases.
> >>>>>
> >>>>> [1]: https://docs.confluent.io/current/ksql/docs/tutorials/examples.html
> >>>>> [2]: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=45876173#Hivedeprecatedauthorizationmode/LegacyMode-Grant/RevokePrivileges
> >>>>>
> >>>>> @Timo, it's great if someone can conclude the discussion and summarize it into a FLIP.
> >>>>> @Shuyi, thanks a lot for putting it all together. The Google doc looks good to me, and I left some minor comments there.
> >>>>>
> >>>>> Regarding the FLIP, I have some suggestions:
> >>>>> 1. The FLIP can contain MILESTONE1 and FUTURE WORKS.
> >>>>> 2. MILESTONE1 is the MVP. It describes the MVP DDL syntax.
> >>>>> 3. Separate FUTURE WORKS into two parts: UNDER DISCUSSION and ADOPTED. We can derive MILESTONE2 from this easily when it is ready.
> >>>>>
> >>>>> I summarized the future works based on Shuyi's work:
> >>>>>
> >>>>> Adopted: (should be described in detail here...)
> >>>>> 1. support data type nullability and precision.
> >>>>> 2. comment on table and columns.
> >>>>>
> >>>>> Under discussion: (should briefly describe some options...)
> >>>>> 1. Ingesting and writing timestamps to systems.
> >>>>> 2. support custom watermark strategy
> >>>>> 3. support table update mode
> >>>>> 4. support row/map/array data type
> >>>>> 5. support schema derivation
> >>>>> 6. support system versioned temporal table
> >>>>> 7. support table index
> >>>>>
> >>>>> We can continue the further discussion here, or separate it into another DISCUSS topic if it is a sophisticated problem such as Table Update Mode or Temporal Table.
> >>>>>
> >>>>> Best,
> >>>>> Jark
> >>>>>
> >>>>> On Mon, 10 Dec 2018 at 11:54, Lin Li <[email protected]> wrote:
> >>>>>
> >>>>>> hi all,
> >>>>>> Thanks for your valuable input!
> >>>>>>
> >>>>>> 4) Event-Time Attributes and Watermarks:
> >>>>>> 4.b) @Fabian As you mentioned, using a computed column `ts AS SYSTEMROWTIME()` for writing out to a Kafka table sink will violate the rule that computed fields are not emitted. Since the timestamp column in Kafka's header area is a specific materialization protocol, why don't we treat it as a connector property? For example:
> >>>>>> ```
> >>>>>> CREATE TABLE output_kafka_t1(
> >>>>>>   id bigint,
> >>>>>>   ts timestamp,
> >>>>>>   msg varchar
> >>>>>> ) WITH (
> >>>>>>   type=kafka,
> >>>>>>   header.timestamp=ts
> >>>>>>   ,...
> >>>>>> );
> >>>>>> ```
> >>>>>>
> >>>>>> 4d) For custom watermark strategies
> >>>>>> @Fabian Agree with you on opening another topic about this feature later.
> >>>>>>
> >>>>>> 3) SOURCE / SINK / BOTH
> >>>>>> I think permissions and availabilities are two separate things. Permissions can be managed well by using GRANT/REVOKE (you can call it DCL) solutions, which are commonly used in different DBs. The permission part can be a new topic for later discussion, what do you think?
> >>>>>>
> >>>>>> For the availabilities, @Fabian @Timo I have another question: does instantiating a TableSource/Sink cost much or have some other downsides? IMO, creating a new source/sink object via the constructor seems not costly. When receiving a DDL we should associate it with the catalog object (reusing an existing one or creating a new one). Am I missing something important?
> >>>>>>
> >>>>>> 5. Schema declaration:
> >>>>>> @Timo yes, your concern about user convenience is very important. But I haven't seen a clear way to solve this so far. Shall we put it off and wait for more input from the community?
> >>>>>>
> >>>>>> Shuyi Chen <[email protected]> wrote on Sat, Dec 8, 2018 at 4:27 PM:
> >>>>>>
> >>>>>>> Hi all,
> >>>>>>>
> >>>>>>> Thanks a lot for the great discussion. I think we can continue the discussion here while carving out an MVP so that the community can start working on it. Based on the discussion so far, I try to summarize what we will do for the MVP:
> >>>>>>>
> >>>>>>> MVP
> >>>>>>>
> >>>>>>> 1. support CREATE TABLE
> >>>>>>> 2. support existing data types in Flink SQL, ignore nullability and precision
> >>>>>>> 3. support table comments and column comments
> >>>>>>> 4. support table constraints PRIMARY KEY and UNIQUE
> >>>>>>> 5. support table properties using key-value pairs
> >>>>>>> 6. support partitioned by
> >>>>>>> 7. support computed column
> >>>>>>> 8. support from-field and from-source timestamp extractors
> >>>>>>> 9. support PERIODIC-ASCENDING, PERIODIC-BOUNDED, FROM-SOURCE watermark strategies
> >>>>>>> 10. support a table property to allow explicit enforcement of read/write (source/sink) permission of a table
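> >>>>>>> Putting several of these MVP items together (computed column, from-field timestamp extractor, periodic-bounded watermark strategy, constraint, properties), a single CREATE TABLE could look like this (a sketch only; the watermark syntax follows the OFFSET form discussed below, and TO_TIMESTAMP is an assumed conversion function):
> >>>>>>>
> >>>>>>> ```
> >>>>>>> CREATE TABLE clicks (
> >>>>>>>   user_id BIGINT,
> >>>>>>>   page VARCHAR,
> >>>>>>>   ts_ms BIGINT,
> >>>>>>>   ts AS TO_TIMESTAMP(ts_ms),             -- computed column (from-field extractor)
> >>>>>>>   WATERMARK FOR ts AS OFFSET '5' SECOND, -- periodic-bounded strategy
> >>>>>>>   PRIMARY KEY (user_id)
> >>>>>>> ) WITH (
> >>>>>>>   connector.type = kafka
> >>>>>>> );
> >>>>>>> ```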
> >>>>>>>
> >>>>>>> I tried to put up the DDL grammar (https://docs.google.com/document/d/1ug1-aVBSCxZQk58kR-yaK2ETCgL3zg0eDUVGCnW2V9E/edit?usp=sharing) based on the MVP features above and the previous design docs. Please take a look and comment on it.
> >>>>>>>
> >>>>>>> Also, I summarize the future improvements on CREATE TABLE as the following:
> >>>>>>> 1. support table update mode
> >>>>>>> 2. support data type nullability and precision
> >>>>>>> 3. support row/map/array data type
> >>>>>>> 4. support custom timestamp extractor and watermark strategy
> >>>>>>> 5. support schema derivation
> >>>>>>> 6. support system versioned temporal table
> >>>>>>> 7. support table index
> >>>>>>>
> >>>>>>> I suggest we first agree on the MVP feature list and the MVP grammar. Then we can either continue the discussion of the future improvements here, or create separate JIRAs for each item and discuss further in the JIRA. What do you guys think?
> >>>>>>>
> >>>>>>> Shuyi
> >>>>>>>
> >>>>>>> On Fri, Dec 7, 2018 at 7:54 AM Timo Walther <[email protected]> wrote:
> >>>>>>>> Hi all,
> >>>>>>>>
> >>>>>>>> I think we are making good progress. Thanks for all the feedback so far.
> >>>>>>>>
> >>>>>>>> 3. Sources/Sinks:
> >>>>>>>> It seems that I cannot find supporters for an explicit SOURCE/SINK declaration, so I'm fine with not using those keywords. @Fabian: Maybe we don't have to change the TableFactory interface but just provide some helper functions in the TableFactoryService. This would solve the availability problem, but the permission problem would still not be solved. If you are fine with it, we could introduce a property instead?
> >>>>>>>>
> >>>>>>>> 5. Schema declaration:
> >>>>>>>> @Lin: We should find an agreement on this, as it requires changes to the TableFactory interface. We should minimize changes to this interface because it is user-facing. Especially if format schema and table schema differ, the need for such functionality is very important. Our goal is to connect to existing infrastructure. For example, if we are using Avro and the existing Avro format has enums but Flink SQL does not support enums, it would be helpful to let the Avro format derive a table schema. Otherwise you need to declare both schemas, which leads to CREATE TABLE statements of 400+ lines.
> >>>>>>>> I think the mentioned query:
> >>>>>>>> CREATE TABLE (PRIMARY_KEY(a, c)) WITH (format.type = avro, format.schema-file = "/my/avrofile.avsc")
> >>>>>>>> is fine and should only be valid if the schema contains no non-computed columns.
> >>>>>>>>
> >>>>>>>> 7. Table Update Mode:
> >>>>>>>> After thinking about it again, I agree. The mode of the sinks can be derived from the query and the existence of a PRIMARY KEY declaration. But Fabian raised a very good point. How do we deal with sources?
> >>>>>>>> Shall we introduce a new keyword similar to WATERMARK such that an upsert/retract flag is not part of the visible schema?
> >>>>>>>>
> >>>>>>>> 4a. How to mark a field as an attribute?
> >>>>>>>> @Jark: Thanks for the explanation of the WATERMARK clause semantics. This is a nice way of marking existing fields. This sounds good to me.
> >>>>>>>>
> >>>>>>>> 4c) WATERMARK as constraint
> >>>>>>>> I'm fine with leaving the WATERMARK clause in the schema definition.
> >>>>>>>>
> >>>>>>>> 4d) Custom watermark strategies:
> >>>>>>>> I would already think about custom watermark strategies, as the current descriptor design already supports this. ScalarFunctions don't work, as a PeriodicWatermarkAssigner has different semantics. Why not simply enter a full class name here, as is done in the current design?
> >>>>>>>>
> >>>>>>>> 4.b) Ingesting and writing timestamps to systems (like Kafka)
> >>>>>>>> @Fabian: Yes, your suggestion sounds good to me. This behavior would be similar to our current `timestamps: from-source` design.
> >>>>>>>>
> >>>>>>>> Once our discussion has found a conclusion, I would like to volunteer and summarize the outcome of this mailing thread. It nicely aligns with the update work on the connector improvements document (which I wanted to do anyway) and the ongoing external catalog discussion. Furthermore, I would also want to propose how to change existing interfaces by keeping the DDL, connector improvements, and external catalog support in mind. Would that be ok for you?
> >>>>>>>>
> >>>>>>>> Thanks,
> >>>>>>>> Timo
> >>>>>>>>
> >>>>>>>> On 07.12.18 at 14:48, Fabian Hueske wrote:
> >>>>>>>>> Hi all,
> >>>>>>>>>
> >>>>>>>>> Thanks for the discussion. I'd like to share my point of view as well.
> >>>>>>>>>
> >>>>>>>>> 4) Event-Time Attributes and Watermarks:
> >>>>>>>>> 4.a) I agree with Lin and Jark's proposal. Declaring a watermark on an attribute declares it as an event-time attribute.
> >>>>>>>>> 4.b) Ingesting and writing timestamps to systems (like Kafka). We could use a special function like (ts AS SYSTEMROWTIME()). This function will indicate that we read the timestamp directly from the system (and not the data). We can also write the field back to the system when emitting the table (violating the rule that computed fields are not emitted).
> >>>>>>>>> 4c) I would treat WATERMARK similar to a PRIMARY KEY or UNIQUE KEY constraint and therefore keep it in the schema definition.
> >>>>>>>>> 4d) For custom watermark strategies, simple expressions or ScalarFunctions won't be sufficient. Sophisticated approaches could collect histograms, etc. But I think we can leave that out for later.
> >>>>>>>>>
> >>>>>>>>> 3) SOURCE / SINK / BOTH
> >>>>>>>>> As you said, there are two things to consider here: permission and availability of a TableSource/TableSink. I think that neither should be a reason to add a keyword at such a sensitive position. However, I also see Timo's point that it would be good to know up front how a table can be used without trying to instantiate a TableSource/Sink for a query. Maybe we can extend the TableFactory such that it provides information about which sources/sinks it can provide.
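> >>>>>>>>> For the permission side, the DCL route mentioned elsewhere in this thread would keep it out of CREATE TABLE entirely, e.g. (a sketch of standard GRANT/REVOKE, not a committed Flink syntax; role and table names made up):
> >>>>>>>>>
> >>>>>>>>> ```
> >>>>>>>>> CREATE TABLE clicks ( ... ) WITH ( ... );
> >>>>>>>>>
> >>>>>>>>> -- permissions managed separately from the table definition
> >>>>>>>>> GRANT SELECT ON TABLE clicks TO ROLE data_science;
> >>>>>>>>> REVOKE UPDATE ON TABLE clicks FROM ROLE data_science;
> >>>>>>>>> ```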
> >>>>>>>>>
> >>>>>>>>> 7. Table Update Mode
> >>>>>>>>> Something that we definitely need to consider is how tables are ingested, i.e., append, retract or upsert. Especially since upsert and retraction need a meta-data column that indicates whether an event is an insert (or upsert) or a delete change. This column needs to be identified somehow, most likely as part of the input format. Ideally, this column should not be part of the table schema (as it would always be true). Emitting tables is not so much of an issue, as the properties of the table tell us what to do (append-only/update, unique key y/n).
> >>>>>>>>>
> >>>>>>>>> Best,
> >>>>>>>>> Fabian
> >>>>>>>>>
> >>>>>>>>> On Fri, Dec 7, 2018 at 10:39 AM, Jark Wu <[email protected]> wrote:
> >>>>>>>>>> Hi Timo,
> >>>>>>>>>>
> >>>>>>>>>> Thanks for your quick feedback! Here are some of my thoughts:
> >>>>>>>>>>
> >>>>>>>>>> Append, upsert, and retract mode on sinks is also a very complex problem. I think append/upsert/retract is an ability of the table; users do not need to specify whether a table is used for append or retraction or upsert. The query can choose which mode the sink is in. If an unbounded group-by is inserted into an append sink (a sink that only implements/supports append), an exception can be thrown. A more complex problem is: if we want to write retractions/upserts to Kafka, how do we encode the change flag (add or retract/delete) on the table? Maybe we should propose some protocol for the change flag encoding, but I don't have a clear idea about this right now.
> >>>>>>>>>>
> >>>>>>>>>> 3. Sources/Sinks: The source/sink tag is similar to the append/upsert/retract problem. Besides source/sink, we actually have stream source, stream sink, batch source, and batch sink, and the stream sink also includes the three modes append/upsert/retract. Should we put all these tags on CREATE TABLE? IMO, the table's ability is defined by the table itself; users do not need to specify it. If it is only a readable table, an exception can be thrown when writing to it. As the source/sink tag can be omitted in CREATE TABLE, could we skip it and only support CREATE TABLE in the first version, and add it back in the future when we really need it? This keeps the API compatible and makes sure the MVP is what we have considered clearly.
> >>>>>>>>>>
> >>>>>>>>>> 4a. How to mark a field as an attribute?
> >>>>>>>>>> The watermark definition includes two parts: which field to use as the time attribute and which generation strategy to use.
> >>>>>>>>>> When we want to mark `ts` field as attribute: WATERMARK FOR `ts` > >>> AS > >>>>>>>> OFFSET > >>>>>>>>>> '5' SECOND. > >>>>>>>>>> If we have a POJO{id, user, ts} field named "pojo", we can mark > >> it > >>>>>>> like > >>>>>>>>>> this: WATERMARK FOR pojo.ts AS OFFSET '5' SECOND > >>>>>>>>>> > >>>>>>>>>> 4b. timestamp write to Kafka message header > >>>>>>>>>> Even though we can define multiple time attribute on a table, > >> only > >>>>>> one > >>>>>>>> time > >>>>>>>>>> attribute can be actived/used in a query (in a stream). When we > >>>>>> enable > >>>>>>>>>> `writeTiemstamp`, the only attribute actived in the stream will > >> be > >>>>>>>> write to > >>>>>>>>>> Kafka message header. What I mean the timestmap in StreamRecord > >> is > >>>>>> the > >>>>>>>> time > >>>>>>>>>> attribute in the stream. > >>>>>>>>>> > >>>>>>>>>> 4c. Yes. We introduced the WATERMARK keyword similar to the > >> INDEX, > >>>>>>>> PRIMARY > >>>>>>>>>> KEY keywords. > >>>>>>>>>> > >>>>>>>>>> @Timo, Do you have any other advice or questions on the > >> watermark > >>>>>>>> syntax ? > >>>>>>>>>> For example, the builtin strategy name: "BOUNDED WITH OFFSET" VS > >>>>>>>> "OFFSET" > >>>>>>>>>> VS ... > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> Cheers, > >>>>>>>>>> Jark > >>>>>>>>>> > >>>>>>>>>> On Fri, 7 Dec 2018 at 17:13, Lin Li <[email protected]> > >>> wrote: > >>>>>>>>>>> Hi Timo, > >>>>>>>>>>> Thanks for your feedback, here's some thoughts of mine: > >>>>>>>>>>> > >>>>>>>>>>> 3. Sources/Sinks: > >>>>>>>>>>> "Let's assume an interactive CLI session, people should be able > >>> to > >>>>>>> list > >>>>>>>>>> all > >>>>>>>>>>> source table and sink tables to know upfront if they can use an > >>>>>>> INSERT > >>>>>>>>>> INTO > >>>>>>>>>>> here or not." > >>>>>>>>>>> This requirement can be simply resolved by a document that list > >>> all > >>>>>>>>>>> supported source/sink/both connectors and the sql-client can > >>>>>> perform > >>>>>>> a > >>>>>>>>>>> quick check. It's only an implementation choice, not necessary > >>> for > >>>>>>> the > >>>>>>>>>>> syntax. > >>>>>>>>>>> For connector implementation, a connector may implement one or > >>> some > >>>>>>> or > >>>>>>>>>> all > >>>>>>>>>>> of the [Stream|Batch]Source/[Stream|Batch]Sink traits, we can > >>>>>> derive > >>>>>>>> the > >>>>>>>>>>> availability for any give query without the SOURCE/SINk > >> keywords > >>> or > >>>>>>>>>>> specific table properties in WITH clause. > >>>>>>>>>>> Since there's still indeterminacy, shall we skip these two > >>> keywords > >>>>>>> for > >>>>>>>>>> the > >>>>>>>>>>> MVP DDL? We can make further discussion after users' feedback. > >>>>>>>>>>> > >>>>>>>>>>> 6. Partitioning and keys > >>>>>>>>>>> Agree with you that raise the priority of table constraint and > >>>>>>>>>> partitioned > >>>>>>>>>>> table support for better connectivity to Hive and Kafka. I'll > >> add > >>>>>>>>>>> partitioned table syntax(compatible to hive) into the DDL Draft > >>> doc > >>>>>>>>>>> later[1]. > >>>>>>>>>>> > >>>>>>>>>>> 5. Schema declaration > >>>>>>>>>>> "if users want to declare computed columns they have a "schema" > >>>>>>>>>> constraints > >>>>>>>>>>> but without columns > >>>>>>>>>>> CREATE TABLE (PRIMARY_KEY(a, c)) WITH (format.type = avro, > >>>>>>>>>>> format.schema-file = "/my/avrofile.avsc") " > >>>>>>>>>>> > >>>>>>>>>>> From the point of my view, this ddl is invalid because the > >>>> primary > >>>>>>> key > >>>>>>>>>>> constraint already references two columns but types unseen. 
> >>>>>>>>>>> And Xuefu pointed out an important matching problem, so let's put schema derivation as a follow-up extension?
> >>>>>>>>>>>
> >>>>>>>>>>> Timo Walther <[email protected]> wrote on Thu, Dec 6, 2018 at 6:05 PM:
> >>>>>>>>>>>
> >>>>>>>>>>>> Hi everyone,
> >>>>>>>>>>>>
> >>>>>>>>>>>> great to have such a lively discussion. My next batch of feedback:
> >>>>>>>>>>>>
> >>>>>>>>>>>> @Jark: We don't need to align the descriptor approach with SQL. I'm open to different approaches as long as we can serve a broad set of use cases and systems. The descriptor approach was a first attempt to cover all aspects and connector/format characteristics. Just another example that is missing in the DDL design: How can a user decide whether append, retraction, or upserts should be used to sink data into the target system? Do we want to define all these important properties in the big WITH property map? If yes, we are already close to the descriptor approach. Regarding the "standard way", most DDL languages have very custom syntax, so there is not a real "standard".
> >>>>>>>>>>>>
> >>>>>>>>>>>> 3. Sources/Sinks: @Lin: If a table has both read/write access, it can be created using a regular CREATE TABLE (omitting a specific source/sink declaration). Regarding the transition from source/sink to both: yes, we would need to update the DDL and catalogs. But is this a problem? One also needs to add new queries that use the tables. @Xuefu: It is not only about security aspects. Especially for streaming use cases, not every connector can be used as a source easily. For example, a JDBC sink is easier than a JDBC source. Let's assume an interactive CLI session; people should be able to list all source tables and sink tables to know upfront if they can use an INSERT INTO here or not.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 6. Partitioning and keys: @Lin: I would like to include this in the design, given that Hive integration and Kafka key support are in the making/on our roadmap for this release.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 5. Schema declaration: @Lin: You are right, it is not conflicting. I just wanted to raise the point because if users want to declare computed columns they have "schema" constraints but without columns. Are we ok with a syntax like ...
> >>>>>>>>>>>> CREATE TABLE (PRIMARY_KEY(a, c)) WITH (format.type = avro, format.schema-file = "/my/avrofile.avsc") ?
> >>>>>>>>>>>> @Xuefu: Yes, you are right that an external schema might not exactly match, but this is true for both directions: the table schema "derives" the format schema, and the format schema "derives" the table schema.
> >>>>>>>>>>>> 7. Hive compatibility: @Xuefu: I agree that Hive is popular, but we should not just adopt everything from Hive, as their syntax is very batch-specific. We should come up with a superset of historical and future requirements. Supporting Hive queries can be an intermediate layer on top of Flink's DDL.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 4. Time attributes: @Lin: I'm fine with changing the TimestampExtractor interface, as this is also important for better separation of the connector and table modules [1]. However, I'm wondering about watermark generation.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 4a. timestamps are in the schema twice:
> >>>>>>>>>>>> @Jark: "existing field is Long/Timestamp, we can just use it as rowtime": yes, but we need to mark a field as such an attribute. What does the syntax for marking look like? Also in the case of timestamps that are nested in the schema?
> >>>>>>>>>>>>
> >>>>>>>>>>>> 4b. how can we write out a timestamp into the message header?:
> >>>>>>>>>>>> I agree to simply ignore computed columns when writing out. This is like the 'field-change: add' that I mentioned in the improvements document. @Jark: "then the timestamp in StreamRecord will be written to the Kafka message header": Unfortunately, there is no timestamp in the stream record. Additionally, multiple time attributes can be in a schema. So we need a constraint that tells the sink which column to use (possibly computed as well)?
> >>>>>>>>>>>>
> >>>>>>>>>>>> 4c. separate all time attribute concerns into a special clause next to the regular schema?
> >>>>>>>>>>>> @Jark: I don't have a strong opinion on this. I just have the feeling that the "schema part" becomes quite messy, because the actual schema with types and fields is accompanied by so much metadata about timestamps, watermarks, keys, ..., and we would need to introduce a new WATERMARK keyword within a schema that was close to standard up to this point.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks everyone,
> >>>>>>>>>>>> Timo
> >>>>>>>>>>>>
> >>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-9461
> >>>>>>>>>>>>
> >>>>>>>>>>>> On 06.12.18 at 07:08, Jark Wu wrote:
> >>>>>>>>>>>>> Hi Timo,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thank you for the valuable feedback.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> First of all, I think we don't need to align the SQL functionality to the Descriptor. Because SQL is a more standard API, we should be as cautious as possible about extending the SQL syntax. If something can be done in a standard way, we shouldn't introduce something new.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Here are some of my thoughts:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 1. Scope: Agree.
> >>>>>>>>>>>>> 2. Constraints: Agree.
> >>>>>>>>>>>>> 4. Time attributes:
> >>>>>>>>>>>>> 4a. timestamps are in the schema twice.
> >>>>>>>>>>>>> If an existing field is Long/Timestamp, we can just use it as rowtime; it is not defined twice. If it is not a Long/Timestamp, we use a computed column to get an expected timestamp column to be the rowtime. Is this what you mean by defined twice? I don't think it is a problem; rather, it is an advantage, because it is easy to use: users do not need to consider whether to "replace the existing column" or "add a new column", and they will not be confused about what the real schema is or what the index of the rowtime in the schema is. Regarding the optimization, even if timestamps are in the schema twice, when the original timestamp is never used in the query, the projection pushdown optimization can cut this field as early as possible, which is exactly the same as "replacing the existing column" at runtime.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 4b. how can we write out a timestamp into the message header?
> >>>>>>>>>>>>> That's a good point. I think a computed column is just a virtual column on a table which is only relevant for reading. If we want to write to a table with computed columns defined, we only need to provide the columns except the computed columns (see SQL Server [1]). The computed column is ignored in the insert statement. Getting back to the question: how can we write out a timestamp into the message header? IMO, we can provide a configuration to support this, such as `kafka.writeTimestamp=true`; then the timestamp in StreamRecord will be written to the Kafka message header. What do you think?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 4c. separate all time attribute concerns into a special clause next to the regular schema?
> >>>>>>>>>>>>> Separating watermarks into a special clause similar to PARTITIONED BY is also a good idea. Conceptually, it's fine to put the watermark in the schema part or outside the schema part. But if we want to support multiple watermark definitions, maybe it would be better to put it in the schema part. It is similar to an index definition: we can define several indexes on a table in the schema part.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 4d. How can people come up with a custom watermark strategy?
> >>>>>>>>>>>>> In most cases, the built-in strategies work well. If we need a custom one, we can use a scalar function which is restricted to returning only a nullable Long, and use it in SQL like: WATERMARK FOR rowtime AS watermarkUdf(a, b, c). The `watermarkUdf` is a user-defined scalar function that accepts 3 parameters and returns a nullable Long, which can be used as a punctuated watermark assigner.
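> >>>>>>>>>>>>> In a full table definition this could look like the following (a sketch; `watermarkUdf` and the columns are made up):
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> ```
> >>>>>>>>>>>>> CREATE TABLE t (
> >>>>>>>>>>>>>   a BIGINT,
> >>>>>>>>>>>>>   b BIGINT,
> >>>>>>>>>>>>>   c BIGINT,
> >>>>>>>>>>>>>   rowtime TIMESTAMP,
> >>>>>>>>>>>>>   WATERMARK FOR rowtime AS watermarkUdf(a, b, c)
> >>>>>>>>>>>>> ) WITH (...);
> >>>>>>>>>>>>> ```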
> >>>>>>>>>>>>> Another choice is implementing a class extending `org.apache.flink.table.sources.wmstrategies.WatermarkStrategy` and using it in SQL: WATERMARK FOR rowtime AS 'com.my.MyWatermarkStrategy'. But if a scalar function can cover the requirements here, I would prefer it, because it stays standard-compliant. BTW, this feature is not in the MVP; we can discuss it in more depth in the future when we need it.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 5. Schema declaration:
> >>>>>>>>>>>>> I like the proposal to omit the schema if we can get the schema from external storage or a schema file. Actually, we have already encountered this requirement in our company.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> +1 to @Xuefu that we should be as close as possible to Hive syntax while keeping to the SQL ANSI standard. This will make it more acceptable and reduce the learning cost for users.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> [1]: https://docs.microsoft.com/en-us/sql/relational-databases/partitions/create-partitioned-tables-and-indexes?view=sql-server-2017
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Best,
> >>>>>>>>>>>>> Jark
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Thu, 6 Dec 2018 at 12:09, Zhang, Xuefu <[email protected]> wrote:
> >>>>>>>>>>>>>> Hi Timo/Shuyi/Lin,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks for the discussions. It seems that we are converging to something meaningful. Here are some of my thoughts:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 1. +1 on MVP DDL
> >>>>>>>>>>>>>> 3. Markers for source or sink seem more about permissions on tables, which belong to a security component. Unless the table is created differently based on source, sink, or both, it doesn't seem necessary to use these keywords to enforce permissions.
> >>>>>>>>>>>>>> 5. It might be okay if a schema declaration is always needed. While there might be some duplication sometimes, it's not always true. For example, the external schema may not exactly match the Flink schema (for instance, data types). Even if so, a perfect match is not required. For instance, the external schema file may evolve while the table schema in Flink stays unchanged. A responsible reader should be able to scan the file based on the file schema and return the data based on the table schema.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Other aspects:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 7. Hive compatibility.
> >>>>>>>>>>>>>> Since Flink SQL will soon be able to operate on Hive metadata and data, it's an add-on benefit if we can be compatible with Hive syntax/semantics while following the ANSI standard. At least we should be as close as possible. Hive DDL can be found at https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>> Xuefu
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> ------------------------------------------------------------------
> >>>>>>>>>>>>>> Sender: Lin Li <[email protected]>
> >>>>>>>>>>>>>> Sent at: 2018 Dec 6 (Thu) 10:49
> >>>>>>>>>>>>>> Recipient: dev <[email protected]>
> >>>>>>>>>>>>>> Subject: Re: [DISCUSS] Flink SQL DDL Design
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hi Timo and Shuyi,
> >>>>>>>>>>>>>> thanks for your feedback.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 1. Scope
> >>>>>>>>>>>>>> agree with you, we should focus on the MVP DDL first.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 2. Constraints
> >>>>>>>>>>>>>> yes, this can be a follow-up issue.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 3. Sources/Sinks
> >>>>>>>>>>>>>> If a TABLE has both read/write access requirements, should we declare it using `CREATE [SOURCE_SINK|BOTH] TABLE tableName ...`? A further question: if a TABLE t1 is first declared as read-only (as a source table) and then, for some new requirements, changes to a sink table, we need to update both the DDL and catalogs. Furthermore, let's think about BATCH queries: updating one table in place can be a common case, e.g.,
> >>>>>>>>>>>>>> ```
> >>>>>>>>>>>>>> CREATE TABLE t1 (
> >>>>>>>>>>>>>>   col1 varchar,
> >>>>>>>>>>>>>>   col2 int,
> >>>>>>>>>>>>>>   col3 varchar
> >>>>>>>>>>>>>>   ...
> >>>>>>>>>>>>>> );
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> INSERT [OVERWRITE] TABLE t1
> >>>>>>>>>>>>>> AS
> >>>>>>>>>>>>>> SELECT
> >>>>>>>>>>>>>>   (some computing ...)
> >>>>>>>>>>>>>> FROM t1;
> >>>>>>>>>>>>>> ```
> >>>>>>>>>>>>>> So, let's forget these SOURCE/SINK keywords in the DDL. For validation purposes, we can find other ways.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 4. Time attributes
> >>>>>>>>>>>>>> As Shuyi mentioned before, there exists an `org.apache.flink.table.sources.tsextractors.TimestampExtractor` for custom-defined time attribute usage, but this expression-based class is more friendly to the Table API than to SQL.
> >>>>>>>>>>>>>> ```
> >>>>>>>>>>>>>> /**
> >>>>>>>>>>>>>>   * Provides an expression to extract the timestamp for a rowtime attribute.
> >>>>>>>>>>>>>>   */
> >>>>>>>>>>>>>> abstract class TimestampExtractor extends FieldComputer[Long] with Serializable {
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>   /** Timestamp extractors compute the timestamp as Long. */
> >>>>>>>>>>>>>>   override def getReturnType: TypeInformation[Long] =
> >>>>>>>>>>>>>>     Types.LONG.asInstanceOf[TypeInformation[Long]]
> >>>>>>>>>>>>>> }
> >>>>>>>>>>>>>> ```
> >>>>>>>>>>>>>> BTW, I think both the scalar function and the TimestampExtractor express computing logic; the TimestampExtractor has no extra advantage in SQL scenarios.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 6. Partitioning and keys
> >>>>>>>>>>>>>> Primary Key is included in the Constraints part, and partitioned table support can be another topic later.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 5. Schema declaration
> >>>>>>>>>>>>>> Agree with you that we can do better schema derivation for user convenience, but this does not conflict with the syntax. Table properties can carry any useful information both for the users and the framework. I like your `contract name` proposal, e.g., `WITH (format.type = avro)`: the framework can recognize some contract names like `format.type`, `connector.type`, etc. Also, deriving the table schema from an existing schema file can be handy, especially for one with many table columns.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>> Lin
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Timo Walther <[email protected]> wrote on Wed, Dec 5, 2018 at 10:40 PM:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Hi Jark and Shuyi,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> thanks for pushing the DDL efforts forward. I agree that we should aim to combine both Shuyi's design and your design.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Here are a couple of concerns that I think we should address in the design:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 1. Scope: Let's focus on an MVP DDL for CREATE TABLE statements first. I think this topic already has enough potential for long discussions and is very helpful for users. We can discuss CREATE VIEW and CREATE FUNCTION afterwards, as they are not related to each other.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 2. Constraints: I think we should consider things like nullability, VARCHAR length, and decimal scale and precision in the future, as they allow for nice optimizations. However, both the translation and the runtime operators do not support those features. I would not introduce an arbitrary default value but omit those parameters for now. This can be a follow-up issue once the basic DDL has been merged.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 3. Sources/Sinks: We had a discussion about CREATE TABLE vs CREATE [SOURCE|SINK|] TABLE before. In my opinion we should allow for these explicit declarations, because in most production scenarios, teams have strict read/write access requirements. For example, a data science team should only consume from an event Kafka topic but should not accidentally write back to the single source of truth.
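> >>>>>>>>>>>>>>> A sketch of the explicit variants under discussion (note that later messages in this thread, quoted above, lean toward dropping these keywords):
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> ```
> >>>>>>>>>>>>>>> CREATE SOURCE TABLE events ( ... ) WITH ( ... );   -- can only be read from
> >>>>>>>>>>>>>>> CREATE SINK TABLE results ( ... ) WITH ( ... );    -- can only be written to
> >>>>>>>>>>>>>>> CREATE TABLE lookup ( ... ) WITH ( ... );          -- both readable and writable
> >>>>>>>>>>>>>>> ```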
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 4. Time attributes: In general, I like your computed-columns approach because it makes defining rowtime attributes transparent and simple. However, there are downsides that we should discuss.
> >>>>>>>>>>>>>>> 4a. Jark's current design means that timestamps are in the schema twice. The design mentioned in [1] makes this more flexible, as it allows either replacing an existing column or adding a computed column.
> >>>>>>>>>>>>>>> 4b. We need to consider the zoo of storage systems that is out there right now. Take Kafka as an example: how can we write out a timestamp into the message header? We need to think of a reverse operation to a computed column.
> >>>>>>>>>>>>>>> 4c. Does defining a watermark really fit into the schema part of a table? Shouldn't we separate all time attribute concerns into a special clause next to the regular schema, similar to how PARTITIONED BY does it in Hive?
> >>>>>>>>>>>>>>> 4d. How can people come up with a custom watermark strategy? I guess this cannot be implemented in a scalar function and would require some new type of UDF?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 6. Partitioning and keys: Another question that the DDL design should answer is how we express primary keys (for upserts) and partitioning keys (for Hive, Kafka message keys). Are they all part of the table schema?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 5. Schema declaration: I find it very annoying that we want to force people to declare all columns and types again even though this is usually already defined in some company-wide format. I know that catalog support will greatly improve this. But if no catalog is used, people need to manually define a schema with 50+ fields in a Flink DDL. What I actually promoted was having two ways of reading data:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 1. Either the format derives its schema from the table schema:
> >>>>>>>>>>>>>>> CREATE TABLE (col INT) WITH (format.type = avro)
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 2. Or the table schema can be omitted and the format schema defines the table schema (+ time attributes):
> >>>>>>>>>>>>>>> CREATE TABLE WITH (format.type = avro, format.schema-file = "/my/avrofile.avsc")
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Please let me know what you think about each item. I will try to incorporate your feedback in [1] this week.
Please let me know what you think about each item. I will try to incorporate your feedback in [1] this week.

Regards,
Timo

[1]
https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit#heading=h.41fd6rs7b3cf

On 05.12.18 13:01, Jark Wu wrote:

Hi Shuyi,

It's exciting to see we can make such great progress here.

Regarding the watermark:

Watermarks can be defined on any column (including computed columns) in the table schema. The computed column can be computed from existing columns using built-in functions and *UserDefinedFunctions* (ScalarFunction). So IMO, it can work out for almost all scenarios, not only the common ones.

I don't think using a `TimestampExtractor` to support custom timestamp extractors in SQL is a good idea, because `TimestampExtractor` is not a SQL-standard function. If we support `TimestampExtractor` in SQL, do we need to support CREATE FUNCTION for `TimestampExtractor`? I think `ScalarFunction` can do the same thing as `TimestampExtractor` but is more powerful and standard.

The core idea of the watermark definition syntax is that the schema part defines all the columns of the table; it is exactly what the query sees. The watermark part is something like a primary key definition or constraint on a SQL table: it has no side effect on the schema, it only defines the watermark strategy and makes a given field the rowtime attribute field. If the rowtime field is not among the existing fields, we can use a computed column to generate it from other existing fields. The Descriptor Pattern API [1] is very useful when writing a Table API job, but it is not contradictory to the watermark DDL from my perspective.

[1]:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#rowtime-attributes

Best,
Jark

On Wed, 5 Dec 2018 at 17:58, Shuyi Chen <[email protected]> wrote:

Hi Jark and Shaoxuan,

Thanks a lot for the summary. I think we are making great progress here. Below are my thoughts.
*(1) watermark definition
IMO, it's better to keep it consistent with the rowtime extractors and watermark strategies defined in
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#rowtime-attributes
Using built-in functions seems to be too much for most of the common scenarios.

*(2) CREATE SOURCE/SINK TABLE or CREATE TABLE
Actually, I think we can put the source/sink type info into the table properties, so we can use CREATE TABLE.

(3) View DDL with properties
We can remove the view properties section now for the MVP and add it back later if needed.

(4) Type definition
I agree we can put the type length or precision into future versions. As for the grammar difference: currently I am using the grammar in Calcite's type DDL, but since we'll extend the parser in Flink, we can definitely change it if needed.

Shuyi

On Tue, Dec 4, 2018 at 10:48 PM Jark Wu <[email protected]> wrote:

Hi Shaoxuan,

Thanks for pointing that out. Yes, the source/sink tag on CREATE TABLE is the other major difference.

To summarize the main differences again:

*(1) watermark definition
*(2) CREATE SOURCE/SINK TABLE or CREATE TABLE
(3) View DDL with properties
(4) Type definition

Best,
Jark

On Wed, 5 Dec 2018 at 14:08, Shaoxuan Wang <[email protected]> wrote:

Hi Jark,
Thanks for the summary. Your plan for the first round of DDL implementation looks good to me.
Have we reached agreement on simplifying/unifying "CREATE [SOURCE/SINK] TABLE" to "CREATE TABLE"? "Watermark definition" and "CREATE TABLE" are the major obstacles on the way to merging the two design proposals, FMPOV. @Shuyi, it would be great if you could spend time and respond to these two parts first.
Regards,
Shaoxuan

On Wed, Dec 5, 2018 at 12:20 PM Jark Wu <[email protected]> wrote:

Hi Shuyi,

It seems that you have reviewed the DDL doc [1] that Lin and I drafted. This doc covers all the features running in Alibaba, but some of the features might not be needed in the first version of Flink SQL DDL.

So my suggestion would be to focus on the MVP DDLs and reach agreement ASAP based on the DDL draft [1] and the DDL design [2] Shuyi proposed. And we can discuss the main differences one by one.

The following are the MVP DDLs that should be included in the first version, in my opinion (feedback is welcome):
(1) Table DDL:
    (1.1) Type definition
    (1.2) computed column definition
    (1.3) watermark definition
    (1.4) with properties
    (1.5) table constraint (primary key/unique)
    (1.6) column nullability (nice to have)
(2) View DDL
(3) Function DDL

The main differences between the two DDL docs (something may be missed; feel free to point it out):

*(1.3) watermark*: this is the main and most important difference; it would be great if @Timo Walther <[email protected]> @Fabian Hueske <[email protected]> could give some feedback.

(1.1) Type definition:
    (a) Should VARCHAR carry a length, e.g. VARCHAR(128)?
        In most cases, the VARCHAR length is not used because the values are stored as String in Flink. But it can be used for optimization in the future if we know the column is a fixed-length VARCHAR. So IMO, we can support VARCHAR with length in the future, and just VARCHAR in this version.
    (b) Should DECIMAL support custom scale and precision, e.g. DECIMAL(12, 5)?
        If we clearly know the scale and precision of the DECIMAL, we can have some optimization on serialization/deserialization. IMO, we can just support DECIMAL in this version, which means DECIMAL(38, 18) as default, and support custom scale and precision in the future.
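For illustration, the difference between this version and a possible future version might look like this (table and column names are made up):

```
-- this version: plain types only
CREATE TABLE t (name VARCHAR, amount DECIMAL);              -- DECIMAL behaves as DECIMAL(38, 18)

-- possible future version: explicit length, scale and precision
CREATE TABLE t (name VARCHAR(128), amount DECIMAL(12, 5));
```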
(2) View DDL: Do we need WITH properties in the View DDL (proposed in doc [2])? What are the properties on the view used for?

The features that could be supported and discussed in the future:
(1) period definition on table
(2) Type DDL
(3) Index DDL
(4) Library DDL
(5) Drop statement

[1] Flink DDL draft by Lin and Jark:
https://docs.google.com/document/d/1o16jC-AxnZoxMfHQptkKQkSC6ZDDBRhKg6gm8VGnY-k/edit#
[2] Flink SQL DDL design by Shuyi:
https://docs.google.com/document/d/1TTP-GCC8wSsibJaSUyFZ_5NBAHYEB1FVmPpP7RgDGBA/edit#

Cheers,
Jark

On Thu, 29 Nov 2018 at 16:13, Shaoxuan Wang <[email protected]> wrote:

Sure Shuyi,
What I hope is that we can reach an agreement on the DDL grammar as soon as possible. There are a few differences between your proposal and ours. Once Lin and Jark propose our design, we can quickly discuss those differences and see how far away we are from a unified design.
WRT the external catalog, I think it is an orthogonal topic; we can design it in parallel. I believe @Xuefu and @Bowen are already working on it. We should/will definitely involve them to review the final design of the DDL implementation. I would suggest that we give the DDL implementation a higher priority, as it is a crucial component for the user experience of the SQL CLI.

Regards,
Shaoxuan

On Thu, Nov 29, 2018 at 6:56 AM Shuyi Chen <[email protected]> wrote:

Thanks a lot, Shaoxuan, Jark and Lin. We should definitely collaborate here; we also have our own DDL implementation running in production for almost 2 years at Uber.
With the joint experience from both companies, we can definitely make the Flink SQL DDL better.

As @shaoxuan suggested, Jark can come up with a doc that describes the current DDL design in Alibaba, and we can discuss and merge the two into one, make it a FLIP, and plan the tasks for implementation. Also, we should take into account the new external catalog effort in the design. What do you guys think?

Shuyi

On Wed, Nov 28, 2018 at 6:45 AM Jark Wu <[email protected]> wrote:

Hi Shaoxuan,

I think summarizing it into a Google doc is a good idea. We will prepare it in the next few days.

Thanks,
Jark

On Wed, 28 Nov 2018 at 21:17, Shaoxuan Wang <[email protected]> wrote:

Hi Lin and Jark,
Thanks for sharing those details. Can you please consider summarizing your DDL design into a Google doc?
We can still continue the discussions on Shuyi's proposal. But having a separate Google doc will make it easy for the DEV to understand/comment/discuss your proposed DDL implementation.

Regards,
Shaoxuan

On Wed, Nov 28, 2018 at 7:39 PM Jark Wu <[email protected]> wrote:

Hi Shuyi,

Thanks for bringing up this discussion and the awesome work! I have left some comments in the doc.

I want to share something more about the watermark definition we learned from Alibaba.

1. A table should be able to accept multiple watermark definitions, because a table may have more than one rowtime field.
For example, one rowtime field comes from an existing field but is missing in some records; another is the ingestion timestamp in Kafka but is not very accurate. In this case, a user may define two rowtime fields with watermarks in the table and choose one depending on the situation.

2. A watermark strategy always works together with a rowtime field.

Based on the two points mentioned above, I think we should combine the watermark strategy and the rowtime field selection (i.e., which existing field is used to generate the watermark) in one clause, so that we can define multiple watermarks in one table.

Here I will share the watermark syntax used in Alibaba (simply modified):

watermarkDefinition:
    WATERMARK [watermarkName] FOR <rowtime_field> AS wm_strategy

wm_strategy:
      BOUNDED WITH OFFSET 'string' timeUnit
    | ASCENDING

The "WATERMARK" keyword starts a watermark definition. The "FOR" keyword defines which existing field is used to generate the watermark; this field should already exist in the schema (we can use a computed column to derive it from other fields). The "AS" keyword defines the watermark strategy, such as BOUNDED WITH OFFSET (covers almost all the requirements) and ASCENDING.

When the expected rowtime field does not exist in the schema, we can use the computed-column syntax to derive it from other existing fields using built-in functions or user-defined functions. So the rowtime/watermark definition doesn't need to care about a "field-change" strategy (replace/add/from-field).
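As a sketch, a table with two rowtime candidates under this syntax might look as follows (the table, field, and `toTimestamp` UDF names are made up, and MILLISECOND is an assumed timeUnit):

```
CREATE TABLE orders (
  order_id BIGINT,
  ts_raw VARCHAR,                                    -- event time as a string, missing in some records
  ts AS toTimestamp(ts_raw, 'yyyy-MM-dd HH:mm:ss'),  -- computed column via a hypothetical UDF
  kafka_ts TIMESTAMP,                                -- ingestion timestamp from Kafka
  WATERMARK wm1 FOR ts AS BOUNDED WITH OFFSET '5000' MILLISECOND,
  WATERMARK wm2 FOR kafka_ts AS ASCENDING
) WITH (
  connector.type = 'kafka'
);
```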
And the proctime field can also be defined as a computed column, such as `pt AS PROCTIME()`, which defines a proctime field named "pt" in the schema.

Looking forward to working with you guys!

Best,
Jark Wu

On Wed, 28 Nov 2018 at 18:33, Lin Li <[email protected]> wrote:

@Shuyi
Thanks for the proposal! We have a simple DDL implementation (extending Calcite's parser) which has been running for almost two years in production and works well. I think the most valuable thing we've learned is keeping simplicity and standard compliance.
Here's the approximate grammar, FYI:

CREATE TABLE

CREATE TABLE tableName(
    columnDefinition [, columnDefinition]*
    [ computedColumnDefinition [, computedColumnDefinition]* ]
    [ tableConstraint [, tableConstraint]* ]
    [ tableIndex [, tableIndex]* ]
    [ PERIOD FOR SYSTEM_TIME ]
    [ WATERMARK watermarkName FOR rowTimeColumn AS withOffset(rowTimeColumn, offset) ]
) [ WITH ( tableOption [, tableOption]* ) ] [ ; ]

columnDefinition ::=
    columnName dataType [ NOT NULL ]

dataType ::=
    {
      [ VARCHAR ]
    | [ BOOLEAN ]
    | [ TINYINT ]
    | [ SMALLINT ]
    | [ INT ]
    | [ BIGINT ]
    | [ FLOAT ]
    | [ DECIMAL ]
    | [ DOUBLE ]
    | [ DATE ]
    | [ TIME ]
    | [ TIMESTAMP ]
    | [ VARBINARY ]
    }

computedColumnDefinition ::=
    columnName AS computedColumnExpression

tableConstraint ::=
    { PRIMARY KEY | UNIQUE }
    (columnName [, columnName]* )

tableIndex ::=
    [ UNIQUE ] INDEX indexName
    (columnName [, columnName]* )

rowTimeColumn ::=
    columnName

tableOption ::=
    property=value

offset ::=
    positive integer (unit: ms)

CREATE VIEW

CREATE VIEW viewName
    [ ( columnName [, columnName]* ) ]
AS queryStatement;

CREATE FUNCTION

CREATE FUNCTION functionName
    AS 'className';

className ::=
    fully qualified name
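For illustration, statements exercising this grammar might look as follows (the table, view, UDF, and class names are made up):

```
CREATE TABLE clicks (
  user_id BIGINT NOT NULL,
  page VARCHAR,
  click_time TIMESTAMP,
  click_date AS dateFormat(click_time, 'yyyy-MM-dd'),  -- computed column via a hypothetical UDF
  PRIMARY KEY (user_id),
  WATERMARK wm FOR click_time AS withOffset(click_time, 1000)
) WITH (
  connector.type = 'kafka'
);

CREATE VIEW frequent_users (user_id, cnt) AS
  SELECT user_id, COUNT(*) FROM clicks GROUP BY user_id;

CREATE FUNCTION dateFormat AS 'com.example.udf.DateFormat';
```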
On Wed, 28 Nov 2018 at 03:28, Shuyi Chen <[email protected]> wrote:

Thanks a lot, Timo and Xuefu. Yes, I think we can finalize the design doc first and start implementation w/o the unified connector API being ready, by skipping some features.

Xuefu, I like the idea of making Flink-specific properties into generic key-value pairs, so that it will make integration with Hive DDL (or others, e.g. Beam DDL) easier.

I'll run a final pass over the design doc and finalize the design in the next few days. And we can start creating tasks and collaborate on the implementation. Thanks a lot for all the comments and inputs.

Cheers!
Shuyi

On Tue, Nov 27, 2018 at 7:02 AM Zhang, Xuefu <[email protected]> wrote:

Yeah! I agree with Timo that DDL can actually proceed w/o being blocked by the connector API. We can leave the unknowns out while defining the basic syntax.
@Shuyi

As commented in the doc, I think we can probably stick with a simple syntax with general properties, without extending the syntax so much that it mimics the descriptor API.

Part of our effort on the Flink-Hive integration is also to make the DDL syntax compatible with Hive's. The one in the current proposal seems to make our effort more challenging.

We can help and collaborate. At this moment, I think we can finalize the proposal and then divide the tasks for better collaboration. Please let me know if there are any questions or suggestions.

Thanks,
Xuefu

------------------------------------------------------------------
Sender: Timo Walther <[email protected]>
Sent at: 2018 Nov 27 (Tue) 16:21
Recipient: dev <[email protected]>
Subject: Re: [DISCUSS] Flink SQL DDL Design

Thanks for offering your help here, Xuefu. It would be great to move these efforts forward. I agree that the DDL is somehow related to the unified connector API design, but we can also start with the basic functionality now and evolve the DDL during this release and the next releases.

For example, we could identify an MVP DDL syntax that skips defining key constraints and maybe even time attributes. This DDL could be used for batch use cases, ETL, and materializing SQL queries (no time operations like windows).
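To make this concrete, such an MVP statement might look as follows (the table name and the property keys are made up for illustration):

```
-- MVP DDL sketch: no key constraints, no time attributes
CREATE TABLE page_views (
  user_id BIGINT,
  url VARCHAR,
  view_count INT
) WITH (
  connector.type = 'filesystem',
  connector.path = '/data/page_views.csv',
  format.type = 'csv'
);
```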
The unified connector API is high on our priority list for the 1.8 release. I will try to update the document by the middle of next week.

Regards,
Timo

On 27.11.18 08:08, Shuyi Chen wrote:

Thanks a lot, Xuefu. I was busy with some other stuff for the last 2 weeks, but we are definitely interested in moving this forward. I think once the unified connector API design [1] is done, we can finalize the DDL design as well and start creating concrete subtasks to collaborate on the implementation with the community.

Shuyi

[1]
https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit?usp=sharing

On Mon, Nov 26, 2018 at 7:01 PM Zhang, Xuefu <[email protected]> wrote:

Hi Shuyi,

I'm wondering if you folks still have the bandwidth to work on this. We have some dedicated resources and would like to move this forward. We can collaborate.
Thanks,
Xuefu

------------------------------------------------------------------
Sender: wenlong.lwl <[email protected]>
Sent at: 2018 Nov 05 (Mon) 11:15
Recipient: <[email protected]>
Subject: Re: [DISCUSS] Flink SQL DDL Design

Hi Shuyi, thanks for the proposal.

I have two concerns about the table DDL:

1. How about removing the source/sink mark from the DDL? It is not necessary: the framework determines whether the referred table is a source or a sink according to the context of the query using the table. It would be more convenient for users defining a table which can be both a source and a sink, and more convenient for the catalog to persist and manage the meta info.

2. How about just keeping one pure string map as parameters for the table, like:

create table Kafka10SourceTable (
    intField INTEGER,
    stringField VARCHAR(128),
    longField BIGINT,
    rowTimeField TIMESTAMP
) with (
    connector.type = 'kafka',
    connector.property-version = '1',
    connector.version = '0.10',
    connector.properties.topic = 'test-kafka-topic',
    connector.properties.startup-mode = 'latest-offset',
    connector.properties.specific-offset = 'offset',
    format.type = 'json',
    format.properties.version = '1',
    format.derive-schema = 'true'
);

Because:
1. In TableFactory, what the user works with is a string map of properties; defining parameters by a string map is the closest way to map how the user uses the parameters.

2. The table descriptor can be extended by the user, as is done in Kafka and Json. It means that the parameter keys in connector or format can differ between implementations; we cannot restrict the keys to a specified set, so we would need a map in the connector scope and a map in the connector.properties scope. Why not just give the user a single map and let them put parameters in a format they like? That is also the simplest way to implement the DDL parser.

3. Whether we can define a format clause or not depends on the implementation of the connector. Using different clauses in the DDL may create the misunderstanding that we can combine the connectors with arbitrary formats, which may not actually work.

On Sun, 4 Nov 2018 at 18:25, Dominik Wosiński <[email protected]> wrote:

+1, thanks for the proposal.

I guess this is a long-awaited change. This can vastly increase the functionality of the SQL Client, as it will be possible to use complex extensions like, for example, those provided by Apache Bahir [1].
Best Regards,
Dom.

[1]
https://github.com/apache/bahir-flink

On Sat, 3 Nov 2018 at 17:17, Rong Rong <[email protected]> wrote:

+1. Thanks for putting the proposal together Shuyi.

DDL has been brought up a couple of times previously [1,2]. Utilizing DDL will definitely be a great extension to the current Flink SQL to systematically support some of the previously brought-up features such as [3]. And it will also be beneficial to see the document closely aligned with the previous discussion on the unified SQL connector API [4].

I also left a few comments on the doc. Looking forward to the alignment with the other couple of efforts and to contributing to them!

Best,
Rong

[1]
http://mail-archives.apache.org/mod_mbox/flink-dev/201805.mbox/%3CCAMZk55ZTJA7MkCK1Qu4gLPu1P9neqCfHZtTcgLfrFjfO4Xv5YQ%40mail.gmail.com%3E
[2]
http://mail-archives.apache.org/mod_mbox/flink-dev/201810.mbox/%3CDC070534-0782-4AFD-8A85-8A82B384B8F7%40gmail.com%3E
[3]
https://issues.apache.org/jira/browse/FLINK-8003
[4]
http://mail-archives.apache.org/mod_mbox/flink-dev/201810.mbox/%[email protected]%3E

On Fri, Nov 2, 2018 at 10:22 AM Bowen Li <[email protected]> wrote:

Thanks Shuyi!

I left some comments there.
I think the > >>>>>> design > >>>>>>>>>>>>>>>>> of > >>>>>>>>>>>>>>>>>>> SQL > >>>>>>>>>>>>>>>>>>>>> DDL > >>>>>>>>>>>>>>>>>>>>>>> and > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Flink-Hive > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> integration/External catalog enhancements > >>>>>> will > >>>>>>>>>>>>>>>>>> work > >>>>>>>>>>>>>>>>>>>>>> closely > >>>>>>>>>>>>>>>>>>>>>>>>> with > >>>>>>>>>>>>>>>>>>>>>>>>>>> each > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> other. Hope we are well aligned on the > >>>>>>>>>>>>>>>>> directions > >>>>>>>>>>>>>>>>>>> of > >>>>>>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>>>> two > >>>>>>>>>>>>>>>>>>>>>>>>>>> designs, > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> and I > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> look forward to working with you guys on > >>>>>> both! > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Bowen > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Thu, Nov 1, 2018 at 10:57 PM Shuyi > >> Chen > >>> < > >>>>>>>>>>>>>>>>>>>>>>>> [email protected] > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi everyone, > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> SQL DDL support has been a long-time ask > >>>>>> from > >>>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>>>> community. > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Current > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Flink > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> SQL support only DML (e.g. SELECT and > >>> INSERT > >>>>>>>>>>>>>>>>>>>>>> statements). > >>>>>>>>>>>>>>>>>>>>>>> In > >>>>>>>>>>>>>>>>>>>>>>>>> its > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> current > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> form, Flink SQL users still need to > >>>>>>>>>>>>>>>>>> define/create > >>>>>>>>>>>>>>>>>>>>> table > >>>>>>>>>>>>>>>>>>>>>>>>> sources > >>>>>>>>>>>>>>>>>>>>>>>>>>> and > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> sinks > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> programmatically in Java/Scala. Also, in > >>> SQL > >>>>>>>>>>>>>>>>>>> Client, > >>>>>>>>>>>>>>>>>>>>>>> without > >>>>>>>>>>>>>>>>>>>>>>>>> DDL > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> support, > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the current implementation does not > >> allow > >>>>>>>>>>>>>>>>>>> dynamical > >>>>>>>>>>>>>>>>>>>>>>> creation > >>>>>>>>>>>>>>>>>>>>>>>>> of > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> table, > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> type > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> or functions with SQL, this adds > >> friction > >>>>>> for > >>>>>>>>>>>>>>>>>> its > >>>>>>>>>>>>>>>>>>>>>>> adoption. > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I drafted a design doc [1] with a few > >>> other > >>>>>>>>>>>>>>>>>>>> community > >>>>>>>>>>>>>>>>>>>>>>>> members > >>>>>>>>>>>>>>>>>>>>>>>>>> that > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> proposes > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the design and implementation for adding > >>> DDL > >>>>>>>>>>>>>>>>>>> support > >>>>>>>>>>>>>>>>>>>>> in > >>>>>>>>>>>>>>>>>>>>>>>> Flink. > >>>>>>>>>>>>>>>>>>>>>>>>>> The > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> initial > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> design considers DDL for table, view, > >>> type, > >>>>>>>>>>>>>>>>>>> library > >>>>>>>>>>>>>>>>>>>>> and > >>>>>>>>>>>>>>>>>>>>>>>>>> function. 
It will be great to get feedback on the design from the community, and to align with the latest efforts on the unified SQL connector API [2] and the Flink-Hive integration [3].

Any feedback is highly appreciated.

Thanks
Shuyi Chen

[1]
https://docs.google.com/document/d/1TTP-GCC8wSsibJaSUyFZ_5NBAHYEB1FVmPpP7RgDGBA/edit?usp=sharing
[2]
https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit?usp=sharing
[3]
https://docs.google.com/document/d/1SkppRD_rE3uOKSN-LuZCqn4f7dz0zW5aa6T_hBZq5_o/edit?usp=sharing

--
"So you have to trust that the dots will somehow connect in your future."
