Hey Leonard,

Agreed, this is a fun discussion!

> (1) For changelog sources backed by CDC tools, one problem is whether we
> can use the temporal table as a general source table that may be followed
> by aggregation operations. More precisely: can the aggregation operator use
> the DELETE record, now carrying the "correct" operation time, to retract a
> record? Probably not. This pulls us back to the discussion of operation
> time vs. event time, which is a really cool but complicated topic; see the
> discussion above between @Jark and me.

I fully agree this is a complicated topic; however, I don't think it's
actually a problem that needs to be solved for the first version of this
feature. My proposal is to disallow using upsert streams as temporal tables
if an aggregation operation has been applied. Going back to my notion that
temporal tables are a tool for performing streaming star schema
denormalization, the dimension table in a star schema is rarely aggregated
pre-join. In the case of a CDC stream of currency rates joined to
transactions, the CDC stream only needs to support filter pushdowns and
map-like transformations before being joined. I believe this is a
reasonable limitation we can impose that will unblock a large percentage of
use cases, and once we better understand the semantics of the correct
operation in a retraction, the limitation can be removed in future versions
while remaining backward compatible.

CREATE TABLE currency_rates (
  currencyId BIGINT,
  rate DECIMAL(10, 2)
) WITH (
  'connector' = 'kafka',
  'format' = 'debezium-json'
);

CREATE TABLE transactions (
  currencyId BIGINT,
  transactionTime TIMESTAMP(3)
) WITH (
  ...
);

-- Under my proposal this query would be supported because the currency_rates
-- table is used in a temporal join without any aggregations having been applied.
CREATE VIEW working_query AS
SELECT *
FROM transactions AS t
  JOIN currency_rates FOR SYSTEM_TIME AS OF t.transactionTime AS r
  ON r.currencyId = t.currencyId;

-- However, this query would be rejected by the planner until we
-- determine the proper time semantics of a retraction.
CREATE VIEW post_agg_stream AS
SELECT currencyId, AVG(rate) AS rate
FROM currency_rates
GROUP BY currencyId;

CREATE VIEW rejected_query AS
SELECT *
FROM transactions AS t
  JOIN post_agg_stream FOR SYSTEM_TIME AS OF t.transactionTime AS r
  ON r.currencyId = t.currencyId;
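As a rough illustration of what `FOR SYSTEM_TIME AS OF t.transactionTime` asks for in working_query, here is a plain-Python model (not Flink code, and not how the planner implements it): each currency's upserts are kept as a version history sorted by update time, and each transaction is paired with the latest version at or before its transactionTime. The tuple layouts, integer timestamps, and the helper names build_versions, rate_as_of, and temporal_join are hypothetical, and DELETE handling is left out to keep it short.

```python
from bisect import bisect_right
from collections import defaultdict


def build_versions(updates):
    """Group hypothetical (currency_id, update_time, rate) upserts into
    per-key version histories sorted by update_time."""
    versions = defaultdict(list)
    for currency_id, update_time, rate in sorted(updates, key=lambda u: u[1]):
        versions[currency_id].append((update_time, rate))
    return versions


def rate_as_of(versions, currency_id, ts):
    """Return the latest rate whose update_time <= ts, or None if none exists yet."""
    history = versions.get(currency_id, [])
    # bisect_right counts how many versions have update_time <= ts.
    idx = bisect_right([t for t, _ in history], ts)
    return history[idx - 1][1] if idx else None


def temporal_join(transactions, versions):
    """Inner temporal join: pair each (currency_id, transaction_time)
    with the rate that was valid at that time."""
    joined = []
    for currency_id, tx_time in transactions:
        rate = rate_as_of(versions, currency_id, tx_time)
        if rate is not None:  # inner join: drop transactions with no valid rate
            joined.append((currency_id, tx_time, rate))
    return joined
```

For example, with updates [(1, 10, 1.10), (1, 20, 1.20), (2, 5, 0.90)], the transactions [(1, 5), (1, 15), (1, 25), (2, 7)] join to [(1, 15, 1.1), (1, 25, 1.2), (2, 7, 0.9)]; the transaction at time 5 for currency 1 is dropped because no rate existed yet. Nothing in this model aggregates the rate history, which matches the case the proposal keeps supported.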

> (2) For upsert sources that define a PRIMARY KEY and may contain multiple
> records under one PK, the main problem is the PK semantics: multiple
> records under one PK break the uniqueness semantics of a table. We need to
> work around this by either (a) adding another primary key keyword and
> explaining the upsert semantics, or (b) creating the temporal table based
> on a view that is the deduplicated result of the source table [2].

This feels like more of a bikeshedding question than a blocker and I look
forward to seeing what you come up with!
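Option (b) relies on the deduplication pattern from [2], a ROW_NUMBER() view that keeps one row per key. A minimal plain-Python sketch of the intended effect on a final snapshot, assuming rows are dicts and that the row with the greatest ordering value wins, shows how it restores one row per primary key. The function name deduplicate_keep_last and the field names are hypothetical; this is not the Flink operator.

```python
def deduplicate_keep_last(rows, key, order_by):
    """Return {key_value: row}, keeping for each key the row with the
    largest order_by value.

    Sketches the intent of a ROW_NUMBER() OVER (PARTITION BY key
    ORDER BY order_by DESC) = 1 view; in this sketch, ties go to the
    row that arrives later.
    """
    latest = {}
    for row in rows:
        k = row[key]
        # A late-arriving but older row must not overwrite a newer one.
        if k not in latest or row[order_by] >= latest[k][order_by]:
            latest[k] = row
    return latest
```

For example, if currency 1 arrives as an update at time 20 followed by a late update at time 10, the deduplicated snapshot still holds the time-20 row, so each currencyId maps to exactly one row.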


On Mon, Jul 6, 2020 at 10:59 AM Benchao Li <libenc...@apache.org> wrote:

> Hi everyone,
> Thanks a lot for the great discussions so far.
> After reading through the long discussion, I still have one question.
> Currently the temporal table function supports both event-time and
> processing-time joins.
> If we use the "FOR SYSTEM_TIME AS OF" syntax without the "TEMPORAL" keyword
> in the DDL, does that mean we can only use the temporal table function join
> with event time?
> If we can, how do we distinguish it from the current temporal table (also
> known as a dimension table)?
> Maybe I'm missing something here. Correct me if I'm wrong.
> On Mon, Jul 6, 2020 at 11:34 PM Leonard Xu <xbjt...@gmail.com> wrote:
>> Hi Seth,
>> Thanks for your explanation of the use cases. You're right: the lookup
>> join/table is one kind of temporal table join/table, one that tracks the
>> latest snapshot of an external DB-like table, which is why we proposed
>> using the same temporal join syntax.
>> In fact, I have looked more closely at the Debezium and Canal formats over
>> the past few days, and we can obtain the exact DML operation time from
>> their metadata, which comes from the DB binlog. Although extracting
>> metadata from records is part of the FLIP-107 scope [1], at least we have a
>> way to extract the correct operation time. Even if we can obtain the
>> expected operation time, there are some problems.
>> (1) For changelog sources backed by CDC tools, one problem is whether we
>> can use the temporal table as a general source table that may be followed
>> by aggregation operations. More precisely: can the aggregation operator use
>> the DELETE record, now carrying the "correct" operation time, to retract a
>> record? Probably not. This pulls us back to the discussion of operation
>> time vs. event time, which is a really cool but complicated topic; see the
>> discussion above between @Jark and me.
>> (2) For upsert sources that define a PRIMARY KEY and may contain multiple
>> records under one PK, the main problem is the PK semantics: multiple
>> records under one PK break the uniqueness semantics of a table. We need to
>> work around this by either (a) adding another primary key keyword and
>> explaining the upsert semantics, or (b) creating the temporal table based
>> on a view that is the deduplicated result of the source table [2].
>> I'm working on (2). Supporting (1), i.e. supporting DELETE entirely, is a
>> really big challenge, but I also think it's the right thing to do in the
>> long term.
>> If we decide to do (1), we first need to introduce the concept of operation
>> time, then change the codebase to handle the operation-time header in many
>> places, and finally explain to users how to understand and use temporal
>> tables.
>> I'm a little worried about whether it's valuable enough. I proposed
>> supporting only (2) because it is a good replacement for the current
>> Temporal Table Function and will not introduce more concepts or work.
>> Jark, Jingsong, Konstantin, WDYT?
>> Best,
>> Leonard Xu
>> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records#FLIP107:Readingtablecolumnsfromdifferentpartsofsourcerecords-Accessread-onlymetadatae.g.partition
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication
>> On Jul 6, 2020, at 22:02, Seth Wiesman <sjwies...@gmail.com> wrote:
>> >
>> > As an aside, I conceptually view temporal table joins as semantically
>> > equivalent to lookup joins. They are just two different ways of
>> > consuming the same data.
>> >
>> > Seth
>> >
>> > On Mon, Jul 6, 2020 at 8:56 AM Seth Wiesman <sjwies...@gmail.com> wrote:
>> > Hi Leonard,
>> >
>> > Regarding DELETE operations, I tend to have the opposite reaction. I
>> > spend a lot of time working with production Flink users across a large
>> > number of organizations, and saying we don't support temporal tables that
>> > include DELETEs would be a blocker for adoption. Even organizations that
>> > claim never to delete rows still occasionally do so for GDPR requests or
>> > other regulations.
>> >
>> > I actually do think users will understand the limitations. Flink today
>> > has a very clear value proposition around correctness: your results are
>> > as correct as the input data provided. This does not change with support
>> > for DELETE records. Flink provides the most correct results possible
>> > given the resolution of the fields as generated by third-party systems.
>> > As Debezium and other CDC libraries become more accurate, so will Flink.
>> >
>> > Seth
>> >
>> > On Fri, Jul 3, 2020 at 11:00 PM Leonard Xu <xbjt...@gmail.com> wrote:
>> > Hi, Konstantin
>> >
>> >> ... Would we support a temporal join with a changelog stream with
>> >> event-time semantics by ignoring DELETE messages, or would it be
>> >> completely unsupported?
>> >
>> > I don't know what percentage of temporal join scenarios need this feature.
>> >
>> > As for supporting an approximate event-time join, either by ignoring
>> > DELETE messages or by extracting an approximate event time for DELETE
>> > messages, I'm not sure this is acceptable to users even if we explain
>> > the limitation of the approximate event-time join. I lean toward not
>> > supporting this feature, because we cannot guarantee event-time semantics
>> > and it may lead to incorrect results for users in some scenarios.
>> >
>> > If the percentage is high enough and most use cases can accept the
>> > approximate event time, I'm OK with supporting it for usability, although
>> > it doesn't strictly implement event-time semantics.
>> >
>> > Cheers,
>> > Leonard Xu
>> >
>> >
> --
> Best,
> Benchao Li
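To make the approximate join from Leonard's Jul 3 reply concrete (ignoring DELETE messages instead of honoring them), here is a small plain-Python sketch over a single key's changelog. It is not Flink code; the "UPSERT"/"DELETE" op markers, integer times, and the helper names build_history and rate_as_of are assumptions for illustration only.

```python
from bisect import bisect_right


def build_history(changelog, ignore_deletes):
    """Turn hypothetical (op_time, op, rate) rows for ONE key into a
    time-sorted version list.

    op is "UPSERT" or "DELETE". With ignore_deletes=True, DELETE rows are
    dropped (the approximate behaviour discussed in the thread); otherwise
    a DELETE becomes a version whose value is None, meaning "no row exists
    from this time on".
    """
    history = []
    for op_time, op, rate in sorted(changelog, key=lambda r: r[0]):
        if op == "DELETE":
            if ignore_deletes:
                continue
            history.append((op_time, None))
        else:
            history.append((op_time, rate))
    return history


def rate_as_of(history, ts):
    """Value of the latest version with op_time <= ts (None if absent or deleted)."""
    idx = bisect_right([t for t, _ in history], ts)
    return history[idx - 1][1] if idx else None
```

With the changelog [(10, "UPSERT", 1.10), (20, "DELETE", None)], a lookup at time 25 returns None when DELETEs are honored but the stale 1.1 when they are ignored, which is the kind of incorrect result Leonard is concerned about; lookups before the delete (e.g. at time 15) agree in both modes.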
