Thanks Leonard for the input, "Implicitly type conversion" way sounds good to me. I also agree that this should be done in planner instead of connector, it'll be a lot easier for connector development.
Leonard Xu <xbjt...@gmail.com> 于2023年6月9日周五 20:11写道: > About the semantics consideration, I have some new input after rethink. > > 1. We can support both TIMESTAMP and TIMESTAMP_LTZ expression following > the syntax `SELECT [column_name(s)] FROM [table_name] FOR SYSTEM_TIME AS > OF ` > > 2. For TIMESTAMP_LTZ type, give a long instant value to CatalogTable is > pretty intuitive, for TIMESTAMP_type, it will be implied cast to > TIMESTAMP_LTZ type by planner using session timezone and then pass to > CatalogTable. This case can be considered as a Function AsOfSnapshot(Table > t, TIMESTAMP_LTZ arg), which can pass arg with TIMESTAMP_LTZ type, but our > framework supports implicit type conversion thus users can also pass arg > with TIMESTAMP type. Hint, Spark[1] did the implicit type conversion too. > > 3.I also considered handing over the implicit type conversion to the > connector instead of planner, such as passing a TIMESTAMP literal, and the > connector using the session timezone to perform type conversion, but this > is more complicated than previous planner handling, and it’s not friendly > to the connector developers. > > 4. The last point, TIMESTAMP_LTZ '1970-01-01 00:00:04.001’ should be an > invalid expression as if you can not define a instant point (i.e > TIMSTAMP_LTZ semantics in SQL) from a timestamp literal without timezone. > You can use explicit type conversion like `cast(ts_ntz as TIMESTAMP_LTZ)` > after `FOR SYSTEM_TIME AS OF ` if you want to use > Timestamp type/expression/literal without timezone. > > 5. The last last point, the TIMESTAMP_LTZ type of Flink SQL supports DST > time[2] well that will help user avoid many corner case. > > > Best, > Leonard > > [1] > https://github.com/apache/spark/blob/0ed48feab65f2d86f5dda3e16bd53f2f795f5bc5/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TimeTravelSpec.scala#L56 > [2] > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/timezone/#daylight-saving-time-support > > > > > > On Jun 9, 2023, at 1:13 PM, Benchao Li <libenc...@apache.org> wrote: > > > > As you can see that you must use `UNIX_TIMESTAMP` to do this work, that's > > where the time zone happens. > > > > What I'm talking about is casting timestamp/timestamp_ltz to long > directly, > > that's why the semantic is tricky when you are casting timestamp to long > > using time zone. > > > > For other systems, such as SQL server[1], they actually uses a string > > instead of timestamp literal `FOR SYSTEM_TIME AS OF '2021-01-01 > > 00:00:00.0000000'`, I'm not sure whether they convert the string > implicitly > > to TIMESTAMP_LTZ, or they just have a different definition of the syntax. > > > > But for us, we are definitely using timestamp/timestmap_ltz literal here, > > that's why it is special, and we must highlight this behavior that we are > > converting a timestamp without time zone literal to long using the > session > > time zone. > > > > [1] > > > https://learn.microsoft.com/en-us/sql/relational-databases/tables/temporal-table-usage-scenarios?view=sql-server-ver16 > > > > Feng Jin <jinfeng1...@gmail.com> 于2023年6月8日周四 11:35写道: > > > >> Hi all, > >> > >> thanks for your input > >> > >> > >> @Benchao > >> > >>> The type for "TIMESTAMP '2023-04-27 00:00:00'" should be "TIMESTAMP > >> WITHOUT TIME ZONE", converting it to unix timestamp would use UTC > timezone, > >> which is not usually expected by users. > >> > >> It was indeed the case before Flink 1.13, but now my understanding is > that > >> there have been some slight changes in the definition of TIMESTAMP. > >> > >> TIMESTAMP is currently used to specify the year, month, day, hour, > minute > >> and second. We recommend that users use > *UNIX_TIMESTAMP(CAST(timestamp_col > >> AS STRING))* to convert *TIMESTAMP values* and *long values*. The > >> *UNIX_TIMESTAMP* function will use the *LOCAL TIME ZONE*. Therefore, > >> whether converting TIMESTAMP or TIMESTAMP_LTZ to Long values will > involve > >> using the *LOCAL TIME ZONE*. > >> > >> > >> Here is an test: > >> > >> Flink SQL> SET 'table.local-time-zone' = 'UTC'; > >> Flink SQL> SELECT UNIX_TIMESTAMP(CAST(TIMESTAMP '1970-01-01 00:00:00' as > >> STRING)) as `timestamp`; > >> --------------- > >> timestamp > >> -------------- > >> 0 > >> > >> Flink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai'; > >> Flink SQL> SELECT UNIX_TIMESTAMP(CAST(TIMESTAMP '1970-01-01 00:00:00' as > >> STRING)) as `timestamp`; > >> --------------- > >> timestamp > >> -------------- > >> -28800 > >> > >> Therefore, the current conversion method exposed to users is also using > >> LOCAL TIME ZONE. > >> > >> > >> @yuxia > >> > >> Thank you very much for providing the list of behaviors of TIMESTAMP in > >> other systems. > >> > >>> I think we can align them to avoid the inconsistency to other engines > and > >> provide convenience for the external connectors while integrating > Flink's > >> time travel API. > >> > >> +1 for this. > >> > >>> Regarding the inconsistency, I think we can consider time-travel as a > >> specical case, and we do needs to highlight this in this FLIP. > >> As for "violate the restriction outlined in FLINK-21978[1]", since we > cast > >> timestamp to epochMillis only for the internal use, and won't expose it > to > >> users, I don't think it will violate the restriction. > >> Btw, please add a brief desc to explain the meaning of the parameter > >> `timestamp` in method `CatalogBaseTable getTable(ObjectPath tablePath, > long > >> timestamp)`. Maybe something like "timestamp of the table snapt, which > is > >> millseconds since 1970-01-01 00:00:00 UTC". > >> > >> Thank you for the suggestions regarding the document. I will add them to > >> FLIP. > >> > >> > >> Best, > >> Feng > >> > >> > >> On Wed, Jun 7, 2023 at 12:18 PM Benchao Li <libenc...@apache.org> > wrote: > >> > >>> I also share the concern about the timezone problem. > >>> > >>> The type for "TIMESTAMP '2023-04-27 00:00:00'" should be "TIMESTAMP > >> WITHOUT > >>> TIME ZONE", converting it to unix timestamp would use UTC timezone, > which > >>> is not usually expected by users. > >>> > >>> If we want to keep consistent with the standard, we probably should use > >>> "TIMESTAMP WITH LOCAL ZONE '2023-04-27 00:00:00'", which type is > >> "TIMESTAMP > >>> WITH LOCAL TIME ZONE", and converting it to unix timestamp will > consider > >>> the session timezone, which is the expected result. But it's > inconvenient > >>> for users. > >>> > >>> Taking this a special case, and converting "TIMESTAMP '2023-04-27 > >>> 00:00:00'" to a unix timestamp with session timezone, will be > convenient > >>> for users, but will break the standard. I will +0.5 for this choice. > >>> > >>> yuxia <luoyu...@alumni.sjtu.edu.cn> 于2023年6月7日周三 12:06写道: > >>> > >>>> Hi, Feng Jin. > >>>> I think the concern of Leonard may be the inconsistency of the > behavior > >>> of > >>>> TIMESTAMP '2023-04-27 00:00:00' beween timetravel and other sql > >>> statement. > >>>> > >>>> For the normal sql: > >>>> `SELECT TIMESTAMP '2023-04-27 00:00:00'`, we won't consider timezone. > >>>> But for the sql for timetravl: > >>>> `SELECT * FROM paimon_tb FOR SYSTEM_TIME AS OF TIMESTAMP '2023-04-27 > >>>> 00:00:00'`, we will consider the timezone and convert to UTC > timestamp. > >>>> > >>>> The concern is valid. But for time travel, most style of engines, > >>>> Spark[1], Hive[2], Trino[3] also do the time conversion with > >> considering > >>>> the seesion time zone. I think we can align them to avoid the > >>> inconsistency > >>>> to other engines and provide convenience for the external connectors > >>> while > >>>> integrating Flink's time travel API. > >>>> > >>>> Regarding the inconsistency, I think we can consider time-travel as a > >>>> specical case, and we do needs to highlight this in this FLIP. > >>>> As for "violate the restriction outlined in FLINK-21978[1]", since we > >>> cast > >>>> timestamp to epochMillis only for the internal use, and won't expose > it > >>> to > >>>> users, I don't think it will violate the restriction. > >>>> Btw, please add a brief desc to explain the meaning of the parameter > >>>> `timestamp` in method `CatalogBaseTable getTable(ObjectPath tablePath, > >>> long > >>>> timestamp)`. Maybe something like "timestamp of the table snapt, which > >> is > >>>> millseconds since 1970-01-01 00:00:00 UTC". > >>>> > >>>> [1] > >>>> > >>> > >> > https://github.com/apache/spark/blob/0ed48feab65f2d86f5dda3e16bd53f2f795f5bc5/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TimeTravelSpec.scala#L56 > >>>> [2] > >>>> > >>> > >> > https://github.com/apache/hive/blob/f5e69dc38d7ff26c70be19adc9d1a3ae90dc4cf2/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java#L989 > >>>> [3] > >>>> > >>> > >> > https://github.com/trinodb/trino/blob/2433d9e60f1abb0d85c32374c1758525560e1a86/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java#L443 > >>>> > >>>> > >>>> Best regards, > >>>> Yuxia > >>>> > >>>> ----- 原始邮件 ----- > >>>> 发件人: "Feng Jin" <jinfeng1...@gmail.com> > >>>> 收件人: "dev" <dev@flink.apache.org> > >>>> 发送时间: 星期二, 2023年 6 月 06日 下午 10:15:47 > >>>> 主题: Re: [DISCUSS] FLIP-308: Support Time Travel In Batch Mode > >>>> > >>>> Hi everyone > >>>> > >>>> Thanks everyone for your input. > >>>> > >>>> > >>>> @Yun > >>>> > >>>>> I think you could add descriptions of how to align backfill time > >>> travel > >>>> with querying the latest data. And I think you should also update the > >>>> "Discussion thread" in the original FLIP. > >>>> > >>>> Thank you for the suggestion, I will update it in the document. > >>>> > >>>>> I have a question about getting the table schema from the catalog. > >> I'm > >>>> not sure whether the Catalog#getTable(tablePath, timestamp) will be > >>> called > >>>> only once. > >>>> > >>>> I understand that in a query, the schema of the table is determined > >>> before > >>>> execution. The schema used will be based on the latest schema within > >> the > >>>> TimeTravel period. > >>>> > >>>> In addition, due to current syntax limitations, we are unable to > >> support > >>>> the use of BETWEEN AND. > >>>> > >>>> > >>>> @Jing > >>>> > >>>>> Would you like to update your thoughts described in your previous > >>> email > >>>> about why SupportsTimeTravel has been rejected into the FLIP? > >>>> > >>>> Sure, I updated the doc. > >>>> > >>>> > >>>>> Since we always directly add overload methods into Catalog > >> according > >>>> to new requirements, which makes the interface bloated > >>>> > >>>> Your concern is valid. If we need to support the long type version in > >> the > >>>> future, we may have to add another method "getTable(ObjectPath, long > >>>> version)". However, I understand that > >>>> "Catalog.getTable(tablePath).on(timeStamp)" may not meet the > >>> requirements. > >>>> The timestamp is for Catalog's use, and Catalog obtains the > >> corresponding > >>>> schema based on this time. > >>>> > >>>> > >>>> @liu @Regards > >>>> > >>>> I am very sorry for the unclear description in the document. I have > >>> updated > >>>> relevant descriptions regarding why it needs to be implemented in > >>> Catalog. > >>>> > >>>> Travel not only requires obtaining data at the corresponding time > >> point, > >>>> but also requires the corresponding Schema at that time point > >>>> > >>>> > >>>> @Shammon > >>>> > >>>>> Flink or connector such as iceberg/paimon can create sources from > >> the > >>>> `CatalogBaseTable` directly without the need to get the snapshot ID > >> from > >>>> `CatalogTable.getSnapshot()`. What do you think of it? > >>>> > >>>> You are right, we don't need the getSnapshot interface for > >> PaimonCatalog > >>> or > >>>> IcebergCatalog tables, but we may need it for temporary tables. > >>>> > >>>> > >>>> > >>>> Best, > >>>> Feng > >>>> > >>>> > >>>> On Tue, Jun 6, 2023 at 9:32 PM Feng Jin <jinfeng1...@gmail.com> > wrote: > >>>> > >>>>> Sorry I replied to the wrong mail. Please ignore the last email. > >>>>> > >>>>> > >>>>> Hi Leonard > >>>>> > >>>>>> 1. Unification SQL > >>>>> > >>>>> I agree that it is crucial for us to support both batch and streaming > >>>>> processing. The current design allows for the support of both batch > >>> and > >>>>> streaming processing. I'll update the FLIP later. > >>>>> > >>>>> > >>>>>> 2.Semantics > >>>>> > >>>>> In my opinion, it would be feasible to perform the conversion based > >> on > >>>> the > >>>>> current session time, regardless of whether it is TIMESTAMP or > >>>>> TIMESTAMP_LTZ. > >>>>> > >>>>> However, this may indeed violate the restriction outlined in > >>>>> FLINK-21978[1] as Benchao mentioned, and I am uncertain as to > >> whether > >>> it > >>>>> is reasonable. > >>>>> > >>>>> > >>>>>> 3. Some external systems may use timestamp value to mark a > >>> version, > >>>>> but others may use version number、file position、log offset. > >>>>> > >>>>> It is true that most systems support time-related operations, and I > >>>>> believe that the current design is compatible with most systems. > >>> However, > >>>>> if we want to support long data type, it may require Calcite to > >> support > >>>> the > >>>>> VERSION AS OF syntax. I understand that this is something that we may > >>>> need > >>>>> to consider in the future. > >>>>> > >>>>> > >>>>> Best, > >>>>> Feng > >>>>> > >>>>> [1] https://issues.apache.org/jira/browse/FLINK-21978 > >>>>> > >>>>> On Tue, Jun 6, 2023 at 8:28 PM Leonard Xu <xbjt...@gmail.com> wrote: > >>>>> > >>>>>> Hi, Feng > >>>>>> > >>>>>> Thanks for driving this FLIP, very impressive feature that users > >> want, > >>>>>> I’ve some quick questions here. > >>>>>> > >>>>>> 1.Unification SQL: > >>>>>> The snapshot concept exists both in Batch mode and > >> Streaming > >>>>>> mode, could we consider a unified proposal? I think users won’t > >>> another > >>>>>> SQL syntax named > >>>>>> Time travel for Streaming mode. > >>>>>> > >>>>>> 2.Semantics: > >>>>>> Flink supports TIMESTAMP and TIMESTAMP_LTZ types, to get a > >>> long > >>>>>> timestamp value (getTable(ObjectPath tablePath, long timestamp)) we > >>> need > >>>>>> two information i.e. a TIMESTAMP value and current session timezone, > >>>> how > >>>>>> we deal the value with current proposed SQL syntax. > >>>>>> > >>>>>> 3. Is it enough using sinlge timestamp to track a snapshot(version) > >> of > >>>>>> external table? Some external systems may use timestamp value to > >>> mark > >>>> a > >>>>>> version, but others may use version number、file position、log offset. > >>>>>> > >>>>>> Best, > >>>>>> Leonard > >>>>>> > >>>>>> > >>>>>> > >>>>>>> On Jun 5, 2023, at 3:28 PM, Yun Tang <myas...@live.com> wrote: > >>>>>>> > >>>>>>> Hi Feng, > >>>>>>> > >>>>>>> I think this FLIP would provide one important feature to unify the > >>>>>> stream-SQL and batch-SQL when we backfill the historical data in > >> batch > >>>> mode. > >>>>>>> > >>>>>>> For the "Syntax" session, I think you could add descriptions of > >> how > >>> to > >>>>>> align backfill time travel with querying the latest data. And I > >> think > >>>> you > >>>>>> should also update the "Discussion thread" in the original FLIP. > >>>>>>> > >>>>>>> Moreover, I have a question about getting the table schema from > >> the > >>>>>> catalog. I'm not sure whether the Catalog#getTable(tablePath, > >>> timestamp) > >>>>>> will be called only once. If we have a backfill query between > >>> 2023-05-29 > >>>>>> and 2023-06-04 in the past week, and the table schema changed on > >>>>>> 2023-06-01, will the query below detect the schema changes during > >>>> backfill > >>>>>> the whole week? > >>>>>>> > >>>>>>> SELECT * FROM paimon_tb FOR SYSTEM_TIME AS OF TIMESTAMP BETWEEN > >>>>>> '2023-05-29 00:00:00' AND '2023-06-05 00:00:00' > >>>>>>> > >>>>>>> Best > >>>>>>> Yun Tang > >>>>>>> > >>>>>>> > >>>>>>> ________________________________ > >>>>>>> From: Shammon FY <zjur...@gmail.com> > >>>>>>> Sent: Thursday, June 1, 2023 17:57 > >>>>>>> To: dev@flink.apache.org <dev@flink.apache.org> > >>>>>>> Subject: Re: [DISCUSS] FLIP-308: Support Time Travel In Batch Mode > >>>>>>> > >>>>>>> Hi Feng, > >>>>>>> > >>>>>>> I have one minor comment about the public interface > >> `Optional<Long> > >>>>>>> getSnapshot()` in the `CatalogTable`. > >>>>>>> > >>>>>>> As we can get tables from the new method > >>> `Catalog.getTable(ObjectPath > >>>>>>> tablePath, long timestamp)`, I think the returned > >> `CatalogBaseTable` > >>>>>> will > >>>>>>> have the information of timestamp. Flink or connector such as > >>>>>>> iceberg/paimon can create sources from the `CatalogBaseTable` > >>> directly > >>>>>>> without the need to get the snapshot ID from > >>>>>> `CatalogTable.getSnapshot()`. > >>>>>>> What do you think of it? > >>>>>>> > >>>>>>> Best, > >>>>>>> Shammon FY > >>>>>>> > >>>>>>> > >>>>>>> On Thu, Jun 1, 2023 at 7:22 AM Jing Ge <j...@ververica.com.invalid > >>> > >>>>>> wrote: > >>>>>>> > >>>>>>>> Hi Feng, > >>>>>>>> > >>>>>>>> Thanks for the proposal! Very interesting feature. Would you like > >>> to > >>>>>> update > >>>>>>>> your thoughts described in your previous email about why > >>>>>> SupportsTimeTravel > >>>>>>>> has been rejected into the FLIP? This will help readers > >> understand > >>>> the > >>>>>>>> context (in the future). > >>>>>>>> > >>>>>>>> Since we always directly add overload methods into Catalog > >>> according > >>>>>> to new > >>>>>>>> requirements, which makes the interface bloated. Just out of > >>>> curiosity, > >>>>>>>> does it make sense to introduce some DSL design? Like > >>>>>>>> Catalog.getTable(tablePath).on(timeStamp), > >>>>>>>> Catalog.getTable(tablePath).current() for the most current > >> version, > >>>> and > >>>>>>>> more room for further extension like timestamp range, etc. I > >>> haven't > >>>>>> read > >>>>>>>> all the source code yet and I'm not sure if it is possible. But a > >>>>>>>> design like this will keep the Catalog API lean and the API/DSL > >>> will > >>>> be > >>>>>>>> self described and easier to use. > >>>>>>>> > >>>>>>>> Best regards, > >>>>>>>> Jing > >>>>>>>> > >>>>>>>> > >>>>>>>> On Wed, May 31, 2023 at 12:08 PM Krzysztof Chmielewski < > >>>>>>>> krzysiek.chmielew...@gmail.com> wrote: > >>>>>>>> > >>>>>>>>> Ok after second though I'm retracting my previous statement > >> about > >>>>>> Catalog > >>>>>>>>> changes you proposed. > >>>>>>>>> I do see a benefit for Delta connector actually with this change > >>> and > >>>>>> see > >>>>>>>>> why this could be coupled with Catalog. > >>>>>>>>> > >>>>>>>>> Delta Connector SQL support, also ships a Delta Catalog > >>>> implementation > >>>>>>>> for > >>>>>>>>> Flink. > >>>>>>>>> For Delta Catalog, table schema information is fetched from > >>>> underlying > >>>>>>>>> _delta_log and not stored in metastore. For time travel we > >>> actually > >>>>>> had a > >>>>>>>>> problem, that if we would like to timetravel back to some old > >>>> version, > >>>>>>>>> where schema was slightly different, then we would have a > >> conflict > >>>>>> since > >>>>>>>>> Catalog would return current schema and not how it was for > >> version > >>>> X. > >>>>>>>>> > >>>>>>>>> With your change, our Delta Catalog can actually fetch schema > >> for > >>>>>>>> version X > >>>>>>>>> and send it to DeltaTableFactory. Currency, Catalog can fetch > >> only > >>>>>>>> current > >>>>>>>>> version. What we would also need however is version > >>>> (number/timestamp) > >>>>>>>> for > >>>>>>>>> this table passed to DynamicTableFactory so we could properly > >> set > >>>>>> Delta > >>>>>>>>> standalone library. > >>>>>>>>> > >>>>>>>>> Regards, > >>>>>>>>> Krzysztof > >>>>>>>>> > >>>>>>>>> śr., 31 maj 2023 o 10:37 Krzysztof Chmielewski < > >>>>>>>>> krzysiek.chmielew...@gmail.com> napisał(a): > >>>>>>>>> > >>>>>>>>>> Hi, > >>>>>>>>>> happy to see such a feature. > >>>>>>>>>> Small note from my end regarding Catalog changes. > >>>>>>>>>> > >>>>>>>>>> TL;DR > >>>>>>>>>> I don't think it is necessary to delegate this feature to the > >>>>>> catalog. > >>>>>>>> I > >>>>>>>>>> think that since "timetravel" is per job/query property, its > >>> should > >>>>>> not > >>>>>>>>> be > >>>>>>>>>> coupled with the Catalog or table definition. In my opinion > >> this > >>> is > >>>>>>>>>> something that DynamicTableFactory only has to know about. I > >>> would > >>>>>>>> rather > >>>>>>>>>> see this feature as it is - SQL syntax enhancement but delegate > >>>>>> clearly > >>>>>>>>> to > >>>>>>>>>> DynamicTableFactory. > >>>>>>>>>> > >>>>>>>>>> I've implemented timetravel feature for Delta Connector [1] > >>> using > >>>>>>>>>> current Flink API. > >>>>>>>>>> Docs are pending code review, but you can find them here [2] > >> and > >>>>>>>> examples > >>>>>>>>>> are available here [3] > >>>>>>>>>> > >>>>>>>>>> The timetravel feature that I've implemented is based on Flink > >>>> Query > >>>>>>>>>> hints. > >>>>>>>>>> "SELECT * FROM sourceTable /*+ OPTIONS('versionAsOf' = '1') */" > >>>>>>>>>> > >>>>>>>>>> The " versionAsOf" (we also have 'timestampAsOf') parameter is > >>>>>> handled > >>>>>>>>> not > >>>>>>>>>> by Catalog but by DyntamicTableFactory implementation for Delta > >>>>>>>>> connector. > >>>>>>>>>> The value of this property is passed to Delta standalone lib > >> API > >>>> that > >>>>>>>>>> returns table view for given version. > >>>>>>>>>> > >>>>>>>>>> I'm not sure how/if proposed change could benefit Delta > >> connector > >>>>>>>>>> implementation for this feature. > >>>>>>>>>> > >>>>>>>>>> Thanks, > >>>>>>>>>> Krzysztof > >>>>>>>>>> > >>>>>>>>>> [1] > >>>>>>>>>> > >>>>>>>>> > >>>>>>>> > >>>>>> > >>>> > >>> > >> > https://github.com/delta-io/connectors/tree/flink_table_catalog_feature_branch/flink > >>>>>>>>>> [2] > >>>>>> https://github.com/kristoffSC/connectors/tree/FlinkSQL_PR_15-docs > >>>>>>>>>> [3] > >>>>>>>>>> > >>>>>>>>> > >>>>>>>> > >>>>>> > >>>> > >>> > >> > https://github.com/delta-io/connectors/tree/flink_table_catalog_feature_branch/examples/flink-example/src/main/java/org/example/sql > >>>>>>>>>> > >>>>>>>>>> śr., 31 maj 2023 o 06:03 liu ron <ron9....@gmail.com> > >>> napisał(a): > >>>>>>>>>> > >>>>>>>>>>> Hi, Feng > >>>>>>>>>>> > >>>>>>>>>>> Thanks for driving this FLIP, Time travel is very useful for > >>> Flink > >>>>>>>>>>> integrate with data lake system. I have one question why the > >>>>>>>>>>> implementation > >>>>>>>>>>> of TimeTravel is delegated to Catalog? Assuming that we use > >>> Flink > >>>> to > >>>>>>>>> query > >>>>>>>>>>> Hudi table with the time travel syntax, but we don't use the > >>>>>>>>> HudiCatalog, > >>>>>>>>>>> instead, we register the hudi table to InMemoryCatalog, can > >> we > >>>>>>>> support > >>>>>>>>>>> time travel for Hudi table in this case? > >>>>>>>>>>> In contrast, I think time travel should bind to connector > >>> instead > >>>> of > >>>>>>>>>>> Catalog, so the rejected alternative should be considered. > >>>>>>>>>>> > >>>>>>>>>>> Best, > >>>>>>>>>>> Ron > >>>>>>>>>>> > >>>>>>>>>>> yuxia <luoyu...@alumni.sjtu.edu.cn> 于2023年5月30日周二 09:40写道: > >>>>>>>>>>> > >>>>>>>>>>>> Hi, Feng. > >>>>>>>>>>>> Notice this FLIP only support batch mode for time travel. > >>> Would > >>>> it > >>>>>>>>> also > >>>>>>>>>>>> make sense to support stream mode to a read a snapshot of the > >>>> table > >>>>>>>>> as a > >>>>>>>>>>>> bounded stream? > >>>>>>>>>>>> > >>>>>>>>>>>> Best regards, > >>>>>>>>>>>> Yuxia > >>>>>>>>>>>> > >>>>>>>>>>>> ----- 原始邮件 ----- > >>>>>>>>>>>> 发件人: "Benchao Li" <libenc...@apache.org> > >>>>>>>>>>>> 收件人: "dev" <dev@flink.apache.org> > >>>>>>>>>>>> 发送时间: 星期一, 2023年 5 月 29日 下午 6:04:53 > >>>>>>>>>>>> 主题: Re: [DISCUSS] FLIP-308: Support Time Travel In Batch Mode > >>>>>>>>>>>> > >>>>>>>>>>>> # Can Calcite support this syntax ` VERSION AS OF` ? > >>>>>>>>>>>> > >>>>>>>>>>>> This also depends on whether this is defined in standard or > >> any > >>>>>>>> known > >>>>>>>>>>>> databases that have implemented this. If not, it would be > >> hard > >>> to > >>>>>>>> push > >>>>>>>>>>> it > >>>>>>>>>>>> to Calcite. > >>>>>>>>>>>> > >>>>>>>>>>>> # getTable(ObjectPath object, long timestamp) > >>>>>>>>>>>> > >>>>>>>>>>>> Then we again come to the problem of "casting between > >> timestamp > >>>> and > >>>>>>>>>>>> numeric", which has been disabled in FLINK-21978[1]. If > >> you're > >>>>>> gonna > >>>>>>>>> use > >>>>>>>>>>>> this, then we need to clarify that problem first. > >>>>>>>>>>>> > >>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-21978 > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> Feng Jin <jinfeng1...@gmail.com> 于2023年5月29日周一 15:57写道: > >>>>>>>>>>>> > >>>>>>>>>>>>> hi, thanks for your reply. > >>>>>>>>>>>>> > >>>>>>>>>>>>> @Benchao > >>>>>>>>>>>>>> did you consider the pushdown abilities compatible > >>>>>>>>>>>>> > >>>>>>>>>>>>> In the current design, the implementation of TimeTravel is > >>>>>>>> delegated > >>>>>>>>>>> to > >>>>>>>>>>>>> Catalog. We have added a function called getTable(ObjectPath > >>>>>>>>>>> tablePath, > >>>>>>>>>>>>> long timestamp) to obtain the corresponding CatalogBaseTable > >>> at > >>>> a > >>>>>>>>>>>> specific > >>>>>>>>>>>>> time. Therefore, I think it will not have any impact on the > >>>>>>>>> original > >>>>>>>>>>>>> pushdown abilities. > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>>> I see there is a rejected design for adding > >>>>>>>> SupportsTimeTravel, > >>>>>>>>>>> but > >>>>>>>>>>>> I > >>>>>>>>>>>>> didn't see the alternative in the FLIP doc > >>>>>>>>>>>>> > >>>>>>>>>>>>> Sorry, the document description is not very clear. > >> Regarding > >>>>>>>>> whether > >>>>>>>>>>> to > >>>>>>>>>>>>> support SupportTimeTravel, I have discussed it with yuxia. > >>> Since > >>>>>>>> we > >>>>>>>>>>> have > >>>>>>>>>>>>> already passed the corresponding time in > >> getTable(ObjectPath, > >>>> long > >>>>>>>>>>>>> timestamp) of Catalog, SupportTimeTravel may not be > >> necessary. > >>>>>>>>>>>>> > >>>>>>>>>>>>> In getTable(ObjectPath object, long timestamp), we can > >> obtain > >>>> the > >>>>>>>>>>> schema > >>>>>>>>>>>> of > >>>>>>>>>>>>> the corresponding time point and put the SNAPSHOT that needs > >>> to > >>>> be > >>>>>>>>>>>> consumed > >>>>>>>>>>>>> into options. > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> @Shammon > >>>>>>>>>>>>>> Could we support this in Flink too? > >>>>>>>>>>>>> > >>>>>>>>>>>>> I personally think it's possible, but limited by Calcite's > >>>> syntax > >>>>>>>>>>>>> restrictions. I believe we should first support this syntax > >> in > >>>>>>>>>>> Calcite. > >>>>>>>>>>>>> Currently, I think it may not be easy to support this > >> syntax > >>> in > >>>>>>>>>>> Flink's > >>>>>>>>>>>>> parser. @Benchao, what do you think? Can Calcite support > >> this > >>>>>>>> syntax > >>>>>>>>>>>>> ` VERSION AS OF` ? > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> Best, > >>>>>>>>>>>>> Feng. > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> On Fri, May 26, 2023 at 2:55 PM Shammon FY < > >> zjur...@gmail.com > >>>> > >>>>>>>>> wrote: > >>>>>>>>>>>>> > >>>>>>>>>>>>>> Thanks Feng, the feature of time travel sounds great! > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> In addition to SYSTEM_TIME, lake houses such as paimon and > >>>>>>>> iceberg > >>>>>>>>>>>>> support > >>>>>>>>>>>>>> snapshot or version. For example, users can query snapshot > >> 1 > >>>> for > >>>>>>>>>>> paimon > >>>>>>>>>>>>> by > >>>>>>>>>>>>>> the following statement > >>>>>>>>>>>>>> SELECT * FROM t VERSION AS OF 1 > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Could we support this in Flink too? > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Best, > >>>>>>>>>>>>>> Shammon FY > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> On Fri, May 26, 2023 at 1:20 PM Benchao Li < > >>>>>>>> libenc...@apache.org> > >>>>>>>>>>>> wrote: > >>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Regarding the implementation, did you consider the > >> pushdown > >>>>>>>>>>> abilities > >>>>>>>>>>>>>>> compatible, e.g., projection pushdown, filter pushdown, > >>>>>>>>> partition > >>>>>>>>>>>>>> pushdown. > >>>>>>>>>>>>>>> Since `Snapshot` is not handled much in existing rules, I > >>>>>>>> have a > >>>>>>>>>>>>> concern > >>>>>>>>>>>>>>> about this. Of course, it depends on your implementation > >>>>>>>> detail, > >>>>>>>>>>> what > >>>>>>>>>>>>> is > >>>>>>>>>>>>>>> important is that we'd better add some cross tests for > >>> these. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Regarding the interface exposed to Connector, I see there > >>> is a > >>>>>>>>>>>> rejected > >>>>>>>>>>>>>>> design for adding SupportsTimeTravel, but I didn't see the > >>>>>>>>>>>> alternative > >>>>>>>>>>>>> in > >>>>>>>>>>>>>>> the FLIP doc. IMO, this is an important thing we need to > >>>>>>>> clarify > >>>>>>>>>>>>> because > >>>>>>>>>>>>>> we > >>>>>>>>>>>>>>> need to know whether the Connector supports this, and what > >>>>>>>>>>>>>> column/metadata > >>>>>>>>>>>>>>> corresponds to 'system_time'. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Feng Jin <jinfeng1...@gmail.com> 于2023年5月25日周四 22:50写道: > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Thanks for your reply > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> @Timo @BenChao @yuxia > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Sorry for the mistake, Currently , calcite only supports > >>>>>>>>> `FOR > >>>>>>>>>>>>>>> SYSTEM_TIME > >>>>>>>>>>>>>>>> AS OF ` syntax. We can only support `FOR SYSTEM_TIME AS > >>>>>>>> OF` > >>>>>>>>> . > >>>>>>>>>>>> I've > >>>>>>>>>>>>>>>> updated the syntax part of the FLIP. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> @Timo > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> We will convert it to TIMESTAMP_LTZ? > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Yes, I think we need to convert TIMESTAMP to > >> TIMESTAMP_LTZ > >>>>>>>> and > >>>>>>>>>>> then > >>>>>>>>>>>>>>> convert > >>>>>>>>>>>>>>>> it into a long value. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> How do we want to query the most recent version of a > >> table > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> I think we can use `AS OF CURRENT_TIMESTAMP` ,But it does > >>>>>>>>> cause > >>>>>>>>>>>>>>>> inconsistency with the real-time concept. > >>>>>>>>>>>>>>>> However, from my personal understanding, the scope of > >> `AS > >>>>>>>> OF > >>>>>>>>>>>>>>>> CURRENT_TIMESTAMP` is the table itself, not the table > >>>>>>>> record. > >>>>>>>>>>> So, > >>>>>>>>>>>> I > >>>>>>>>>>>>>>> think > >>>>>>>>>>>>>>>> using CURRENT_TIMESTAMP should also be reasonable?. > >>>>>>>>>>>>>>>> Additionally, if no version is specified, the latest > >>> version > >>>>>>>>>>> should > >>>>>>>>>>>>> be > >>>>>>>>>>>>>>> used > >>>>>>>>>>>>>>>> by default. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Best, > >>>>>>>>>>>>>>>> Feng > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> On Thu, May 25, 2023 at 7:47 PM yuxia < > >>>>>>>>>>> luoyu...@alumni.sjtu.edu.cn > >>>>>>>>>>>>> > >>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Thanks Feng for bringing this up. It'll be great to > >>>>>>>>> introduce > >>>>>>>>>>>> time > >>>>>>>>>>>>>>> travel > >>>>>>>>>>>>>>>>> to Flink to have a better integration with external data > >>>>>>>>>>> soruces. > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> I also share same concern about the syntax. > >>>>>>>>>>>>>>>>> I see in the part of `Whether to support other syntax > >>>>>>>>>>>>>> implementations` > >>>>>>>>>>>>>>> in > >>>>>>>>>>>>>>>>> this FLIP, seems the syntax in Calcite should be `FOR > >>>>>>>>>>> SYSTEM_TIME > >>>>>>>>>>>>> AS > >>>>>>>>>>>>>>> OF`, > >>>>>>>>>>>>>>>>> right? > >>>>>>>>>>>>>>>>> But the the syntax part in this FLIP, it seems to be `AS > >>>>>>>> OF > >>>>>>>>>>>>>> TIMESTAMP` > >>>>>>>>>>>>>>>>> instead of `FOR SYSTEM_TIME AS OF`. Is it just a > >> mistake > >>>>>>>> or > >>>>>>>>>>> by > >>>>>>>>>>>>>> design? > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Best regards, > >>>>>>>>>>>>>>>>> Yuxia > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> ----- 原始邮件 ----- > >>>>>>>>>>>>>>>>> 发件人: "Benchao Li" <libenc...@apache.org> > >>>>>>>>>>>>>>>>> 收件人: "dev" <dev@flink.apache.org> > >>>>>>>>>>>>>>>>> 发送时间: 星期四, 2023年 5 月 25日 下午 7:27:17 > >>>>>>>>>>>>>>>>> 主题: Re: [DISCUSS] FLIP-308: Support Time Travel In Batch > >>>>>>>>> Mode > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Thanks Feng, it's exciting to have this ability. > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Regarding the syntax section, are you proposing `AS OF` > >>>>>>>>>>> instead > >>>>>>>>>>>> of > >>>>>>>>>>>>>> `FOR > >>>>>>>>>>>>>>>>> SYSTEM AS OF` to do this? I know `FOR SYSTEM AS OF` is > >> in > >>>>>>>>> the > >>>>>>>>>>> SQL > >>>>>>>>>>>>>>>> standard > >>>>>>>>>>>>>>>>> and has been supported in some database vendors such as > >>>>>>>> SQL > >>>>>>>>>>>> Server. > >>>>>>>>>>>>>>> About > >>>>>>>>>>>>>>>>> `AS OF`, is it in the standard or any database vendor > >>>>>>>>> supports > >>>>>>>>>>>>> this, > >>>>>>>>>>>>>> if > >>>>>>>>>>>>>>>>> yes, I think it's worth to add this support to Calcite, > >>>>>>>> and > >>>>>>>>> I > >>>>>>>>>>>> would > >>>>>>>>>>>>>>> give > >>>>>>>>>>>>>>>> a > >>>>>>>>>>>>>>>>> hand in Calcite side. Otherwise, I think we'd better to > >>>>>>>> use > >>>>>>>>>>> `FOR > >>>>>>>>>>>>>> SYSTEM > >>>>>>>>>>>>>>>> AS > >>>>>>>>>>>>>>>>> OF`. > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Timo Walther <twal...@apache.org> 于2023年5月25日周四 > >> 19:02写道: > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Also: How do we want to query the most recent version > >>>>>>>> of a > >>>>>>>>>>>> table? > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> `AS OF CURRENT_TIMESTAMP` would be ideal, but according > >>>>>>>> to > >>>>>>>>>>> the > >>>>>>>>>>>>> docs > >>>>>>>>>>>>>>>> both > >>>>>>>>>>>>>>>>>> the type is TIMESTAMP_LTZ and what is even more > >>>>>>>> concerning > >>>>>>>>>>> is > >>>>>>>>>>>> the > >>>>>>>>>>>>>> it > >>>>>>>>>>>>>>>>>> actually is evalated row-based: > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> Returns the current SQL timestamp in the local time > >>>>>>>>> zone, > >>>>>>>>>>>> the > >>>>>>>>>>>>>>> return > >>>>>>>>>>>>>>>>>> type is TIMESTAMP_LTZ(3). It is evaluated for each > >>>>>>>> record > >>>>>>>>> in > >>>>>>>>>>>>>>> streaming > >>>>>>>>>>>>>>>>>> mode. But in batch mode, it is evaluated once as the > >>>>>>>> query > >>>>>>>>>>>> starts > >>>>>>>>>>>>>> and > >>>>>>>>>>>>>>>>>> uses the same result for every row. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> This could make it difficult to explain in a join > >>>>>>>> scenario > >>>>>>>>>>> of > >>>>>>>>>>>>>>> multiple > >>>>>>>>>>>>>>>>>> snapshotted tables. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Regards, > >>>>>>>>>>>>>>>>>> Timo > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> On 25.05.23 12:29, Timo Walther wrote: > >>>>>>>>>>>>>>>>>>> Hi Feng, > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> thanks for proposing this FLIP. It makes a lot of > >>>>>>>> sense > >>>>>>>>> to > >>>>>>>>>>>>>> finally > >>>>>>>>>>>>>>>>>>> support querying tables at a specific point in time or > >>>>>>>>>>>>> hopefully > >>>>>>>>>>>>>>> also > >>>>>>>>>>>>>>>>>>> ranges soon. Following time-versioned tables. > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> Here is some feedback from my side: > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> 1. Syntax > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> Can you elaborate a bit on the Calcite restrictions? > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> Does Calcite currently support `AS OF` syntax for this > >>>>>>>>> but > >>>>>>>>>>>> not > >>>>>>>>>>>>>> `FOR > >>>>>>>>>>>>>>>>>>> SYSTEM_TIME AS OF`? > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> It would be great to support `AS OF` also for > >>>>>>>>>>> time-versioned > >>>>>>>>>>>>>> joins > >>>>>>>>>>>>>>>> and > >>>>>>>>>>>>>>>>>>> have a unified and short syntax. > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> Once a fix is merged in Calcite for this, we can make > >>>>>>>>> this > >>>>>>>>>>>>>>> available > >>>>>>>>>>>>>>>> in > >>>>>>>>>>>>>>>>>>> Flink earlier by copying the corresponding classes > >>>>>>>> until > >>>>>>>>>>> the > >>>>>>>>>>>>> next > >>>>>>>>>>>>>>>>>>> Calcite upgrade is performed. > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> 2. Semantics > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> How do we interpret the timestamp? In Flink we have 2 > >>>>>>>>>>>> timestamp > >>>>>>>>>>>>>>> types > >>>>>>>>>>>>>>>>>>> (TIMESTAMP and TIMESTAMP_LTZ). If users specify AS OF > >>>>>>>>>>>> TIMESTAMP > >>>>>>>>>>>>>>>>>>> '2023-04-27 00:00:00', in which timezone will the > >>>>>>>>>>> timestamp > >>>>>>>>>>>> be? > >>>>>>>>>>>>>> We > >>>>>>>>>>>>>>>> will > >>>>>>>>>>>>>>>>>>> convert it to TIMESTAMP_LTZ? > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> We definely need to clarify this because the past has > >>>>>>>>>>> shown > >>>>>>>>>>>>> that > >>>>>>>>>>>>>>>>>>> daylight saving times make our lives hard. > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> Thanks, > >>>>>>>>>>>>>>>>>>> Timo > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> On 25.05.23 10:57, Feng Jin wrote: > >>>>>>>>>>>>>>>>>>>> Hi, everyone. > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> I’d like to start a discussion about FLIP-308: > >>>>>>>> Support > >>>>>>>>>>> Time > >>>>>>>>>>>>>> Travel > >>>>>>>>>>>>>>>> In > >>>>>>>>>>>>>>>>>>>> Batch > >>>>>>>>>>>>>>>>>>>> Mode [1] > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> Time travel is a SQL syntax used to query historical > >>>>>>>>>>>> versions > >>>>>>>>>>>>> of > >>>>>>>>>>>>>>>> data. > >>>>>>>>>>>>>>>>>> It > >>>>>>>>>>>>>>>>>>>> allows users to specify a point in time and retrieve > >>>>>>>>> the > >>>>>>>>>>>> data > >>>>>>>>>>>>>> and > >>>>>>>>>>>>>>>>>>>> schema of > >>>>>>>>>>>>>>>>>>>> a table as it appeared at that time. With time > >>>>>>>> travel, > >>>>>>>>>>> users > >>>>>>>>>>>>> can > >>>>>>>>>>>>>>>>> easily > >>>>>>>>>>>>>>>>>>>> analyze and compare historical versions of data. > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> With the widespread use of data lake systems such as > >>>>>>>>>>> Paimon, > >>>>>>>>>>>>>>>> Iceberg, > >>>>>>>>>>>>>>>>>> and > >>>>>>>>>>>>>>>>>>>> Hudi, time travel can provide more convenience for > >>>>>>>>> users' > >>>>>>>>>>>> data > >>>>>>>>>>>>>>>>> analysis. > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> Looking forward to your opinions, any suggestions are > >>>>>>>>>>>>> welcomed. > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> 1. > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>> > >>>>>>>> > >>>>>> > >>>> > >>> > >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-308%3A+Support+Time+Travel+In+Batch+Mode > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> Best. > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> Feng > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> -- > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Best, > >>>>>>>>>>>>>>>>> Benchao Li > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> -- > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Best, > >>>>>>>>>>>>>>> Benchao Li > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> -- > >>>>>>>>>>>> > >>>>>>>>>>>> Best, > >>>>>>>>>>>> Benchao Li > >>>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>> > >>>>>>>>> > >>>>>>>> > >>>>>> > >>>>>> > >>>> > >>> > >>> > >>> -- > >>> > >>> Best, > >>> Benchao Li > >>> > >> > > > > > > -- > > > > Best, > > Benchao Li > > -- Best, Benchao Li