Thanks Leonard and Danny,

This makes a lot of sense. My hope here is to only use SQL without any 
specialized Java/Scala code, so it seems it may not be possible to use either 
of these methods yet.

I’ll open an issue for the LookupTableSource implementation, and look into the 
workaround you suggested in the short term.

Thanks!
Kelly

From: Leonard Xu <xbjt...@gmail.com>
Date: Monday, July 20, 2020 at 7:49 PM
To: Danny Chan <yuzhao....@gmail.com>
Cc: Kelly Smith <kell...@zillowgroup.com>, Flink ML <user@flink.apache.org>
Subject: Re: Flink SQL - Join Lookup Table

Hi Kelly,

It looks like you want to join a fact table (from Kafka) with a dimension 
table (from the filesystem). A dimension table is one kind of temporal table; 
for the temporal table join syntax, you can refer to Danny's post [1].

But `FileSystemTableSource` does not implement the `LookupTableSource` interface 
yet, which means you cannot use it as a dimension table. The connectors that 
support `LookupTableSource` include JDBC, HBase, and Hive. 
You can create an issue to support `LookupTableSource` for the filesystem 
connector.

Another approach is to use a Temporal Table Function [2], which can define a 
temporal table from a DataStream: convert your Table (the filesystem table) 
to a stream, create a temporal table from it, and then join against that 
temporal table.


Best
Leonard Xu
[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/temporal_tables.html#defining-temporal-table
[2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/temporal_tables.html#defining-temporal-table-function



On July 21, 2020, at 10:07, Danny Chan <yuzhao....@gmail.com> wrote:

It seems you want a temporal table join instead of a two-stream join. If that 
is what you need, you should use the syntax

Join LookupTable FOR SYSTEM_TIME AS OF …

See [1] for details.
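
For illustration, a minimal sketch of how the full query could look with this syntax. It assumes that `KafkaStream` declares a processing-time attribute (named `proctime` here, a hypothetical column, e.g. defined as `proctime AS PROCTIME()` in the table DDL) and that `LookupTable` is backed by a connector that supports lookups, such as JDBC. It is not a tested query.

```sql
-- Sketch only: `proctime` is an assumed processing-time attribute on KafkaStream,
-- and LookupTable must be backed by a lookup-capable connector (e.g. JDBC).
SELECT
    ks.computeClass,
    lt.`multiplier`
FROM KafkaStream AS ks
JOIN LookupTable FOR SYSTEM_TIME AS OF ks.proctime AS lt
    ON ks.computeClass = lt.computeClass
```

With this form, each incoming Kafka row is enriched with the lookup row that is current at processing time, so the stream side does not need to be kept in join state.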

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table

Best,
Danny Chan
On July 21, 2020 at 6:32 AM (+0800), Kelly Smith <kell...@zillowgroup.com> wrote:

Hi folks,

I have a question about Flink SQL. What I want to do is this:


  *   Join a simple lookup table (a few rows) to a stream of data to enrich the 
stream by adding a column from the lookup table.


For example, a simple lookup table:

CREATE TABLE LookupTable (
    `computeClass`  STRING,
    `multiplier`    FLOAT
) WITH (
    'connector' = 'filesystem',
    'path' = 'fpu-multipliers.csv',
    'format' = 'csv'
)


And I’ve got a Kafka connector table with rowtime semantics that has a 
`computeClass` field. I simply want to join (in a streaming fashion) the 
`multiplier` field above.


SELECT
    `timestamp`,
    -- ...
    ks.computeClass,
    lt.`multiplier`
FROM KafkaStream ks
JOIN LookupTable lt ON ks.computeClass = lt.computeClass


Doing a simple join like that gives me this error:

“org.apache.flink.table.api.TableException: Rowtime attributes must not be in 
the input rows of a regular join. As a workaround you can cast the time 
attributes of input tables to TIMESTAMP before.”
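
The workaround named in the error message can be sketched as follows. This is an untested sketch that assumes the rowtime column is `timestamp` with type TIMESTAMP(3); the alias `event_time` is hypothetical. The cast happens in a subquery so that the join input no longer carries a rowtime attribute.

```sql
-- Sketch only: cast the rowtime attribute to a plain TIMESTAMP before the join.
-- `event_time` is a hypothetical alias; TIMESTAMP(3) is an assumed precision.
SELECT
    ks.event_time,
    ks.computeClass,
    lt.`multiplier`
FROM (
    SELECT
        CAST(`timestamp` AS TIMESTAMP(3)) AS event_time,
        computeClass
    FROM KafkaStream
) ks
JOIN LookupTable lt ON ks.computeClass = lt.computeClass
```

Note the trade-offs: after the cast, `event_time` is an ordinary timestamp and can no longer be used as a time attribute (for example in downstream windows), and a regular join keeps both inputs in state, including the Kafka side.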

Which leads me to believe that I should use an Interval Join instead, but that 
doesn’t seem to be appropriate since my table is static and has no concept of 
time. Basically, I want to hold the entire lookup table in memory, and simply 
enrich the Kafka stream (which need not be held in memory).

Any ideas on how to accomplish what I’m trying to do?

Thanks!
Kelly
