Re: Flink SQL - Join Lookup Table

2020-07-21 Thread Jark Wu
Hi Kelly,

As a simple workaround, you can remove the watermark definition from
`KafkaStream`. That way, the stream-stream join will no longer fail with the
"Rowtime attributes must not be in the input rows of a regular join" exception.

Best,
Jark
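
A minimal sketch of this workaround follows. Kelly's actual `KafkaStream` DDL is not shown in the thread, so the two columns, topic, bootstrap servers, group id and JSON format below are hypothetical placeholders. The option keys are those of the Flink 1.11 Kafka SQL connector. The only point being illustrated is that the `WATERMARK FOR ...` clause is gone, so `timestamp` becomes an ordinary TIMESTAMP(3) column rather than a rowtime attribute.

```sql
-- Hypothetical KafkaStream DDL with the WATERMARK clause removed.
-- Without a watermark, `timestamp` is a regular TIMESTAMP(3) column,
-- not a rowtime attribute, so a regular join on this table is accepted.
CREATE TABLE KafkaStream (
    `timestamp`     TIMESTAMP(3),
    `computeClass`  STRING
    -- removed: WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'compute-events',                         -- hypothetical
    'properties.bootstrap.servers' = 'localhost:9092',  -- hypothetical
    'properties.group.id' = 'enrichment',               -- hypothetical
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'                                   -- hypothetical
);
```

With this definition, the original `JOIN LookupTable lt ON ks.computeClass = lt.computeClass` query can stay unchanged. The trade-off is that the stream loses its event-time semantics, so no event-time window can follow the join. And because this is a regular join, Flink keeps the rows of both inputs in state, including the Kafka side, unless idle state retention is configured.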


Re: Flink SQL - Join Lookup Table

2020-07-21 Thread Jingsong Li
Hi Kelly,

There are JIRA issues tracking both features:
- Filesystem connector support for reading a single file:
https://issues.apache.org/jira/browse/FLINK-17398
- Filesystem connector support for lookup joins:
https://issues.apache.org/jira/browse/FLINK-17397

Best,
Jingsong


Re: Flink SQL - Join Lookup Table

2020-07-21 Thread Kelly Smith
Thanks Leonard and Danny,

This makes a lot of sense. My hope here is to only use SQL without any 
specialized Java/Scala code, so it seems it may not be possible to use either 
of these methods yet.

I’ll open an issue for the LookupTableSource implementation, and look into the 
workaround you suggested in the short term.

Thanks!
Kelly




Re: Flink SQL - Join Lookup Table

2020-07-20 Thread Leonard Xu
Hi Kelly,

It looks like you want to join a fact table (from Kafka) with a dimension
table (from the filesystem). A dimension table is one kind of temporal table;
for the temporal table join syntax, you can refer to Danny's post [1].

However, `FileSystemTableSource` does not implement the `LookupTableSource`
interface yet, which means you cannot use it as a dimension table. The
connectors that support `LookupTableSource` include JDBC, HBase and Hive.
You could create an issue to support `LookupTableSource` for the filesystem
connector.
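
As a rough illustration of a lookup-capable dimension table, the multipliers could be moved into a database and exposed through the JDBC connector, one of the connectors listed above. The database URL, table name and cache sizes below are hypothetical; the option keys are those documented for the Flink 1.11 JDBC SQL connector, and a matching JDBC driver (MySQL here, also an assumption) would have to be on the classpath.

```sql
-- Hypothetical JDBC-backed version of LookupTable.
-- Unlike the filesystem connector, the JDBC connector implements
-- LookupTableSource, so this table can be used as a dimension table.
CREATE TABLE LookupTable (
    `computeClass`  STRING,
    `multiplier`    FLOAT
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:3306/metrics',  -- hypothetical
    'table-name' = 'fpu_multipliers',               -- hypothetical
    -- Optional lookup cache so that not every Kafka record hits the database.
    'lookup.cache.max-rows' = '100',
    'lookup.cache.ttl' = '10min'
);
```

The cache is optional. Without it, every incoming Kafka record triggers a query against the database.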

Another approach is to use a Temporal Table Function [2], which can define a
temporal table from a DataStream: you can convert your filesystem Table to a
stream, create a temporal table from it, and then join against that temporal table.
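
Only the final join of this approach can be written in SQL; the registration step needs Java/Scala. The sketch below assumes that a temporal table function named `Multipliers` (hypothetical) has been created with `Table#createTemporalTableFunction`, using `computeClass` as the primary key and a processing-time attribute of the lookup data as the time attribute, and then registered in the table environment. It also assumes `KafkaStream` declares a processing-time column `proc` (hypothetical). As in the Flink 1.11 temporal table function examples, the key predicate goes in the `WHERE` clause.

```sql
-- Sketch of a processing-time Temporal Table Function join.
-- Assumes `Multipliers` (hypothetical) was registered from Java/Scala with
-- `computeClass` as its key, and KafkaStream has `proc AS PROCTIME()`.
SELECT
    ks.`timestamp`,
    ks.computeClass,
    m.multiplier
FROM
    KafkaStream AS ks,
    LATERAL TABLE (Multipliers(ks.proc)) AS m
WHERE ks.computeClass = m.computeClass;
```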


Best
Leonard Xu
[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/temporal_tables.html#defining-temporal-table
[2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/temporal_tables.html#defining-temporal-table-function






Re: Flink SQL - Join Lookup Table

2020-07-20 Thread Danny Chan
It seems you want a temporal table join rather than a two-stream join. If so,
you should use this syntax:

JOIN LookupTable FOR SYSTEM_TIME AS OF …

See [1] for details.
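
A fuller form of that syntax, modelled on the lookup join examples in the Flink 1.11 documentation, is sketched below. In this version, a lookup join needs a processing-time attribute on the probe side, so the sketch assumes a hypothetical computed column `proc` in `KafkaStream`. `LookupTable` must also be backed by a connector that implements `LookupTableSource` (for example JDBC, HBase or Hive). The filesystem connector from the original question does not qualify.

```sql
-- Processing-time lookup join (sketch).
-- Assumes KafkaStream declares a processing-time column, e.g.
--   proc AS PROCTIME()
-- (hypothetical name), and LookupTable uses a lookup-capable connector.
SELECT
    ks.`timestamp`,
    ks.computeClass,
    lt.multiplier
FROM KafkaStream AS ks
  -- For each Kafka record, look up the dimension row as of processing time.
  JOIN LookupTable FOR SYSTEM_TIME AS OF ks.proc AS lt
    ON ks.computeClass = lt.computeClass;
```

Unlike a regular join, this does not keep the Kafka side in state. Each record triggers a lookup of the current dimension row as it arrives.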

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table

Best,
Danny Chan


Re: Flink SQL - Join Lookup Table

2020-07-20 Thread godfrey he
JIRA: https://issues.apache.org/jira/browse/FLINK-18651



Re: Flink SQL - Join Lookup Table

2020-07-20 Thread godfrey he
Hi Kelly,
As the exception message says, we currently must cast the time attribute to
a regular TIMESTAMP type before we can do a regular join. This is because the
time attribute will be out of order after a regular join, so we can no longer
do a window aggregate based on it.
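
Applied to the query from the original question, the cast can be done in a subquery, as in the sketch below. It assumes `timestamp` is the rowtime attribute of `KafkaStream` and that `computeClass` is the only other column needed; `ts` is a hypothetical alias. After the cast, `ts` is an ordinary column, so it cannot drive a window aggregate downstream.

```sql
-- Cast workaround suggested by the exception message (sketch).
-- CAST turns the rowtime attribute into a plain TIMESTAMP(3) column,
-- so no time attribute reaches the regular join.
SELECT
    ks.ts,
    ks.computeClass,
    lt.multiplier
FROM (
    SELECT
        CAST(`timestamp` AS TIMESTAMP(3)) AS ts,
        computeClass
    FROM KafkaStream
) AS ks
JOIN LookupTable AS lt
    ON ks.computeClass = lt.computeClass;
```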

We could improve this so that the planner implicitly casts the time attribute
to a regular TIMESTAMP type, and throws an exception only if an operator after
the join depends on the time attribute, such as a window aggregate.

I will create a JIRA to track this.

Best,
Godfrey



Flink SQL - Join Lookup Table

2020-07-20 Thread Kelly Smith
Hi folks,

I have a question about Flink SQL. What I want to do is this:

  * Join a simple lookup table (a few rows) to a stream of data, enriching the
    stream with a column from the lookup table.


For example, a simple lookup table:

CREATE TABLE LookupTable (
    `computeClass`  STRING,
    `multiplier`    FLOAT
) WITH (
    'connector' = 'filesystem',
    'path' = 'fpu-multipliers.csv',
    'format' = 'csv'
)


And I’ve got a Kafka connector table with rowtime semantics that has a 
`computeClass` field. I simply want to join (in a streaming fashion) the 
`multiplier` field above.


SELECT
    `timestamp`,
    -- ...
    ks.computeClass,
    lt.`multiplier`
FROM KafkaStream ks
JOIN LookupTable lt ON ks.computeClass = lt.computeClass


Doing a simple join like that gives me this error:

“org.apache.flink.table.api.TableException: Rowtime attributes must not be in 
the input rows of a regular join. As a workaround you can cast the time 
attributes of input tables to TIMESTAMP before.”

This leads me to believe that I should use an Interval Join instead, but that
doesn’t seem appropriate, since my table is static and has no concept of time.
Basically, I want to hold the entire lookup table in memory and simply enrich
the Kafka stream (which need not be held in memory).

Any ideas on how to accomplish what I’m trying to do?

Thanks!
Kelly