ableegoldman commented on a change in pull request #9094:
URL: https://github.com/apache/kafka/pull/9094#discussion_r461993688



##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
##########
@@ -198,7 +198,7 @@
                                 .define(METRICS_RECORDING_LEVEL_CONFIG,
                                         Type.STRING,
                                         Sensor.RecordingLevel.INFO.toString(),
-                                        
in(Sensor.RecordingLevel.INFO.toString(), 
Sensor.RecordingLevel.DEBUG.toString()),
+                                        
in(Sensor.RecordingLevel.INFO.toString(), 
Sensor.RecordingLevel.DEBUG.toString(), Sensor.RecordingLevel.TRACE.toString()),

Review comment:
       It's kind of a bummer that we can't just add the new TRACE level for 
Streams only; we have to add it to all the clients that Streams passes its 
configs down to. We could check for the new TRACE level and strip it off before 
passing the configs on to the clients, but that just seems like asking for 
trouble.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetrics.java
##########
@@ -88,13 +92,6 @@ private TaskMetrics() {}
     private static final String NUM_BUFFERED_RECORDS_DESCRIPTION = "The count 
of buffered records that are polled " +
         "from consumer and not yet processed for this active task";
 
-    private static final String RECORD_E2E_LATENCY = "record-e2e-latency";

Review comment:
       Moved the common descriptions to StreamsMetricsImpl

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -227,6 +234,14 @@ protected Bytes keyBytes(final K key) {
         return byteEntries;
     }
 
+    private void maybeRecordE2ELatency() {
+        if (e2eLatencySensor.shouldRecord() && e2eLatencySensor.hasMetrics()) {

Review comment:
       For KV stores, we just compare the current time with the current 
record's timestamp

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
##########
@@ -248,4 +253,12 @@ public void close() {
     private Bytes keyBytes(final K key) {
         return Bytes.wrap(serdes.rawKey(key));
     }
+
+    private void maybeRecordE2ELatency() {
+        if (e2eLatencySensor.shouldRecord() && e2eLatencySensor.hasMetrics()) {

Review comment:
       For session and window stores, we also just compare the current time 
with the current record's timestamp when `put` is called. This can mean the e2e 
latency is measured several times on the same record, for example in a windowed 
aggregation.
   At first I thought that didn't make sense, but now I think it's actually 
exactly what we want. First of all, it means we can actually account for the 
latency between calls to `put` within a processor. For simple point inserts 
this might not be a huge increase on the scale of ms, but more complex 
processing may benefit from seeing this granularity of information. If they 
don't want it, well, that's why we introduced `TRACE`
   
   Second, while it might seem like we're over-weighting some records by 
measuring the e2e latency on them more than others, I'm starting to think this 
actually makes more sense than not: the big picture benefit/use case for the 
e2e latency metric is less "how long for this record to get sent downstream" 
and more "how long for this record to be reflected in the state store/IQ 
results". Given that, each record should be weighted by its actual proportion 
of the state store. You aren't querying individual records (in a window store), 
you're querying the windows themselves
   
   I toyed around with the idea of measuring the e2e latency relative to the 
window time, instead of the record timestamp, but ultimately couldn't find any 
sense in that. 
   Thoughts?

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
##########
@@ -668,6 +671,9 @@ private void checkKeyValueStoreMetrics(final String 
group0100To24,
         checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_SIZE_CURRENT, 0);
         checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_SIZE_AVG, 0);
         checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_SIZE_MAX, 0);
+        checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_AVG, 
expectedNumberofE2ELatencyMetrics);
+        checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_MIN, 
expectedNumberofE2ELatencyMetrics);
+        checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_MAX, 
expectedNumberofE2ELatencyMetrics);

Review comment:
       Ok there's something I'm not understanding about this test and/or the 
built-in metrics version. For some reason, the KV-store metrics are 0 when 
`METRICS_0100_TO_24` is used, and 1 (as expected) when the latest version in 
used. I feel like this is wrong, and it should always be 1, but I need some 
clarify on how this config is supposed to be used
   What makes me pretty sure there's something actually wrong here is that for 
the window/session store metrics, they are actually always at 1. But I can't 
figure out why the KV store metrics would be any different than the others. Any 
ideas @cadonna ?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to