Hi Kelly,

There are issues tracking this:
- Filesystem connector: support single-file reading: https://issues.apache.org/jira/browse/FLINK-17398
- Filesystem connector: support lookup join: https://issues.apache.org/jira/browse/FLINK-17397
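Until the filesystem connector supports lookup joins, the `FOR SYSTEM_TIME AS OF` syntax described in the quoted replies needs a connector that implements `LookupTableSource`, such as JDBC. Below is a minimal sketch that assumes the few lookup rows have been moved into a database table. The JDBC URL, table name, Kafka topic, broker address, group id, watermark delay, and JSON format are all hypothetical placeholders. The `proctime` column is also an assumption added to the Kafka table, because the lookup join in Flink 1.11 is driven by a processing-time attribute.

```sql
-- Hypothetical dimension table backed by JDBC, a connector that implements
-- LookupTableSource (the filesystem connector does not yet, see FLINK-17397).
CREATE TABLE LookupTable (
  `computeClass` STRING,
  `multiplier` FLOAT
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/mydb',  -- hypothetical database
  'table-name' = 'fpu_multipliers',             -- hypothetical table
  -- Setting both options enables the in-memory lookup cache.
  'lookup.cache.max-rows' = '100',
  'lookup.cache.ttl' = '10min'
);

-- Probe side: the original rowtime column plus an assumed processing-time
-- column that drives the lookup join.
CREATE TABLE KafkaStream (
  `timestamp` TIMESTAMP(3),
  `computeClass` STRING,
  `proctime` AS PROCTIME(),
  WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '5' SECOND  -- assumed delay
) WITH (
  'connector' = 'kafka',
  'topic' = 'compute-events',                         -- hypothetical
  'properties.bootstrap.servers' = 'localhost:9092',  -- hypothetical
  'properties.group.id' = 'lookup-join-sketch',       -- hypothetical
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);

-- Each Kafka row looks up the matching dimension row as of its proctime.
SELECT ks.`timestamp`, ks.computeClass, lt.`multiplier`
FROM KafkaStream AS ks
JOIN LookupTable FOR SYSTEM_TIME AS OF ks.`proctime` AS lt
  ON ks.computeClass = lt.computeClass;
```

With a lookup join, only the dimension rows are cached; the Kafka rows are enriched as they arrive and are not kept in join state.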
Best,
Jingsong

On Wed, Jul 22, 2020 at 3:13 AM Kelly Smith <kell...@zillowgroup.com> wrote:
> Thanks Leonard and Danny,
>
> This makes a lot of sense. My hope here is to use only SQL, without any
> specialized Java/Scala code, so it seems it may not be possible to use
> either of these methods yet.
>
> I’ll open an issue for the LookupTableSource implementation, and look into
> the workaround you suggested in the short term.
>
> Thanks!
> Kelly
>
> From: Leonard Xu <xbjt...@gmail.com>
> Date: Monday, July 20, 2020 at 7:49 PM
> To: Danny Chan <yuzhao....@gmail.com>
> Cc: Kelly Smith <kell...@zillowgroup.com>, Flink ML <user@flink.apache.org>
> Subject: Re: Flink SQL - Join Lookup Table
>
> Hi Kelly,
>
> It looks like you want to join a fact table (from Kafka) with a dimension
> table (from the filesystem). A dimension table is one kind of temporal
> table; for the temporal table join syntax, you can refer to Danny's post [1].
>
> However, `FileSystemTableSource` does not implement the `LookupTableSource`
> interface yet, which means you cannot use it as a dimension table. The
> connectors that support `LookupTableSource` include JDBC, HBase, and Hive.
> You can create an issue to request `LookupTableSource` support for the
> filesystem connector.
>
> Another approach is to use a Temporal Table Function [2], which can define
> a temporal table from a DataStream: convert your Table (the filesystem
> table) to a stream, create a temporal table from it, and then join against
> that temporal table.
>
> Best,
> Leonard Xu
>
> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/temporal_tables.html#defining-temporal-table
> [2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/temporal_tables.html#defining-temporal-table-function
>
> On Jul 21, 2020, at 10:07, Danny Chan <yuzhao....@gmail.com> wrote:
>
> It seems you want a temporal table join instead of a two-stream join. If
> that is what you need, you should use the syntax
>
>   JOIN LookupTable FOR SYSTEM_TIME AS OF …
>
> See [1] for details.
>
> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table
>
> Best,
> Danny Chan
>
> On Jul 21, 2020, 6:32 AM +0800, Kelly Smith <kell...@zillowgroup.com> wrote:
>
> Hi folks,
>
> I have a question about Flink SQL. What I want to do is this:
>
> - Join a simple lookup table (a few rows) to a stream of data to enrich
>   the stream by adding a column from the lookup table.
>
> For example, a simple lookup table:
>
>   CREATE TABLE LookupTable (
>     `computeClass` STRING,
>     `multiplier` FLOAT
>   ) WITH (
>     'connector' = 'filesystem',
>     'path' = 'fpu-multipliers.csv',
>     'format' = 'csv'
>   )
>
> And I’ve got a Kafka connector table with rowtime semantics that has a
> `computeClass` field. I simply want to join (in a streaming fashion) the
> `multiplier` field above.
>
>   SELECT `timestamp`,
>     -- ...
>     ks.computeClass,
>     lt.`multiplier`
>   FROM KafkaStream ks
>   JOIN LookupTable lt ON ks.computeClass = lt.computeClass
>
> Doing a simple join like that gives me this error:
>
>   org.apache.flink.table.api.TableException: Rowtime attributes must not be
>   in the input rows of a regular join. As a workaround you can cast the time
>   attributes of input tables to TIMESTAMP before.
>
> This leads me to believe that I should use an interval join instead, but
> that doesn’t seem appropriate, since my table is static and has no
> concept of time.
> Basically, I want to hold the entire lookup table in memory and simply
> enrich the Kafka stream (which need not be held in memory).
>
> Any ideas on how to accomplish what I’m trying to do?
>
> Thanks!
> Kelly

--
Best,
Jingsong Lee
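The error in Kelly's original question names its own workaround: cast the rowtime attribute to a plain TIMESTAMP before the regular join. A minimal sketch of that, assuming `timestamp` is the rowtime column of `KafkaStream` (the thread does not show that table's DDL), reusing the filesystem `LookupTable` from the question, and introducing `ts` as a hypothetical alias:

```sql
-- Workaround suggested by the error message: cast the rowtime attribute to a
-- plain TIMESTAMP(3) inside a subquery so the regular join sees no time
-- attribute. Assumes `timestamp` is the rowtime column of KafkaStream.
SELECT ks.ts, ks.computeClass, lt.`multiplier`
FROM (
  SELECT
    CAST(`timestamp` AS TIMESTAMP(3)) AS ts,  -- no longer a rowtime attribute
    computeClass
  FROM KafkaStream
) AS ks
JOIN LookupTable AS lt
  ON ks.computeClass = lt.computeClass;
```

The trade-off is that a regular streaming join keeps both inputs in state indefinitely (unless idle state retention is configured), so it also holds the Kafka side, which is what the question wanted to avoid. The output also loses its rowtime attribute, so downstream event-time windows on `ts` would not work.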