Hi Yun,

The joined data is the versioned table in this case, I managed to get it as
far as fixing all of the static errors but the temporal join just doesn't
have a result... No idea what's going on.
In reality I don't think we even want a temporal join, we just want to add
a few extra columns to each row in the streaming table and not keep the
result in state.
I'll ask this question specifically to the user ML and see if anyone has an
idea.

On Fri, 18 Feb 2022 at 15:15, Yun Gao <yungao...@aliyun.com> wrote:

>
> Hi Francis,
>
> I think requiring primary for versioned table[1] used in temporarl join[2]
> should be
> expected. May I have a double confirmation that which table serves as the
> versioned
> table in this case? Is it the streaming table from the rabbitmq or the
> joined data?
>
> Best,
> Yun
>
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/versioned_tables/
> [2]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#temporal-joins
>
>
>
> ------------------Original Mail ------------------
> *Sender:*Francis Conroy <francis.con...@switchdin.com>
> *Send Date:*Thu Feb 17 11:27:01 2022
> *Recipients:*user <user@flink.apache.org>
> *Subject:*Flink 1.15 deduplication view and lookup join
>
>> Hi user group,
>>
>> I'm using flink 1.15 currently (we're waiting for it to be released) to
>> build up some streaming pipelines and I'm trying to do a temporal lookup
>> join.
>>
>> I've got several tables(all with primary keys) defined which are
>> populated by Debezium CDC data, let's call them a, b and c.
>>
>> I've defined a view which joins all three tables to give some
>> hierarchical association data rows like in the diagram.
>> [image: image.png]
>> This all works fine so far.
>> I'm trying to join this table with a table from a datastream, using a
>> lookup join (
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#lookup-join)
>> like follows:
>> [image: image.png]
>> I've added a time field to both tables now and I'm getting the following
>> validation exception:
>> *Temporal Table Join requires primary key in versioned table, but no
>> primary key can be found.*
>>
>>  I went and implemented another view on the joined data which
>> implemented the deduplication query (
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/deduplication/#deduplication)
>>
>>
>> Here is my view definition:
>>
>> CREATE VIEW versioned_endpoint_association AS
>> SELECT device_id,
>>        leg_dt_id,
>>        ldt_id,
>>        ep_uuid,
>>        unit_uuid,
>>        pf_uuid,
>>        update_time
>> FROM (
>>     SELECT *,
>>            ROW_NUMBER() OVER (PARTITION BY device_id
>>            ORDER BY update_time DESC) as rownum
>>       FROM endpoint_association)
>> WHERE rownum = 1;
>>
>> After taking all steps I cannot get the temporal join to work, am I
>> missing some detail which will tell flink that
>> versioned_endpoint_association should in-fact be interpreted as a versioned
>> table?
>>
>> Looking at the log it's important that there is a LogicalRank node which
>> can convert to a Deduplicate node, but the conversion isn't happening.
>>
>> [image: image.png]
>>
>>
>> This email and any attachments are proprietary and confidential and are
>> intended solely for the use of the individual to whom it is addressed. Any
>> views or opinions expressed are solely those of the author and do not
>> necessarily reflect or represent those of SwitchDin Pty Ltd. If you have
>> received this email in error, please let us know immediately by reply email
>> and delete it from your system. You may not use, disseminate, distribute or
>> copy this message nor disclose its contents to anyone.
>> SwitchDin Pty Ltd (ABN 29 154893857) PO Box 1165, Newcastle NSW 2300
>> Australia
>>
>

-- 
This email and any attachments are proprietary and confidential and are 
intended solely for the use of the individual to whom it is addressed. Any 
views or opinions expressed are solely those of the author and do not 
necessarily reflect or represent those of SwitchDin Pty Ltd. If you have 
received this email in error, please let us know immediately by reply email 
and delete it from your system. You may not use, disseminate, distribute or 
copy this message nor disclose its contents to anyone. 
SwitchDin Pty Ltd 
(ABN 29 154893857) PO Box 1165, Newcastle NSW 2300 Australia

Reply via email to