* I mistyped the rejected_query, it should be

CREATE VIEW AS post_agg_stream SELECT currencyId, AVG(rate)* as *rate*
FROM *currency_rates

CREATE VIEW AS rejected_query
SELECT
  ...FROM
  transactions AS t
  JOIN post_agg_stream FOR SYSTEM_TIME AS OF t.transactionTime AS r
  ON r.currency = t.currency


On Mon, Jul 6, 2020 at 11:29 AM Seth Wiesman <sjwies...@gmail.com> wrote:

> Hey Leonard,
>
> Agreed, this is a fun discussion!
>
> (1) For support changelog source backed CDC tools, a problem is that can
>> we use the temporal table as a general source table which may followed by
>> some aggregation operations,  more accurate is wether the aggregation
>> operator can use the DELETE record that we just updated the “correct”
>> operation time to retract a record, maybe not. This will pull us back to
>> the discussion of operation time VS event time, it’s a real cool but
>> complicated topic see above discussion from mine and @Jark’s.
>>
>
> I fully agree this is a complicated topic, however, I don't think its
> actually a problem that needs to be solved for the first version of this
> feature. My proposal is to disallow using upsert streams as temporal tables
> if an aggregation operation has been applied. Going back to my notion that
> temporal tables are a tool for performing streaming star schema
> denormalization, the dimension table in a star schema is rarely aggregated
> pre-join. In the case of a CDC stream of currency rates joined to
> transactions, the CDC stream only needs to support filter pushdowns and
> map-like transformations before being joined. I believe this is a
> reasonable limitation we can impose that will unblock a large percentage of
> use cases, and once we better understand the semantics of the correct
> operation in a retraction the limitation can be removed in future versions
> while remaining backward compatible.
>
>
>
> CREATE TABLE currency_rates (
>   currencyId BIGINT PRIMARY KEY,
>   rate DECIMAL(10, 2)) WITH (
>  'connector' = 'kafka',
>  'format' = 'debezium-json'
> )
> *CREATE* TABLE transactions (
>   currencyId BIGINT,
>   transactionTime TIMESTAMP(3)) WITH (
>
> )
>
> -- Uner my proposal this query would be supported because the currency_rates
>
> -- table is used in a temporal join without any aggregations having been 
> applied
>
> CREATE VIEW AS working_query
> SELECT
>   ...FROM
>   transactions AS t
>   JOIN currency_rates FOR SYSTEM_TIME AS OF t.transactionTime AS r
>   ON r.currency = t.currencyId
>
> -- However, this query would be rejected by the planner until we determine 
> the proper time semantics of a retacation
>
> CREATE VIEW AS post_agg_stream SELECT currencyId, AVG(rate)* as *rate* FROM 
> *currency_rates
>
> CREATE VIEW AS rejected_query
> SELECT
>   ...FROM
>   transactions AS t
>   JOIN currency_rates FOR SYSTEM_TIME AS OF t.transactionTime AS r
>   ON r.currency = t.currency
>
>
>
> (2) For upsert source that defines PRIMARY KEY and may contains multiple
>> records under a PK, the main problem is the  PK semantic,the multiple
>> records under a PK broke the unique semantic on a table. We need to walk
>> around this by (a) Adding another primary key keyword and explain the
>> upsert semantic (b) Creating temporal table base on a view that is the
>> deduplicated result of source table[2].
>>
>
> This feels like more of a bikeshedding question than a blocker and I look
> forward to seeing what you come up with!
>
> Seth
>
> On Mon, Jul 6, 2020 at 10:59 AM Benchao Li <libenc...@apache.org> wrote:
>
>> Hi everyone,
>>
>> Thanks a lot for the great discussions so far.
>>
>> After reading through the long discussion, I still have one question.
>> Currently the temporal table function supports both event time and proc
>> time joining.
>> If we use "FOR SYSTEM_TIME AS OF" syntax without "TEMPORAL" keyword in
>> DDL,
>> does it mean we can only use temporal table function join with event time?
>> If we can, how do we distinguish it with current temporal table (also
>> known as dimension table)?
>>
>> Maybe I'm missing something here. Correct me if I'm wrong.
>>
>> Leonard Xu <xbjt...@gmail.com> 于2020年7月6日周一 下午11:34写道:
>>
>>> Hi, Seth
>>>
>>> Thanks for your explanation of user cases, and you’re wright the look up
>>> join/table is one kind of temporal table join/table which tracks latest
>>> snapshot of external  DB-like tables, it's why we proposed use same
>>> temporal join syntax.
>>>
>>> In fact, I have invested and checked Debezuim format and Canal format
>>> more these days, and we can obtain the extract DML operation time from
>>> their meta information which comes from DB bin-log.  Although extracting
>>> meta information from record is a part of FLIP-107 scope[1], at least we
>>> have a way to extract the correct operation time. Event we can obtain the
>>> expected operation time, there’re some problems.
>>>
>>> (1) For support changelog source backed CDC tools, a problem is that can
>>> we use the temporal table as a general source table which may followed by
>>> some aggregation operations,  more accurate is wether the aggregation
>>> operator can use the DELETE record that we just updated the “correct”
>>> operation time to retract a record, maybe not. This will pull us back to
>>> the discussion of operation time VS event time, it’s a real cool but
>>> complicated topic see above discussion from mine and @Jark’s.
>>>
>>> (2) For upsert source that defines PRIMARY KEY and may contains multiple
>>> records under a PK, the main problem is the  PK semantic,the multiple
>>> records under a PK broke the unique semantic on a table. We need to walk
>>> around this by (a) Adding another primary key keyword and explain the
>>> upsert semantic (b) Creating temporal table base on a view that is the
>>> deduplicated result of source table[2].
>>>
>>> I’m working on (2), and if we want to support(1)i.e. support DELETE
>>> entirely, that’s really a big challenge but I also think wright thing for
>>> long term.
>>>
>>> If we decide to do (1), we need import operation time concept firstly,
>>> we need change the codebase for deal the operation time header in many
>>> places secondly, and finally explain and tell users how to understand and
>>> use temporal table.
>>>
>>> I’m a little worried about it’s valuable enough, I proposed only support
>>> (2) because it is a good replacement of current Temporal Table Function and
>>> will not introduce more concept and works.
>>>
>>> Jark, Jingsong, Konstantin, WDYT?
>>>
>>>
>>> Best,
>>> Leonard Xu
>>> [1]
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records#FLIP107:Readingtablecolumnsfromdifferentpartsofsourcerecords-Accessread-onlymetadatae.g.partition
>>> <
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107:+Reading+table+columns+from+different+parts+of+source+records#FLIP107:Readingtablecolumnsfromdifferentpartsofsourcerecords-Accessread-onlymetadatae.g.partition
>>> >
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication
>>> <
>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication
>>> >
>>>
>>>
>>> > 在 2020年7月6日,22:02,Seth Wiesman <sjwies...@gmail.com> 写道:
>>> >
>>> > As an aside, I conceptually view temporal table joins to be
>>> semantically equivalent to look up table joins. They are just two different
>>> ways of consuming the same data.
>>> >
>>> > Seth
>>> >
>>> > On Mon, Jul 6, 2020 at 8:56 AM Seth Wiesman <sjwies...@gmail.com
>>> <mailto:sjwies...@gmail.com>> wrote:
>>> > Hi Leonard,
>>> >
>>> > Regarding DELETE operations I tend to have the opposite reaction. I
>>> spend a lot of time working with production Flink users across a large
>>> number of organizations and to say we don't support temporal tables that
>>> include DELETEs will be a blocker for adoption. Even organizations that
>>> claim to never delete rows still occasionally due so per  GDPR requests or
>>> other regulations.
>>> >
>>> > I actually do think users will understand the limitations. Flink today
>>> has a very clear value proposition around correctness, your results are as
>>> correct as the input data provided. This does not change under support for
>>> DELETE records. Flink is providing the most correct results possible based
>>> on the resolution of the fields as generated by 3rd party systems. As
>>> Debezium and other CDC libraries become more accurate, so will Flink.
>>> >
>>> > Seth
>>> >
>>> > On Fri, Jul 3, 2020 at 11:00 PM Leonard Xu <xbjt...@gmail.com <mailto:
>>> xbjt...@gmail.com>> wrote:
>>> > Hi, Konstantin
>>> >
>>> >> . Would we support a temporal join with a changelog stream with
>>> >> event time semantics by ignoring DELETE messages or would it be
>>> completed
>>> >> unsupported.
>>> >
>>> > I don’t know the percentage of this feature in temporal scenarios.
>>> >
>>> > Comparing to support the approximate event time join by ignoring
>>> DELETE message or by extracting an approximate event time for DELET
>>> message,  I’m not sure is this acceptable for user even if we have
>>> explained the limitation of approximate event time join, I tend to do not
>>> support this feature, because we can not ensure the semantic of event time
>>> and it may lead an incorrect result for user in some scenarios.
>>> >
>>> > If the percentage is highly enough and most user cases can accept the
>>> approximate  event time, I'm ok to support it  for usability although it
>>> doesn’t implements the event time semantic strictly.
>>> >
>>> > Cheers,
>>> > Leonard Xu
>>> >
>>> >
>>>
>>>
>>
>> --
>>
>> Best,
>> Benchao Li
>>
>

Reply via email to