Hi Yuan,

I’m fine with skipping OneToManyCache, since it would be just a one-line
implementation on top of the Cache abstraction. Thanks for the review and
advice.
Qingsheng

> On Apr 14, 2022, at 19:23, zst...@163.com wrote:
>
> Hi Qingsheng,
>
> Thanks very much for your detailed advice. The first two points are very
> clear and make sense. I have only one question about the third point.
>
>> Another point to note is that if you check the usage of the cache in the
>> JDBC and Hive lookup tables, the value type is List<RowData>, since it’s
>> common that a joining key is mapped to multiple rows. We could add another
>> layer of abstraction under the Cache interface, for example:
>>
>> OneToManyCache<K, V> extends Cache<K, List<V>>
>
> IMHO, up to now, I haven’t found `OneToManyCache` to be necessary. The
> method `appendToKey(List<V>)` can be replaced by put(K, V). What do you
> think?
>
> Best regards,
> Yuan
>
> At 2022-04-13 15:20:01, "Qingsheng Ren" <renqs...@gmail.com> wrote:
>> Hi Yuan,
>>
>> Sorry for leaving this thread pending for so long. I think adding a
>> unified abstraction and metrics for caches is quite important for users
>> and developers to optimize and improve their jobs with lookup join. We
>> also have our own internal cache abstraction and implementation, so I
>> took a closer look, and here are some thoughts of mine.
>>
>> 1. Metrics
>>
>> I think users would be interested in these 3 aspects when debugging or
>> benchmarking their jobs:
>>
>> (1) Hit / miss rate
>> - hitCount, Counter type, to track the number of cache hits
>> - missCount, Counter type, to track the number of cache misses
>> Here we report the raw counts instead of a rate to the external metric
>> system, since it’s easier and more flexible to aggregate and calculate
>> the rate in metric systems like Prometheus.
>>
>> (2) Loading throughput and latency
>> - numBytesLoadedTotal, Counter type, to track the total number of bytes
>>   loaded by the cache
>> - numRecordsLoadedTotal, Counter type, to track the total number of
>>   records loaded
>> These two can be used for tracking the throughput of loading.
>>
>> - latestLoadTime, Gauge type, to track the time spent on the latest load
>>   operation
>> Actually a histogram would be better for tracking latency, but it’s quite
>> expensive to maintain one. Personally I think a gauge is good enough to
>> reflect the latency.
>>
>> - numLoadFailures, Counter type, to track the number of failed loads
>>
>> (3) Current usage
>> - numCachedRecords, Gauge type, to track the number of entries in the cache
>> - numCachedBytes, Gauge type, to track the number of bytes used by the cache
>>
>> Most of the metrics above are similar to your original proposal; here are
>> the differences:
>> (1) I still think it’s weird to report the identifier and type as metrics.
>> It’s quite handy to get the actual cache type through the code path, and
>> besides, some metric systems don’t support string-typed metrics (like
>> Prometheus).
>> (2) numRecords is renamed to numCachedRecords.
>> (3) loadSuccessCount can be derived as missCount - numLoadFailures. I
>> think users would be interested in how many loads happened (missCount)
>> and how many of them failed (numLoadFailures).
>> (4) totalLoadTime is replaced by latestLoadTime. I think it’s not very
>> meaningful for users to see a long-running job reporting a totalLoadTime
>> of hours or even days.
>>
>> 2. APIs
>>
>> (1) CacheMetricGroup:
>>
>> public interface CacheMetricGroup {
>>     Counter getHitCounter();
>>
>>     Counter getMissCounter();
>>
>>     Counter getNumRecordsLoadedTotalCounter();
>>
>>     Counter getNumBytesLoadedTotalCounter();
>>
>>     Gauge<Long> getLatestLoadTimeGauge();
>>
>>     Counter getNumLoadFailureCounter();
>>
>>     void setNumCachedRecordsGauge(Gauge<Long> numCachedRecordsGauge);
>>
>>     void setNumCachedBytesGauge(Gauge<Long> numCachedBytesGauge);
>> }
>>
>> Note that most metrics are provided as getters since they are quite
>> straightforward, except numCachedRecords/Bytes, which are left for cache
>> implementers to set.
>>
>> (2) Cache
>>
>> public interface Cache<K, V> extends AutoCloseable {
>>     void open(CacheMetricGroup cacheMetricGroup);
>>
>>     V get(K key, Callable<? extends V> loader) throws Exception;
>>
>>     void put(K key, V value);
>>
>>     void putAll(Map<? extends K, ? extends V> m);
>>
>>     void clean();
>>
>>     long size();
>> }
>>
>> Compared to your proposal:
>> a. `getIdentifier()` is removed. I can’t see any use for this function,
>> since we are not dynamically loading cache implementations via SPI or a
>> factory.
>> b. `init()` and `initMetric()` are merged into `open(CacheMetricGroup)`.
>> c. It extends `AutoCloseable`, symmetric to open, for cleaning up
>> resources claimed by the cache.
>> d. `getMetricGroup()` is removed. Metric groups should be exposed to
>> cache implementations instead of users.
>>
>> 3. Other topics
>> Another point to note is that if you check the usage of the cache in the
>> JDBC and Hive lookup tables, the value type is List<RowData>, since it’s
>> common that a joining key is mapped to multiple rows. We could add
>> another layer of abstraction under the Cache interface, for example:
>>
>> OneToManyCache<K, V> extends Cache<K, List<V>>
>>
>> And add interfaces like `appendToKey(List<V>)` to it. What do you think?
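[To make the counter semantics discussed in the metrics section concrete, here is a minimal sketch of a loading cache that tracks hitCount, missCount, and numLoadFailures as raw counters; loadSuccessCount is then derived as missCount - numLoadFailures, and the hit rate can be computed downstream (e.g. in Prometheus). The class and method names are illustrative stand-ins, not the actual Flink implementation.]

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;

// Hypothetical sketch: a loading cache that reports raw counts so that
// rates and loadSuccessCount can be derived in an external metric system.
public class MetricCountingCache<K, V> {
    private final Map<K, V> store = new HashMap<>();
    private long hitCount;
    private long missCount;
    private long numLoadFailures;

    public V get(K key, Callable<? extends V> loader) throws Exception {
        V value = store.get(key);
        if (value != null) {
            hitCount++;
            return value;
        }
        missCount++; // every miss triggers a load attempt
        try {
            value = loader.call();
        } catch (Exception e) {
            numLoadFailures++;
            throw e;
        }
        store.put(key, value);
        return value;
    }

    public long getHitCount() { return hitCount; }
    public long getMissCount() { return missCount; }
    public long getNumLoadFailures() { return numLoadFailures; }

    // Derived rather than tracked, per the discussion above.
    public long getLoadSuccessCount() { return missCount - numLoadFailures; }
}
```

[With raw counts exported, a system like Prometheus can compute the hit rate as hitCount / (hitCount + missCount) over any window, which is why no rate metric is reported from the job itself.]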
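[As a sketch of the one-to-many case under discussion: if the proposed Cache<K, V> is used directly with V = List<V>, an appendToKey-style operation reduces to a get followed by a put over the existing operations, which is the "one-line implementation" argument for skipping a separate OneToManyCache. The Cache and MapCache types below are illustrative stand-ins under the proposed signatures, not Flink classes.]

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;

// Hypothetical trimmed-down version of the proposed Cache interface.
interface Cache<K, V> {
    V get(K key, Callable<? extends V> loader) throws Exception;
    void put(K key, V value);
}

// Hypothetical in-memory implementation for illustration only.
class MapCache<K, V> implements Cache<K, V> {
    private final Map<K, V> store = new HashMap<>();

    @Override
    public V get(K key, Callable<? extends V> loader) throws Exception {
        V value = store.get(key);
        if (value == null) {
            value = loader.call();
            store.put(key, value);
        }
        return value;
    }

    @Override
    public void put(K key, V value) {
        store.put(key, value);
    }
}

class OneToManyExample {
    // append = load (or create) the list for the key, extend a copy, put it
    // back; copying avoids mutating the cached value in place.
    static <K, V> void appendToKey(Cache<K, List<V>> cache, K key, List<V> rows)
            throws Exception {
        List<V> extended = new ArrayList<>(cache.get(key, ArrayList::new));
        extended.addAll(rows);
        cache.put(key, extended);
    }
}
```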
>>
>> Cheers,
>>
>> Qingsheng
>>
>>> On Mar 7, 2022, at 16:00, zst...@163.com wrote:
>>>
>>> Hi devs,
>>>
>>> I would like to start a discussion thread about an abstraction of a
>>> caching LookupFunction, with metrics for the cache, so that caching in
>>> connectors works out of the box for connector developers. There are
>>> multiple LookupFunction implementations in individual connectors
>>> [1][2][3][4] so far.
>>> At the same time, users can monitor the cache in a LookupFunction
>>> through uniform cache metrics to optimize their tasks or troubleshoot
>>> issues.
>>>
>>> I have posted an issue about this, see
>>> <https://issues.apache.org/jira/browse/FLINK-25409>, and made a brief
>>> design
>>> <https://docs.google.com/document/d/1L2eo7VABZBdRxoRP_wPvVwuvTZOV9qrN9gEQxjhSJOc/edit?usp=sharing>.
>>>
>>> Looking forward to your feedback, thanks.
>>>
>>> Best regards,
>>> Yuan
>>>
>>> [1] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java
>>> [2] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/FileSystemLookupFunction.java
>>> [3] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataLookupFunction.java
>>> [4] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.java