Hi Yuan,
Sorry for keeping this thread pending for so long. I think adding a unified
abstraction and metrics for the cache is quite important for users and
developers to optimize and improve their jobs with lookup joins. We also have
our own internal cache abstraction and implementation, so I took a closer look
and here are some thoughts of mine.
1. Metrics
I think users would be interested in these 3 aspects when debugging or
benchmarking their jobs:
(1) Hit / Miss rate
- hitCount, Counter type, to track the number of cache hits
- missCount, Counter type, to track the number of cache misses
Here we just report the raw counts instead of a rate to the external metric
system, since it's easier and more flexible to do aggregations and calculate
the rate in metric systems like Prometheus.
(2) Loading throughput and latency
- numBytesLoadedTotal, Counter type, to track the total number of bytes loaded
by the cache
- numRecordsLoadedTotal, Counter type, to track the total number of records
loaded
These two can be used for tracking the throughput of loading.
- latestLoadTime, Gauge type, to track the time spent on the latest load
operation
Actually it would be better to use a histogram for tracking latency, but
maintaining a histogram is quite expensive. Personally I think a gauge would be
good enough to reflect the latency (a short sketch of this is included at the
end of this section).
- numLoadFailures, Counter type, to track number of failed loads.
(3) Current usage
- numCachedRecords, Gauge type, to track the number of entries in the cache
- numCachedBytes, Gauge type, to track the number of bytes used by the cache
Most of the metrics above are similar to your original proposal, and here are
the differences:
(1) I still think it's weird to report the identifier and type as metrics.
It's quite handy to get the actual cache type through the code path, and
besides, some metric systems (like Prometheus) don't support string-typed
metrics.
(2) numRecords is renamed to numCachedRecords
(3) loadSuccessCount can be derived as missCount - numLoadFailures. I think
users would be more interested in how many times the cache loads (missCount)
and how many of those loads fail (numLoadFailures).
(4) totalLoadTime is replaced by latestLoadTime. I don't think it's very
meaningful for users to see a long-running job reporting a totalLoadTime of
hours or even days.
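Regarding the latestLoadTime gauge mentioned above, here is a tiny sketch of
how a cache implementation might track it with a plain gauge instead of a
histogram. The class and method names are illustrative only, not part of the
proposal:

import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;

// Illustrative only: the duration of the most recent load is kept in an
// AtomicLong and exposed through a Gauge<Long>, which is much cheaper to
// maintain than a histogram while still reflecting the current load latency.
class LatestLoadTimeSketch {
    private final AtomicLong latestLoadTime = new AtomicLong(0L);

    // Would be registered with the metric group as a Gauge<Long>, e.g.
    // latestLoadTime::get
    long getLatestLoadTimeMillis() {
        return latestLoadTime.get();
    }

    <V> V timedLoad(Callable<V> loader) throws Exception {
        long start = System.currentTimeMillis();
        try {
            return loader.call();
        } finally {
            latestLoadTime.set(System.currentTimeMillis() - start);
        }
    }
}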
2. APIs
(1) CacheMetricGroup:
public interface CacheMetricGroup {
    Counter getHitCounter();
    Counter getMissCounter();
    Counter getNumRecordsLoadedTotalCounter();
    Counter getNumBytesLoadedTotalCounter();
    Gauge<Long> getLatestLoadTimeGauge();
    Counter getNumLoadFailureCounter();
    void setNumCachedRecordsGauge(Gauge<Long> numCachedRecordsGauge);
    void setNumCachedBytesGauge(Gauge<Long> numCachedBytesGauge);
}
Note that most of the metrics are provided as getters since they are quite
straightforward, except numCachedRecords/Bytes, which are left to cache
implementers (via setters).
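To illustrate the getter/setter split, here is a minimal sketch of how a cache
implementation might wire itself to the CacheMetricGroup above. The class and
helper names are placeholders, and the Counter / Gauge types are assumed to be
Flink's metric interfaces:

import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.flink.metrics.Counter;

// Placeholder class, not part of the proposal.
class MetricAwareCacheSketch<K, V> {
    private final Map<K, V> entries = new ConcurrentHashMap<>();
    private Counter hitCounter;
    private Counter missCounter;

    void open(CacheMetricGroup metricGroup) {
        // Straightforward counters are obtained via getters on the group.
        hitCounter = metricGroup.getHitCounter();
        missCounter = metricGroup.getMissCounter();
        // Current-usage gauges are registered by the implementation itself,
        // since only it knows how to compute entry and byte counts.
        metricGroup.setNumCachedRecordsGauge(() -> (long) entries.size());
        metricGroup.setNumCachedBytesGauge(this::estimateCachedBytes);
    }

    V get(K key, Callable<? extends V> loader) throws Exception {
        V value = entries.get(key);
        if (value != null) {
            hitCounter.inc();
            return value;
        }
        missCounter.inc();
        value = loader.call();
        entries.put(key, value);
        return value;
    }

    private long estimateCachedBytes() {
        // Placeholder: a real implementation would estimate per-entry sizes.
        return 0L;
    }
}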
(2) Cache
public interface Cache<K, V> extends AutoCloseable {
    void open(CacheMetricGroup cacheMetricGroup);
    V get(K key, Callable<? extends V> loader) throws Exception;
    void put(K key, V value);
    void putAll(Map<? extends K, ? extends V> m);
    void clean();
    long size();
}
Compared to your proposal:
a. `getIdentifier()` is removed. I can't see any usage of this function, since
we are not dynamically loading cache implementations via SPI or a factory.
b. `init()` and `initMetric()` are merged into `open(CacheMetricGroup)`.
c. It extends `AutoCloseable` to be symmetric with open(), for cleaning up
resources claimed by the cache.
d. `getMetricGroup()` is removed. The metric group should be exposed to cache
implementations instead of users.
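To make the intended usage of the Cache interface above more concrete, here is
a rough caller-side sketch; the surrounding class and the external lookup are
hypothetical, only the Cache / CacheMetricGroup interfaces are assumed:

// Hypothetical caller-side sketch: a lookup function delegating to the cache
// and falling back to the external system on a miss via the loader.
class CachingLookupSketch<K, V> implements AutoCloseable {
    private final Cache<K, V> cache;

    CachingLookupSketch(Cache<K, V> cache, CacheMetricGroup metricGroup) {
        this.cache = cache;
        // Symmetric lifecycle: the cache receives its metric group in open()
        // and is closed together with the function.
        cache.open(metricGroup);
    }

    V lookup(K key) throws Exception {
        // On a miss the cache invokes the loader, so hit/miss and loading
        // metrics can be maintained inside the cache implementation.
        return cache.get(key, () -> queryExternalSystem(key));
    }

    @Override
    public void close() throws Exception {
        cache.close();
    }

    private V queryExternalSystem(K key) {
        // Placeholder for e.g. a JDBC or HBase lookup.
        return null;
    }
}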
3. Other topics
Another point to note: if you check the usage of the cache in the JDBC and Hive
lookup tables, the value type is List<RowData>, since it's common that a join
key is mapped to multiple rows. We could add another layer of abstraction under
the Cache interface, for example:
OneToManyCache<K, V> extends Cache<K, List<V>>
and add methods like `appendToKey(List<V>)` to it, roughly as sketched below.
What do you think?
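A rough sketch of that specialization; the exact method set and signatures
(including whether appendToKey also takes the key) are only illustrative and
open for discussion:

import java.util.List;

// Sketch only: a specialization for the common one-key-to-many-rows case.
public interface OneToManyCache<K, V> extends Cache<K, List<V>> {

    // Appends the given rows to the list cached under the key, creating the
    // entry if it does not exist yet. Signature is only illustrative.
    void appendToKey(K key, List<V> values);
}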
Cheers,
Qingsheng
> On Mar 7, 2022, at 16:00, [email protected] wrote:
>
> Hi devs,
>
>
> I would like to start a discussion thread about an abstraction of a caching
> LookupFunction, with metrics for the cache in connectors, to make caching
> available out of the box for connector developers. There are multiple
> LookupFunction implementations in individual connectors [1][2][3][4] so far.
> At the same time, users could monitor the cache in LookupFunction through
> uniform cache metrics, in order to optimize tasks or troubleshoot.
>
>
> I have posted an issue about this, see
> <https://issues.apache.org/jira/browse/FLINK-25409>, and made a brief design
> <https://docs.google.com/document/d/1L2eo7VABZBdRxoRP_wPvVwuvTZOV9qrN9gEQxjhSJOc/edit?usp=sharing>.
>
>
> Looking forward to your feedback, thanks.
>
>
> Best regards,
> Yuan
>
>
>
>
> [1]
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java
> [2]
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/FileSystemLookupFunction.java
> [3]
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataLookupFunction.java
> [4]
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.java