Sorry I copied and pasted twice the current eval method...I'd do this:

public void eval(Object... keys) {
    for (Object kkk : keys) {
        Row keyRow = Row.of(kkk);
        if (cache != null) {
            List<Row> cachedRows = cache.getIfPresent(keyRow);
            if (cachedRows != null) {
                for (Row cachedRow : cachedRows) {
                    collect(cachedRow);
                }
                return;
            }
        }
    }
 ...

On Fri, Jun 28, 2019 at 4:51 PM Flavio Pompermaier <pomperma...@okkam.it>
wrote:

> This could be a good fit, I'll try to dig into it and see if it can be
> adapted to a REST service.
> The only strange thing I see is that the key of the local cache is per
> block of keys..am I wrong?
> Shouldn't it cycle over the list of passed keys?
>
> Right now it's the following:
>
> Cache<Row, List<Row>> cache;
>
> public void eval(Object... keys) {
>     Row keyRow = Row.of(keys);
>     if (cache != null) {
>         List<Row> cachedRows = cache.getIfPresent(keyRow);
>         if (cachedRows != null) {
>             for (Row cachedRow : cachedRows) {
>                 collect(cachedRow);
>             }
>             return;
>         }
>     }
>  ...
>
> while I'd use the following (also for JDBC):
>
> Cache<Row, List<Row>> cache;
>
> public void eval(Object... keys) {
>     Row keyRow = Row.of(keys);
>     if (cache != null) {
>         List<Row> cachedRows = cache.getIfPresent(keyRow);
>         if (cachedRows != null) {
>             for (Row cachedRow : cachedRows) {
>                 collect(cachedRow);
>             }
>             return;
>         }
>     }
>  ...
>
> public void eval(Object... keys) {
>     for (Object kkk : keys) {
>         Row keyRow = Row.of(kkk);
>         if (cache != null) {
>             List<Row> cachedRows = cache.getIfPresent(keyRow);
>             if (cachedRows != null) {
>                 for (Row cachedRow : cachedRows) {
>                     collect(cachedRow);
>                 }
>                 return;
>             }
>         }
>     }
>  ...
>
> Am I missing something?
>
>
> On Fri, Jun 28, 2019 at 4:18 PM JingsongLee <lzljs3620...@aliyun.com>
> wrote:
>
>> Hi Flavio:
>>
>> I just implement a JDBCLookupFunction[1]. You can use it as table
>> function[2]. Or use
>> blink temporal table join[3] (Need blink planner support).
>> I add a google guava cache in JDBCLookupFunction with configurable
>> cacheMaxSize
>> (avoid memory OOM) and cacheExpireMs(For the fresh of lookup table).
>> Is that you want?
>>
>> [1]
>> https://github.com/JingsongLi/flink/blob/cc80999279b38627b37fa7550fb6610eee450d86/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCLookupFunction.java
>> [2]
>> https://github.com/JingsongLi/flink/blob/cc80999279b38627b37fa7550fb6610eee450d86/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCLookupFunctionITCase.java#L143
>> [3]
>> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/LookupJoinITCase.scala#L75
>>
>>  Best, JingsongLee
>>
>> ------------------------------------------------------------------
>> From:Flavio Pompermaier <pomperma...@okkam.it>
>> Send Time:2019年6月28日(星期五) 21:04
>> To:user <user@flink.apache.org>
>> Subject:LookupableTableSource question
>>
>> Hi to all,
>> I have a use case where I'd like to enrich a stream using a rarely
>> updated lookup table.
>> Basically, I'd like to be able to set a refresh policy that is triggered
>> either when a key was not found (a new key has probably been added in the
>> mean time) or a configurable refresh-period has elapsed.
>>
>> Is there any suggested solution to this? The LookupableTableSource looks
>> very similar to what I'd like to achieve but I can't find a real-world
>> example using it and it lacks of such 2 requirements (key-values are not
>> refreshed after a configurable timeout and a KeyNotFound callback cannot be
>> handled).
>>
>> Any help is appreciated,
>> Flavio
>>
>>
>>
>

Reply via email to