Hi, Feng

Thanks for driving this FLIP. It's a very impressive feature that users want. I have some quick questions here.

1. Unified SQL: The snapshot concept exists in both batch mode and streaming mode. Could we consider a unified proposal? I think users won't want another SQL syntax named time travel for streaming mode.

2. Semantics: Flink supports the TIMESTAMP and TIMESTAMP_LTZ types. To get a long timestamp value (getTable(ObjectPath tablePath, long timestamp)), we need two pieces of information, i.e. a TIMESTAMP value and the current session time zone. How do we handle the value with the currently proposed SQL syntax?

3. Is a single timestamp enough to track a snapshot (version) of an external table? Some external systems may use a timestamp value to mark a version, but others may use a version number, file position, or log offset.

Best,
Leonard

> On Jun 5, 2023, at 3:28 PM, Yun Tang <myas...@live.com> wrote:
>
> Hi Feng,
>
> I think this FLIP would provide an important feature to unify stream SQL and batch SQL when we backfill historical data in batch mode.
>
> For the "Syntax" section, I think you could add a description of how to align backfill time travel with querying the latest data. I think you should also update the "Discussion thread" link in the FLIP.
>
> Moreover, I have a question about getting the table schema from the catalog. I'm not sure whether Catalog#getTable(tablePath, timestamp) will be called only once. If we have a backfill query between 2023-05-29 and 2023-06-04 in the past week, and the table schema changed on 2023-06-01, will the query below detect the schema change while backfilling the whole week?
>
> SELECT * FROM paimon_tb FOR SYSTEM_TIME AS OF TIMESTAMP BETWEEN '2023-05-29 00:00:00' AND '2023-06-05 00:00:00'
>
> Best,
> Yun Tang
>
> ________________________________
> From: Shammon FY <zjur...@gmail.com>
> Sent: Thursday, June 1, 2023 17:57
> To: dev@flink.apache.org <dev@flink.apache.org>
> Subject: Re: [DISCUSS] FLIP-308: Support Time Travel In Batch Mode
>
> Hi Feng,
>
> I have one minor comment about the public interface `Optional<Long> getSnapshot()` in `CatalogTable`.
>
> Since we can get tables from the new method `Catalog.getTable(ObjectPath tablePath, long timestamp)`, I think the returned `CatalogBaseTable` will already carry the timestamp information. Flink or connectors such as Iceberg/Paimon can create sources from the `CatalogBaseTable` directly, without needing to get the snapshot ID from `CatalogTable.getSnapshot()`. What do you think?
>
> Best,
> Shammon FY
>
> On Thu, Jun 1, 2023 at 7:22 AM Jing Ge <j...@ververica.com.invalid> wrote:
>
>> Hi Feng,
>>
>> Thanks for the proposal! Very interesting feature. Would you like to add the reasoning from your previous email about why SupportsTimeTravel was rejected to the FLIP? This will help readers understand the context (in the future).
>>
>> We keep adding overloaded methods directly to Catalog whenever new requirements come up, which makes the interface bloated. Just out of curiosity, does it make sense to introduce some DSL design? For example, Catalog.getTable(tablePath).on(timeStamp), Catalog.getTable(tablePath).current() for the most current version, and more room for further extension like timestamp ranges, etc. I haven't read all the source code yet and I'm not sure if it is possible. But a design like this would keep the Catalog API lean, and the API/DSL would be self-describing and easier to use.
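Jing's fluent-lookup idea can be sketched in Java as follows. This is a minimal sketch under stated assumptions: `TableLookup`, `InMemoryTableLookup` and `FluentCatalog` are hypothetical names and are not part of Flink's `Catalog` API. Tables are represented as plain values keyed by the epoch-millis timestamp at which each version became current.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.Optional;
import java.util.TreeMap;

/**
 * Hypothetical fluent lookup for one table path (illustrative only, not a Flink API).
 * Time-based variants hang off this object instead of adding more overloads to Catalog.
 */
interface TableLookup<T> {
    /** Returns the version that was current at the given epoch-millis timestamp, if any. */
    Optional<T> on(long timestampMillis);

    /** Returns the most recent version, if any. */
    Optional<T> current();
}

/** Toy in-memory implementation: versions are keyed by the epoch millis at which they became current. */
final class InMemoryTableLookup<T> implements TableLookup<T> {
    private final NavigableMap<Long, T> versions = new TreeMap<>();

    InMemoryTableLookup<T> addVersion(long validFromMillis, T table) {
        versions.put(validFromMillis, Objects.requireNonNull(table));
        return this;
    }

    @Override
    public Optional<T> on(long timestampMillis) {
        // floorEntry returns the latest version that became current at or before the timestamp.
        Map.Entry<Long, T> entry = versions.floorEntry(timestampMillis);
        return entry == null ? Optional.empty() : Optional.of(entry.getValue());
    }

    @Override
    public Optional<T> current() {
        Map.Entry<Long, T> entry = versions.lastEntry();
        return entry == null ? Optional.empty() : Optional.of(entry.getValue());
    }
}

/** Hypothetical catalog facade exposing getTable(path).on(ts) and getTable(path).current(). */
final class FluentCatalog<T> {
    private final Map<String, InMemoryTableLookup<T>> tables = new HashMap<>();

    InMemoryTableLookup<T> register(String tablePath) {
        return tables.computeIfAbsent(tablePath, path -> new InMemoryTableLookup<>());
    }

    TableLookup<T> getTable(String tablePath) {
        InMemoryTableLookup<T> lookup = tables.get(tablePath);
        if (lookup == null) {
            throw new IllegalArgumentException("Unknown table: " + tablePath);
        }
        return lookup;
    }
}
```

A range variant (for example a hypothetical `between(from, to)`) could later be added to `TableLookup` without touching `Catalog`, which is the kind of extensibility the DSL suggestion is after.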
>>
>> Best regards,
>> Jing
>>
>> On Wed, May 31, 2023 at 12:08 PM Krzysztof Chmielewski <krzysiek.chmielew...@gmail.com> wrote:
>>
>>> OK, on second thought I'm retracting my previous statement about the Catalog changes you proposed.
>>> I do actually see a benefit for the Delta connector with this change, and I see why this could be coupled with the Catalog.
>>>
>>> The Delta connector's SQL support also ships a Delta Catalog implementation for Flink.
>>> For the Delta Catalog, table schema information is fetched from the underlying _delta_log and not stored in a metastore. For time travel we actually had a problem: if we wanted to time travel back to some old version where the schema was slightly different, we would have a conflict, because the Catalog would return the current schema and not the schema as it was for version X.
>>>
>>> With your change, our Delta Catalog can actually fetch the schema for version X and send it to DeltaTableFactory. Currently, the Catalog can fetch only the current version. However, we would also need the version (number/timestamp) for this table to be passed to DynamicTableFactory so we could properly set up the Delta standalone library.
>>>
>>> Regards,
>>> Krzysztof
>>>
>>> On Wed, May 31, 2023 at 10:37, Krzysztof Chmielewski <krzysiek.chmielew...@gmail.com> wrote:
>>>
>>>> Hi,
>>>> happy to see such a feature.
>>>> A small note from my end regarding the Catalog changes.
>>>>
>>>> TL;DR
>>>> I don't think it is necessary to delegate this feature to the catalog. Since "time travel" is a per-job/per-query property, it should not be coupled with the Catalog or the table definition. In my opinion this is something that only DynamicTableFactory has to know about. I would rather see this feature kept as an SQL syntax enhancement, but clearly delegated to DynamicTableFactory.
>>>>
>>>> I've implemented a time travel feature for the Delta connector [1] using the current Flink API.
>>>> The docs are pending code review, but you can find them here [2], and examples are available here [3].
>>>>
>>>> The time travel feature that I've implemented is based on Flink query hints:
>>>> "SELECT * FROM sourceTable /*+ OPTIONS('versionAsOf' = '1') */"
>>>>
>>>> The "versionAsOf" parameter (we also have 'timestampAsOf') is handled not by the Catalog but by the DynamicTableFactory implementation of the Delta connector. The value of this property is passed to the Delta standalone lib API, which returns a table view for the given version.
>>>>
>>>> I'm not sure how, or whether, the proposed change could benefit the Delta connector's implementation of this feature.
>>>>
>>>> Thanks,
>>>> Krzysztof
>>>>
>>>> [1] https://github.com/delta-io/connectors/tree/flink_table_catalog_feature_branch/flink
>>>> [2] https://github.com/kristoffSC/connectors/tree/FlinkSQL_PR_15-docs
>>>> [3] https://github.com/delta-io/connectors/tree/flink_table_catalog_feature_branch/examples/flink-example/src/main/java/org/example/sql
>>>>
>>>> On Wed, May 31, 2023 at 06:03, liu ron <ron9....@gmail.com> wrote:
>>>>
>>>>> Hi, Feng
>>>>>
>>>>> Thanks for driving this FLIP. Time travel is very useful for integrating Flink with data lake systems. I have one question: why is the implementation of time travel delegated to the Catalog? Suppose we use Flink to query a Hudi table with the time travel syntax, but instead of using the HudiCatalog we register the Hudi table in the InMemoryCatalog. Can we support time travel for the Hudi table in this case?
>>>>> I think time travel should instead be bound to the connector rather than the Catalog, so the rejected alternative should be considered.
>>>>>
>>>>> Best,
>>>>> Ron
>>>>>
>>>>> On Tue, May 30, 2023 at 09:40, yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:
>>>>>
>>>>>> Hi, Feng.
>>>>>> I notice this FLIP only supports batch mode for time travel.
>>>>>> Would it also make sense to support streaming mode, reading a snapshot of the table as a bounded stream?
>>>>>>
>>>>>> Best regards,
>>>>>> Yuxia
>>>>>>
>>>>>> ----- Original Message -----
>>>>>> From: "Benchao Li" <libenc...@apache.org>
>>>>>> To: "dev" <dev@flink.apache.org>
>>>>>> Sent: Monday, May 29, 2023 6:04:53 PM
>>>>>> Subject: Re: [DISCUSS] FLIP-308: Support Time Travel In Batch Mode
>>>>>>
>>>>>> # Can Calcite support the syntax `VERSION AS OF`?
>>>>>>
>>>>>> This also depends on whether it is defined in the standard or implemented by any known database. If not, it would be hard to push it to Calcite.
>>>>>>
>>>>>> # getTable(ObjectPath object, long timestamp)
>>>>>>
>>>>>> This brings us back to the problem of "casting between timestamp and numeric", which was disabled in FLINK-21978 [1]. If you're going to use this, we need to clarify that problem first.
>>>>>>
>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-21978
>>>>>>
>>>>>> On Mon, May 29, 2023 at 15:57, Feng Jin <jinfeng1...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi, thanks for your reply.
>>>>>>>
>>>>>>> @Benchao
>>>>>>>> did you consider the pushdown abilities compatible
>>>>>>>
>>>>>>> In the current design, the implementation of time travel is delegated to the Catalog. We have added a function called getTable(ObjectPath tablePath, long timestamp) to obtain the corresponding CatalogBaseTable at a specific time. Therefore, I think it will not have any impact on the existing pushdown abilities.
>>>>>>>
>>>>>>>> I see there is a rejected design for adding SupportsTimeTravel, but I didn't see the alternative in the FLIP doc
>>>>>>>
>>>>>>> Sorry, the document description is not very clear. I have discussed with yuxia whether to support SupportsTimeTravel.
>>>>>>> Since we already pass the corresponding time in the Catalog's getTable(ObjectPath, long timestamp), SupportsTimeTravel may not be necessary.
>>>>>>>
>>>>>>> In getTable(ObjectPath object, long timestamp), we can obtain the schema at the corresponding point in time and put the SNAPSHOT that needs to be consumed into the options.
>>>>>>>
>>>>>>> @Shammon
>>>>>>>> Could we support this in Flink too?
>>>>>>>
>>>>>>> I personally think it's possible, but it is limited by Calcite's syntax restrictions. I believe we should first support this syntax in Calcite. For now, I think it may not be easy to support this syntax in Flink's parser. @Benchao, what do you think? Can Calcite support the syntax `VERSION AS OF`?
>>>>>>>
>>>>>>> Best,
>>>>>>> Feng.
>>>>>>>
>>>>>>> On Fri, May 26, 2023 at 2:55 PM Shammon FY <zjur...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thanks Feng, the time travel feature sounds great!
>>>>>>>>
>>>>>>>> In addition to SYSTEM_TIME, lakehouses such as Paimon and Iceberg support snapshots or versions. For example, users can query snapshot 1 of a Paimon table with the following statement:
>>>>>>>> SELECT * FROM t VERSION AS OF 1
>>>>>>>>
>>>>>>>> Could we support this in Flink too?
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Shammon FY
>>>>>>>>
>>>>>>>> On Fri, May 26, 2023 at 1:20 PM Benchao Li <libenc...@apache.org> wrote:
>>>>>>>>
>>>>>>>>> Regarding the implementation, did you consider compatibility with the pushdown abilities, e.g., projection pushdown, filter pushdown, and partition pushdown? Since `Snapshot` is not handled much in the existing rules, I have a concern about this. Of course, it depends on your implementation details; what is important is that we add some cross tests for these.
>>>>>>>>>
>>>>>>>>> Regarding the interface exposed to connectors, I see there is a rejected design for adding SupportsTimeTravel, but I didn't see the alternative in the FLIP doc. IMO, this is an important thing we need to clarify, because we need to know whether the connector supports this, and which column/metadata corresponds to 'system_time'.
>>>>>>>>>
>>>>>>>>> On Thu, May 25, 2023 at 22:50, Feng Jin <jinfeng1...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks for your reply.
>>>>>>>>>>
>>>>>>>>>> @Timo @BenChao @yuxia
>>>>>>>>>>
>>>>>>>>>> Sorry for the mistake. Currently, Calcite only supports the `FOR SYSTEM_TIME AS OF` syntax, so we can only support `FOR SYSTEM_TIME AS OF`. I've updated the syntax part of the FLIP.
>>>>>>>>>>
>>>>>>>>>> @Timo
>>>>>>>>>>
>>>>>>>>>>> We will convert it to TIMESTAMP_LTZ?
>>>>>>>>>>
>>>>>>>>>> Yes, I think we need to convert TIMESTAMP to TIMESTAMP_LTZ and then convert it into a long value.
>>>>>>>>>>
>>>>>>>>>>> How do we want to query the most recent version of a table
>>>>>>>>>>
>>>>>>>>>> I think we can use `AS OF CURRENT_TIMESTAMP`, but it does cause inconsistency with the real-time concept. However, as I understand it, the scope of `AS OF CURRENT_TIMESTAMP` is the table itself, not the table record. So I think using CURRENT_TIMESTAMP should also be reasonable?
>>>>>>>>>> Additionally, if no version is specified, the latest version should be used by default.
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Feng
>>>>>>>>>>
>>>>>>>>>> On Thu, May 25, 2023 at 7:47 PM yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks Feng for bringing this up.
>>>>>>>>>>> It'll be great to introduce time travel to Flink for better integration with external data sources.
>>>>>>>>>>>
>>>>>>>>>>> I also share the same concern about the syntax.
>>>>>>>>>>> The `Whether to support other syntax implementations` part of this FLIP seems to say that the syntax in Calcite should be `FOR SYSTEM_TIME AS OF`, right?
>>>>>>>>>>> But the syntax part of this FLIP seems to use `AS OF TIMESTAMP` instead of `FOR SYSTEM_TIME AS OF`. Is this just a mistake or by design?
>>>>>>>>>>>
>>>>>>>>>>> Best regards,
>>>>>>>>>>> Yuxia
>>>>>>>>>>>
>>>>>>>>>>> ----- Original Message -----
>>>>>>>>>>> From: "Benchao Li" <libenc...@apache.org>
>>>>>>>>>>> To: "dev" <dev@flink.apache.org>
>>>>>>>>>>> Sent: Thursday, May 25, 2023 7:27:17 PM
>>>>>>>>>>> Subject: Re: [DISCUSS] FLIP-308: Support Time Travel In Batch Mode
>>>>>>>>>>>
>>>>>>>>>>> Thanks Feng, it's exciting to have this ability.
>>>>>>>>>>>
>>>>>>>>>>> Regarding the syntax section, are you proposing `AS OF` instead of `FOR SYSTEM_TIME AS OF` for this? I know `FOR SYSTEM_TIME AS OF` is in the SQL standard and is supported by some database vendors such as SQL Server. As for `AS OF`, is it in the standard, or does any database vendor support it? If so, I think it's worth adding this support to Calcite, and I would give a hand on the Calcite side. Otherwise, I think we'd better use `FOR SYSTEM_TIME AS OF`.
>>>>>>>>>>>
>>>>>>>>>>> On Thu, May 25, 2023 at 19:02, Timo Walther <twal...@apache.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Also: How do we want to query the most recent version of a table?
>>>>>>>>>>>>
>>>>>>>>>>>> `AS OF CURRENT_TIMESTAMP` would be ideal, but according to the docs its type is TIMESTAMP_LTZ, and what is even more concerning is that it is actually evaluated per row:
>>>>>>>>>>>>
>>>>>>>>>>>>> Returns the current SQL timestamp in the local time zone, the return type is TIMESTAMP_LTZ(3). It is evaluated for each record in streaming mode. But in batch mode, it is evaluated once as the query starts and uses the same result for every row.
>>>>>>>>>>>>
>>>>>>>>>>>> This could make it difficult to explain in a join scenario involving multiple snapshotted tables.
>>>>>>>>>>>>
>>>>>>>>>>>> Regards,
>>>>>>>>>>>> Timo
>>>>>>>>>>>>
>>>>>>>>>>>> On 25.05.23 12:29, Timo Walther wrote:
>>>>>>>>>>>>> Hi Feng,
>>>>>>>>>>>>>
>>>>>>>>>>>>> thanks for proposing this FLIP. It makes a lot of sense to finally support querying tables at a specific point in time, and hopefully soon also ranges, following time-versioned tables.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Here is some feedback from my side:
>>>>>>>>>>>>>
>>>>>>>>>>>>> 1. Syntax
>>>>>>>>>>>>>
>>>>>>>>>>>>> Can you elaborate a bit on the Calcite restrictions?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Does Calcite currently support `AS OF` syntax for this but not `FOR SYSTEM_TIME AS OF`?
>>>>>>>>>>>>>
>>>>>>>>>>>>> It would be great to also support `AS OF` for time-versioned joins and have a unified, short syntax.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Once a fix for this is merged in Calcite, we can make it available in Flink earlier by copying the corresponding classes until the next Calcite upgrade is performed.
>>>>>>>>>>>>>
>>>>>>>>>>>>> 2. Semantics
>>>>>>>>>>>>>
>>>>>>>>>>>>> How do we interpret the timestamp? In Flink we have 2 timestamp types (TIMESTAMP and TIMESTAMP_LTZ). If users specify AS OF TIMESTAMP '2023-04-27 00:00:00', which time zone will the timestamp be in? Will we convert it to TIMESTAMP_LTZ?
>>>>>>>>>>>>>
>>>>>>>>>>>>> We definitely need to clarify this, because the past has shown that daylight saving time makes our lives hard.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Timo
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 25.05.23 10:57, Feng Jin wrote:
>>>>>>>>>>>>>> Hi, everyone.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I'd like to start a discussion about FLIP-308: Support Time Travel In Batch Mode [1].
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Time travel is a SQL syntax used to query historical versions of data. It allows users to specify a point in time and retrieve the data and schema of a table as it appeared at that time. With time travel, users can easily analyze and compare historical versions of data.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> With the widespread use of data lake systems such as Paimon, Iceberg, and Hudi, time travel can make users' data analysis more convenient.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Looking forward to your opinions. Any suggestions are welcome.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-308%3A+Support+Time+Travel+In+Batch+Mode
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>> Feng
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Benchao Li
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Benchao Li
>>>>>>
>>>>>> --
>>>>>>
>>>>>> Best,
>>>>>> Benchao Li
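Several messages in this thread ask how a zone-less TIMESTAMP literal should become the long passed to getTable(ObjectPath, long): Leonard's point 2, Timo's "2. Semantics", and Feng's answer to convert TIMESTAMP to TIMESTAMP_LTZ and then to a long. The following is a minimal Java sketch of that conversion. It assumes the literal is interpreted as wall-clock time in the session time zone. `TimeTravelTimestamps` and `toEpochMillis` are hypothetical names, the code uses only java.time, and it is not Flink's actual implementation.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;

/** Hypothetical helper (not a Flink API): resolves a zone-less TIMESTAMP literal to epoch millis. */
final class TimeTravelTimestamps {
    private TimeTravelTimestamps() {}

    /**
     * Interprets {@code timestamp} as wall-clock time in {@code sessionZone}.
     * In a DST gap, java.time moves the time later by the length of the gap.
     * In a DST overlap, it picks the earlier offset.
     */
    static long toEpochMillis(LocalDateTime timestamp, ZoneId sessionZone) {
        return ZonedDateTime.of(timestamp, sessionZone).toInstant().toEpochMilli();
    }
}
```

For example, TIMESTAMP '2023-04-27 00:00:00' maps to 1682553600000 in UTC and to a value 8 hours smaller in Asia/Shanghai. This illustrates why the session time zone is part of the query's meaning. The DST case shows Timo's concern concretely. In Europe/Berlin, '2023-03-26 02:30:00' does not exist and resolves to the same instant as '2023-03-26 03:30:00', so two different literals would select the same snapshot.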