Hi Yuan, 

I’m fine with skipping OneToManyCache, since it’s just a one-line implementation 
on top of the Cache abstraction. Thanks for the review and the advice. 

Qingsheng

> On Apr 14, 2022, at 19:23, zst...@163.com wrote:
> 
> Hi Qingsheng,
> 
> Thanks very much for your detailed advice. The first two points are very clear 
> and make sense. I have only one question about the third point.
> 
> 
>> Another point to note is that if you check the usage of caches in the JDBC and 
>> Hive lookup tables, the value type is List<RowData>, since it’s common for a 
>> joining key to map to multiple rows. We could add another layer of 
>> abstraction under the Cache interface, for example: 
>> 
>> OneToManyCache<K, V> extends Cache<K, List<V>>
> IMHO, so far I haven't found `OneToManyCache` to be necessary. The method 
> `appendToKey(List<V>)` can be replaced by `put(K, V)`. What do you think?
> 
> Best regards,
> Yuan
> 
> At 2022-04-13 15:20:01, "Qingsheng Ren" <renqs...@gmail.com> wrote:
>> Hi Yuan,
>> 
>> Sorry for leaving this thread pending for so long. I think adding a unified 
>> abstraction and metrics for caches is quite important for users and 
>> developers to optimize and improve their jobs with lookup joins. We also have 
>> our own internal cache abstraction and implementation, so I took a closer 
>> look, and here are some thoughts of mine. 
>> 
>> 1. Metrics
>> 
>> I think users would be interested in these three aspects when debugging or 
>> benchmarking their jobs: 
>> 
>> (1) Hit / miss rate
>> - hitCount, Counter type, to track the number of cache hits
>> - missCount, Counter type, to track the number of cache misses
>> Here we report the raw counts instead of rates to the external metric system, 
>> since it’s easier and more flexible to aggregate and calculate rates in 
>> metric systems like Prometheus.
>> 
>> (2) Loading throughput and latency
>> - numBytesLoadedTotal, Counter type, to track the total number of bytes 
>> loaded by the cache
>> - numRecordsLoadedTotal, Counter type, to track the total number of records 
>> loaded
>> These two can be used for tracking loading throughput.
>> 
>> - latestLoadTime, Gauge type, to track the time spent on the latest load 
>> operation
>> A histogram would actually be better for tracking latency, but histograms 
>> are quite expensive to maintain. Personally I think a gauge would be good 
>> enough to reflect the latency.
>> 
>> - numLoadFailures, Counter type, to track the number of failed loads.
>> 
>> (3) Current usage
>> - numCachedRecords, Gauge type, to track the number of entries in the cache
>> - numCachedBytes, Gauge type, to track the number of bytes used by the cache
>> 
>> Most of the metrics above are similar to your original proposal; here are 
>> the differences: 
>> (1) I still think it’s weird to report the identifier and type as metrics. 
>> It’s quite easy to get the actual cache type through the code path, and 
>> moreover some metric systems (like Prometheus) don't support string-typed 
>> metrics. 
>> (2) numRecords is renamed to numCachedRecords.
>> (3) loadSuccessCount can be deduced as missCount - numLoadFailures. I think 
>> users would be interested in how many times the cache loads (missCount) and 
>> how many loads fail (numLoadFailures).
>> (4) totalLoadTime is replaced by latestLoadTime. I don’t think it’s very 
>> meaningful for users to see a long-running job report a totalLoadTime of 
>> hours or even days.
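To make point (3) concrete, here is a quick dependency-free sketch of how the derived values fall out of the raw counters; in practice a metric system like Prometheus would do this aggregation, and the example values are illustrative only:

```java
public class DerivedCacheMetrics {
    // Hit rate is computed downstream from the raw counts, not reported
    // by the cache itself.
    static double hitRate(long hitCount, long missCount) {
        return (double) hitCount / (hitCount + missCount);
    }

    // loadSuccessCount is not reported directly; it is deduced from
    // missCount (number of load attempts) minus numLoadFailures.
    static long loadSuccessCount(long missCount, long numLoadFailures) {
        return missCount - numLoadFailures;
    }

    public static void main(String[] args) {
        System.out.println(hitRate(90, 10));         // 0.9
        System.out.println(loadSuccessCount(10, 2)); // 8
    }
}
```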
>> 
>> 2. APIs
>> 
>> (1) CacheMetricGroup: 
>> 
>> public interface CacheMetricGroup {
>>   Counter getHitCounter();
>> 
>>   Counter getMissCounter();
>> 
>>   Counter getNumRecordsLoadedTotalCounter();
>> 
>>   Counter getNumBytesLoadedTotalCounter();
>> 
>>   Gauge<Long> getLatestLoadTimeGauge();
>> 
>>   Counter getNumLoadFailureCounter();
>> 
>>   void setNumCachedRecordsGauge(Gauge<Long> numCachedRecordsGauge);
>> 
>>   void setNumCachedBytesGauge(Gauge<Long> numCachedBytesGauge);
>> }
>> 
>> Note that some metrics are provided as getters since they are quite 
>> straightforward, except numCachedRecords/Bytes, which should be left to 
>> cache implementers. 
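To illustrate that split, here is a minimal self-contained sketch. The Counter/Gauge types below are simplified stand-ins for Flink's real metric interfaces in org.apache.flink.metrics, and the class names are illustrative only:

```java
import java.util.HashMap;
import java.util.Map;

public class CacheMetricGroupSketch {
    // Simplified stand-ins for Flink's metric types; the real interfaces
    // have additional methods (e.g. inc(long), dec()).
    interface Counter { void inc(); long getCount(); }
    interface Gauge<T> { T getValue(); }

    static class SimpleCounter implements Counter {
        private long count;
        public void inc() { count++; }
        public long getCount() { return count; }
    }

    static class SimpleCacheMetricGroup {
        // Getter-style metrics are pre-registered by the group itself...
        private final Counter hitCounter = new SimpleCounter();
        private final Counter missCounter = new SimpleCounter();
        private Gauge<Long> numCachedRecordsGauge;

        Counter getHitCounter() { return hitCounter; }
        Counter getMissCounter() { return missCounter; }

        // ...while usage gauges are set by the cache implementation, the
        // only component that knows its own current size.
        void setNumCachedRecordsGauge(Gauge<Long> gauge) {
            this.numCachedRecordsGauge = gauge;
        }
        Gauge<Long> getNumCachedRecordsGauge() { return numCachedRecordsGauge; }
    }

    public static void main(String[] args) {
        SimpleCacheMetricGroup group = new SimpleCacheMetricGroup();
        Map<String, String> backingMap = new HashMap<>();
        // The cache implementer supplies the gauge over its own storage.
        group.setNumCachedRecordsGauge(() -> (long) backingMap.size());

        group.getHitCounter().inc();
        backingMap.put("key", "row");
        System.out.println(group.getHitCounter().getCount());            // 1
        System.out.println(group.getNumCachedRecordsGauge().getValue()); // 1
    }
}
```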
>> 
>> (2) Cache
>> 
>> public interface Cache<K, V> extends AutoCloseable {
>>   void open(CacheMetricGroup cacheMetricGroup);
>> 
>>   V get(K key, Callable<? extends V> loader) throws Exception;
>> 
>>   void put(K key, V value);
>> 
>>   void putAll(Map<? extends K, ? extends V> m);
>> 
>>   void clean();
>> 
>>   long size();
>> }
>> 
>> Compared to your proposal: 
>> a. `getIdentifier()` is removed. I can’t see any usage of this function, 
>> since we are not dynamically loading cache implementations via SPI or a 
>> factory.
>> b. `init()` and `initMetric()` are merged into `open(CacheMetricGroup)`.
>> c. The interface extends `AutoCloseable`, symmetric to open(), for cleaning 
>> up resources claimed by the cache.
>> d. `getMetricGroup()` is removed. Metric groups should be exposed to cache 
>> implementations instead of users. 
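For illustration, a minimal in-memory implementation of the interface above might look like the following. This is only a sketch over a plain HashMap (with open(CacheMetricGroup) omitted to stay dependency-free); a real connector cache would add eviction, TTL, and the metric wiring:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;

public class MapCacheSketch {
    // The proposed interface, minus open(CacheMetricGroup), to keep the
    // sketch self-contained.
    interface Cache<K, V> extends AutoCloseable {
        V get(K key, Callable<? extends V> loader) throws Exception;
        void put(K key, V value);
        void putAll(Map<? extends K, ? extends V> m);
        void clean();
        long size();
    }

    static class MapCache<K, V> implements Cache<K, V> {
        private final Map<K, V> map = new HashMap<>();
        long hitCount;   // would back the hitCount metric
        long missCount;  // would back the missCount metric

        @Override
        public V get(K key, Callable<? extends V> loader) throws Exception {
            V value = map.get(key);
            if (value != null) {
                hitCount++;
                return value;
            }
            // Cache miss: invoke the loader (e.g. a JDBC query) and cache
            // the result for subsequent lookups.
            missCount++;
            value = loader.call();
            map.put(key, value);
            return value;
        }

        @Override public void put(K key, V value) { map.put(key, value); }
        @Override public void putAll(Map<? extends K, ? extends V> m) { map.putAll(m); }
        @Override public void clean() { map.clear(); }
        @Override public long size() { return map.size(); }
        @Override public void close() { clean(); } // symmetric to open()
    }

    public static void main(String[] args) throws Exception {
        MapCache<String, String> cache = new MapCache<>();
        System.out.println(cache.get("k", () -> "loaded")); // miss: loader runs
        System.out.println(cache.get("k", () -> "other"));  // hit: cached value
    }
}
```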
>> 
>> 3. Other topics
>> Another point to note is that if you check the usage of caches in the JDBC 
>> and Hive lookup tables, the value type is List<RowData>, since it’s common 
>> for a joining key to map to multiple rows. We could add another layer of 
>> abstraction under the Cache interface, for example: 
>> 
>> OneToManyCache<K, V> extends Cache<K, List<V>>
>> 
>> And add methods like `appendToKey(List<V>)` to it. What do you think?
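To make the difference concrete, here is a hypothetical sketch of the idea (the names and signatures are illustrative, not a final API): put replaces the whole row list for a key, while appendToKey accumulates into it, which would otherwise require a read-modify-write in every caller:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OneToManyCacheSketch {
    // Hypothetical one-to-many cache; the real thing would extend
    // Cache<K, List<V>> as proposed above.
    static class OneToManyCache<K, V> {
        private final Map<K, List<V>> map = new HashMap<>();

        // put(K, List<V>) replaces the entire row list for the key.
        void put(K key, List<V> rows) { map.put(key, new ArrayList<>(rows)); }

        // appendToKey accumulates rows under the key, so callers that
        // receive rows one batch at a time avoid a read-modify-write.
        void appendToKey(K key, List<V> rows) {
            map.computeIfAbsent(key, k -> new ArrayList<>()).addAll(rows);
        }

        List<V> get(K key) { return map.get(key); }
    }

    public static void main(String[] args) {
        OneToManyCache<String, String> cache = new OneToManyCache<>();
        cache.appendToKey("key", Arrays.asList("row1"));
        cache.appendToKey("key", Arrays.asList("row2"));
        System.out.println(cache.get("key")); // [row1, row2]
        cache.put("key", Arrays.asList("row3"));
        System.out.println(cache.get("key")); // [row3]
    }
}
```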
>> 
>> Cheers, 
>> 
>> Qingsheng
>> 
>>> On Mar 7, 2022, at 16:00, zst...@163.com wrote:
>>> 
>>> Hi devs,
>>> 
>>> 
>>> I would like to start a discussion thread about an abstraction for caching 
>>> in LookupFunction, with uniform cache metrics, so that caching works out of 
>>> the box for connector developers. So far there are multiple LookupFunction 
>>> implementations with their own caching in individual connectors [1][2][3][4]. 
>>> With uniform cache metrics, users can also monitor the cache in a 
>>> LookupFunction to optimize jobs or troubleshoot.
>>> 
>>> 
>>> I have posted an issue about this, see 
>>> <https://issues.apache.org/jira/browse/FLINK-25409>, and made a brief 
>>> design 
>>> <https://docs.google.com/document/d/1L2eo7VABZBdRxoRP_wPvVwuvTZOV9qrN9gEQxjhSJOc/edit?usp=sharing>.
>>> 
>>> 
>>> Looking forward to your feedback, thanks.
>>> 
>>> 
>>> Best regards,
>>> Yuan
>>> 
>>> [1] 
>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java
>>> [2] 
>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/FileSystemLookupFunction.java
>>> [3] 
>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataLookupFunction.java
>>> [4] 
>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.java
