Re: LookupableTableSource question
> how do I enable Blink planner support?

After the Flink 1.9 release, you can try the Blink planner.

> Since when is LATERAL TABLE available in Flink? Is it equivalent to using temporal tables?

LATERAL TABLE is how a table function is invoked in the Table API/SQL, and it has been available in Flink for a long time [1]. It is different from a temporal table.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/udfs.html#table-functions

Best,
JingsongLee

In reply to Flavio Pompermaier (Monday, July 1, 2019 21:26)
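To make the LATERAL TABLE versus temporal table distinction concrete, here is a sketch of the two query shapes, held as plain Java strings so the snippet compiles on its own. All table, column, and function names (Orders, Customers, lookupCustomer, customer_id, proctime) are hypothetical. The first form calls a table function once per input row; the second is the temporal table (lookup) join, which needs the Blink planner (in Flink 1.9 selected through EnvironmentSettings.newInstance().useBlinkPlanner()).

```java
// Illustrative only: the two query shapes discussed in this thread.
// Orders, Customers, lookupCustomer, customer_id and proctime are hypothetical names.
public class LookupQueryShapes {

    // Table function: lookupCustomer(...) is called for every Orders row and its
    // output rows are joined back to that row via LATERAL TABLE.
    static final String TABLE_FUNCTION_JOIN =
        "SELECT o.order_id, c.name "
      + "FROM Orders AS o, LATERAL TABLE(lookupCustomer(o.customer_id)) AS c(name)";

    // Temporal table (lookup) join: Customers is looked up as of the processing
    // time of each order (o.proctime is assumed to be a processing-time attribute).
    static final String TEMPORAL_TABLE_JOIN =
        "SELECT o.order_id, c.name "
      + "FROM Orders AS o "
      + "JOIN Customers FOR SYSTEM_TIME AS OF o.proctime AS c "
      + "ON o.customer_id = c.id";

    public static void main(String[] args) {
        System.out.println(TABLE_FUNCTION_JOIN);
        System.out.println(TEMPORAL_TABLE_JOIN);
    }
}
```

Registering the table function and the lookup table source is omitted here.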
Re: LookupableTableSource question
I probably misunderstood the meaning of eval(): so it is called once for every distinct key (which could be composed of a combination of fields)?

So, the other question is: how do I enable Blink planner support? Since when is LATERAL TABLE available in Flink? Is it equivalent to using temporal tables [1]?

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/temporal_tables.html

Best,
Flavio

In reply to JingsongLee (Sat, Jun 29, 2019 at 3:16 AM)
Re: LookupableTableSource question
The keys are the joint (composite) primary key, not a list of keys. In your case, maybe there is only a single key?

Best,
Jingsong Lee

In reply to Flavio Pompermaier (2019-06-28 22:53:31)
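To illustrate the point that the eval() arguments together form one joint key rather than a list of independent keys, here is a plain-Java sketch. Arrays.asList(...) stands in for Row.of(keys) as the cache key, on the assumption that both compare field by field; the class name and cache contents are made up. A lookup on ("eu", 42) and one on ("us", 42) hit different entries, while a single field on its own matches nothing, so looping over the individual arguments (as proposed elsewhere in the thread) would miss composite entries. With a single-field key the two approaches coincide.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustration: the eval(Object... keys) arguments form ONE composite lookup key.
public class CompositeKeyDemo {

    // Stand-in for Row.of(keys): a List compares element by element (assumed analogous to Row).
    static List<Object> key(Object... parts) {
        return Arrays.asList(parts);
    }

    // Hypothetical cache contents: one entry per (region, id) pair.
    static final Map<List<Object>, String> CACHE = new HashMap<>();
    static {
        CACHE.put(key("eu", 42), "row for (eu, 42)");
        CACHE.put(key("us", 42), "row for (us, 42)");
    }

    // Mirrors eval(Object... keys): all arguments together are the cache key.
    static String lookup(Object... keys) {
        return CACHE.get(key(keys));
    }

    public static void main(String[] args) {
        System.out.println(lookup("eu", 42)); // row for (eu, 42)
        System.out.println(lookup("us", 42)); // row for (us, 42)
        System.out.println(lookup("eu"));     // null: one field alone is not a key
    }
}
```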
Re: LookupableTableSource question
Sorry, I copied and pasted the current eval method twice... I'd do this:

    public void eval(Object... keys) {
        for (Object kkk : keys) {
            Row keyRow = Row.of(kkk);
            if (cache != null) {
                List<Row> cachedRows = cache.getIfPresent(keyRow);
                if (cachedRows != null) {
                    for (Row cachedRow : cachedRows) {
                        collect(cachedRow);
                    }
                    return;
                }
            }
        }
        ...

In reply to Flavio Pompermaier (Fri, Jun 28, 2019 at 4:51 PM)
Re: LookupableTableSource question
This could be a good fit. I'll try to dig into it and see if it can be adapted to a REST service. The only strange thing I see is that the local cache is keyed per block of keys... am I wrong? Shouldn't it iterate over the list of passed keys?

Right now it's the following:

    Cache<Row, List<Row>> cache;

    public void eval(Object... keys) {
        Row keyRow = Row.of(keys);
        if (cache != null) {
            List<Row> cachedRows = cache.getIfPresent(keyRow);
            if (cachedRows != null) {
                for (Row cachedRow : cachedRows) {
                    collect(cachedRow);
                }
                return;
            }
        }
        ...

while I'd use the following (also for JDBC):

    Cache<Row, List<Row>> cache;

    public void eval(Object... keys) {
        Row keyRow = Row.of(keys);
        if (cache != null) {
            List<Row> cachedRows = cache.getIfPresent(keyRow);
            if (cachedRows != null) {
                for (Row cachedRow : cachedRows) {
                    collect(cachedRow);
                }
                return;
            }
        }
        ...

    public void eval(Object... keys) {
        for (Object kkk : keys) {
            Row keyRow = Row.of(kkk);
            if (cache != null) {
                List<Row> cachedRows = cache.getIfPresent(keyRow);
                if (cachedRows != null) {
                    for (Row cachedRow : cachedRows) {
                        collect(cachedRow);
                    }
                    return;
                }
            }
        }
        ...

Am I missing something?

In reply to JingsongLee (Fri, Jun 28, 2019 at 4:18 PM)
Re: LookupableTableSource question
Hi Flavio,

I just implemented a JDBCLookupFunction [1]. You can use it as a table function [2], or use the Blink temporal table join [3] (requires Blink planner support). I added a Google Guava cache to JDBCLookupFunction with a configurable cacheMaxSize (to avoid running out of memory) and cacheExpireMs (to keep the lookup table fresh). Is that what you want?

[1] https://github.com/JingsongLi/flink/blob/cc80999279b38627b37fa7550fb6610eee450d86/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCLookupFunction.java
[2] https://github.com/JingsongLi/flink/blob/cc80999279b38627b37fa7550fb6610eee450d86/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCLookupFunctionITCase.java#L143
[3] https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/LookupJoinITCase.scala#L75

Best,
JingsongLee

--
From: Flavio Pompermaier
Send Time: Friday, June 28, 2019 21:04
To: user
Subject: LookupableTableSource question

Hi to all,
I have a use case where I'd like to enrich a stream using a rarely updated lookup table. Basically, I'd like to be able to set a refresh policy that is triggered either when a key is not found (a new key has probably been added in the meantime) or when a configurable refresh period has elapsed.

Is there any suggested solution to this? The LookupableTableSource looks very similar to what I'd like to achieve, but I can't find a real-world example using it, and it lacks these two requirements (key-values are not refreshed after a configurable timeout, and a KeyNotFound callback cannot be handled).

Any help is appreciated,
Flavio
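The refresh policy asked for in the original question (reload when a key is not found, or when a configurable period has elapsed), combined with the size bound and expiry described for JDBCLookupFunction, can be sketched in plain Java as follows. This is not the JDBCLookupFunction code: it uses a LinkedHashMap in access order instead of Guava, and the class name RefreshingLookupCache and the loader function are hypothetical (the loader would be the JDBC or REST call). Results the loader reports as missing (null) are not cached, so a key that is absent now is retried on the next lookup.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.LongSupplier;

// Hypothetical sketch: bounded, expiring lookup cache that reloads on a miss.
public class RefreshingLookupCache<K, V> {

    // A cached value plus the time (ms) at which it was loaded.
    private static final class Timestamped<T> {
        final T value;
        final long loadedAtMs;

        Timestamped(T value, long loadedAtMs) {
            this.value = value;
            this.loadedAtMs = loadedAtMs;
        }
    }

    private final long expireMs;
    private final Function<K, V> loader; // hypothetical: JDBC/REST fetch, returns null if absent
    private final LongSupplier clock;    // injectable clock, e.g. System::currentTimeMillis
    private final LinkedHashMap<K, Timestamped<V>> entries;
    private int loads = 0;               // number of loader calls, for illustration

    public RefreshingLookupCache(int maxSize, long expireMs, Function<K, V> loader, LongSupplier clock) {
        this.expireMs = expireMs;
        this.loader = loader;
        this.clock = clock;
        // accessOrder = true: iteration runs from least to most recently accessed,
        // so removeEldestEntry evicts the LRU entry once maxSize is exceeded.
        this.entries = new LinkedHashMap<K, Timestamped<V>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, Timestamped<V>> eldest) {
                return size() > maxSize;
            }
        };
    }

    public V get(K key) {
        long now = clock.getAsLong();
        Timestamped<V> cached = entries.get(key);
        if (cached != null && now - cached.loadedAtMs < expireMs) {
            return cached.value; // fresh hit: no reload
        }
        // Key not cached, or cached for at least expireMs: refresh from the source.
        V fresh = loader.apply(key);
        loads++;
        if (fresh != null) {
            entries.put(key, new Timestamped<>(fresh, now));
        } else {
            entries.remove(key); // not found: drop any stale entry and retry next time
        }
        return fresh;
    }

    public int loadCount() {
        return loads;
    }
}
```

With Guava, roughly the same size bound and expiry come from CacheBuilder.newBuilder().maximumSize(...).expireAfterWrite(...); the reload-on-miss step would still be explicit code around getIfPresent.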