Sorry I copied and pasted twice the current eval method...I'd do this: public void eval(Object... keys) { for (Object kkk : keys) { Row keyRow = Row.of(kkk); if (cache != null) { List<Row> cachedRows = cache.getIfPresent(keyRow); if (cachedRows != null) { for (Row cachedRow : cachedRows) { collect(cachedRow); } return; } } } ...
On Fri, Jun 28, 2019 at 4:51 PM Flavio Pompermaier <pomperma...@okkam.it> wrote: > This could be a good fit, I'll try to dig into it and see if it can be > adapted to a REST service. > The only strange thing I see is that the key of the local cache is per > block of keys..am I wrong? > Shouldn't it cycle over the list of passed keys? > > Right now it's the following: > > Cache<Row, List<Row>> cache; > > public void eval(Object... keys) { > Row keyRow = Row.of(keys); > if (cache != null) { > List<Row> cachedRows = cache.getIfPresent(keyRow); > if (cachedRows != null) { > for (Row cachedRow : cachedRows) { > collect(cachedRow); > } > return; > } > } > ... > > while I'd use the following (also for JDBC): > > Cache<Row, List<Row>> cache; > > public void eval(Object... keys) { > Row keyRow = Row.of(keys); > if (cache != null) { > List<Row> cachedRows = cache.getIfPresent(keyRow); > if (cachedRows != null) { > for (Row cachedRow : cachedRows) { > collect(cachedRow); > } > return; > } > } > ... > > public void eval(Object... keys) { > for (Object kkk : keys) { > Row keyRow = Row.of(kkk); > if (cache != null) { > List<Row> cachedRows = cache.getIfPresent(keyRow); > if (cachedRows != null) { > for (Row cachedRow : cachedRows) { > collect(cachedRow); > } > return; > } > } > } > ... > > Am I missing something? > > > On Fri, Jun 28, 2019 at 4:18 PM JingsongLee <lzljs3620...@aliyun.com> > wrote: > >> Hi Flavio: >> >> I just implement a JDBCLookupFunction[1]. You can use it as table >> function[2]. Or use >> blink temporal table join[3] (Need blink planner support). >> I add a google guava cache in JDBCLookupFunction with configurable >> cacheMaxSize >> (avoid memory OOM) and cacheExpireMs(For the fresh of lookup table). >> Is that you want? >> >> [1] >> https://github.com/JingsongLi/flink/blob/cc80999279b38627b37fa7550fb6610eee450d86/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCLookupFunction.java >> [2] >> https://github.com/JingsongLi/flink/blob/cc80999279b38627b37fa7550fb6610eee450d86/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCLookupFunctionITCase.java#L143 >> [3] >> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/LookupJoinITCase.scala#L75 >> >> Best, JingsongLee >> >> ------------------------------------------------------------------ >> From:Flavio Pompermaier <pomperma...@okkam.it> >> Send Time:2019年6月28日(星期五) 21:04 >> To:user <user@flink.apache.org> >> Subject:LookupableTableSource question >> >> Hi to all, >> I have a use case where I'd like to enrich a stream using a rarely >> updated lookup table. >> Basically, I'd like to be able to set a refresh policy that is triggered >> either when a key was not found (a new key has probably been added in the >> mean time) or a configurable refresh-period has elapsed. >> >> Is there any suggested solution to this? The LookupableTableSource looks >> very similar to what I'd like to achieve but I can't find a real-world >> example using it and it lacks of such 2 requirements (key-values are not >> refreshed after a configurable timeout and a KeyNotFound callback cannot be >> handled). >> >> Any help is appreciated, >> Flavio >> >> >> >