Hi Kelly,

As a simple workaround, you can remove the watermark definition from
`KafkaStream`. That way, the stream-stream join will no longer raise the
"Rowtime attributes" exception.

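A minimal sketch of what that could look like, assuming a Flink 1.11-style
Kafka DDL. The topic, broker address, group id, and format are placeholders,
and only the `timestamp` and `computeClass` columns come from the original
question:

```sql
-- Hypothetical KafkaStream definition; all connector options are placeholders.
CREATE TABLE KafkaStream (
    `timestamp`    TIMESTAMP(3),
    `computeClass` STRING
    -- No "WATERMARK FOR `timestamp` AS ..." clause here, so `timestamp`
    -- is a plain TIMESTAMP(3) column rather than a rowtime attribute.
) WITH (
    'connector' = 'kafka',
    'topic' = 'compute-events',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'enrichment-demo',
    'format' = 'json'
);

-- The regular join from the original question, unchanged.
SELECT
    ks.`timestamp`,
    ks.computeClass,
    lt.`multiplier`
FROM KafkaStream ks
JOIN LookupTable lt ON ks.computeClass = lt.computeClass;
```

Keep in mind that a regular join keeps both inputs in state, so this is
probably only suitable as a short-term workaround.
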
Best,
Jark

On Wed, 22 Jul 2020 at 03:13, Kelly Smith <kell...@zillowgroup.com> wrote:

> Thanks Leonard and Danny,
>
>
>
> This makes a lot of sense. My hope here is to only use SQL without any
> specialized Java/Scala code, so it seems it may not be possible to use
> either of these methods yet.
>
>
>
> I’ll open an issue for the LookupTableSource implementation, and look into
> the workaround you suggested in the short term.
>
>
>
> Thanks!
>
> Kelly
>
>
>
> *From: *Leonard Xu <xbjt...@gmail.com>
> *Date: *Monday, July 20, 2020 at 7:49 PM
> *To: *Danny Chan <yuzhao....@gmail.com>
> *Cc: *Kelly Smith <kell...@zillowgroup.com>, Flink ML <
> user@flink.apache.org>
> *Subject: *Re: Flink SQL - Join Lookup Table
>
>
>
> Hi Kelly,
>
>
>
> It looks like you want to join a fact table (from Kafka) with a dimension
> table (from the filesystem). A dimension table is one kind of temporal
> table; for the temporal table join syntax, you can refer to Danny's post [1].
>
>
>
> However, `FileSystemTableSource` does not implement the `LookupTableSource`
> interface yet, which means you cannot use it as a dimension table. The
> connectors that currently support `LookupTableSource` include JDBC, HBase,
> and Hive. You can create an issue to request `LookupTableSource` support
> for the filesystem connector.
>
>
>
> Another approach is to use a Temporal Table Function [2], which can define
> a temporal table from a DataStream: you can convert your Table (the
> filesystem table) to a stream, create a temporal table from it, and then
> join against that temporal table.
>
>
>
>
>
> Best
>
> Leonard Xu
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/temporal_tables.html#defining-temporal-table
>
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/temporal_tables.html#defining-temporal-table-function
>
>
>
>
>
> On 21 Jul 2020, at 10:07, Danny Chan <yuzhao....@gmail.com> wrote:
>
>
>
> It seems you want a temporal table join instead of a two-stream join. If
> that is what you need, you should use the syntax
>
>
>
> Join LookupTable FOR SYSTEM_TIME AS OF …
>
>
>
> See [1] for details.
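>
> As a rough sketch only, the full query might look like the following. It
> assumes that `KafkaStream` declares a processing-time column (named
> `proctime` here, a hypothetical name, defined as `proctime AS PROCTIME()`)
> and that `LookupTable` is backed by a connector that supports lookups:
>
> ```sql
> -- `proctime` is an assumed computed column on KafkaStream, not part of
> -- the original table definition.
> SELECT
>     ks.`timestamp`,
>     ks.computeClass,
>     lt.`multiplier`
> FROM KafkaStream AS ks
> JOIN LookupTable FOR SYSTEM_TIME AS OF ks.proctime AS lt
>     ON ks.computeClass = lt.computeClass;
> ```
>
> Each incoming row is enriched with the lookup table's contents as of its
> processing time, so the stream side does not have to be kept in state.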
>
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table
>
>
>
> Best,
>
> Danny Chan
>
> On 21 Jul 2020 at 6:32 AM +0800, Kelly Smith <kell...@zillowgroup.com> wrote:
>
> Hi folks,
>
>
>
> I have a question about Flink SQL. What I want to do is this:
>
>
>
>    - Join a simple lookup table (a few rows) to a stream of data to
>    enrich the stream by adding a column from the lookup table.
>
>
>
>
>
> For example, a simple lookup table:
>
>
>
> CREATE TABLE LookupTable (
>     `computeClass` STRING,
>     `multiplier`   FLOAT
> ) WITH (
>     'connector' = 'filesystem',
>     'path' = 'fpu-multipliers.csv',
>     'format' = 'csv'
> )
>
>
>
>
>
> And I’ve got a Kafka connector table with rowtime semantics that has a
> `computeClass` field. I simply want to join (in a streaming fashion) the
> `multiplier` field above.
>
>
>
>
> SELECT
>     `timestamp`,
>     -- ...
>     ks.computeClass,
>     lt.`multiplier`
> FROM KafkaStream ks
> JOIN LookupTable lt ON ks.computeClass = lt.computeClass
>
>
>  Doing a simple join like that gives me this error:
>
>
>
> “org.apache.flink.table.api.TableException: Rowtime attributes must not be
> in the input rows of a regular join. As a workaround you can cast the time
> attributes of input tables to TIMESTAMP before.”
>
>
>
> Which leads me to believe that I should use an Interval Join instead, but
> that doesn’t seem to be appropriate since my table is static and has no
> concept of time. Basically, I want to hold the entire lookup table in
> memory, and simply enrich the Kafka stream (which need not be held in
> memory).
>
>
>
> Any ideas on how to accomplish what I’m trying to do?
>
>
>
> Thanks!
>
> Kelly
>
>
>
