[GitHub] [kafka] cadonna commented on a change in pull request #9094: KAFKA-10054: KIP-613, add TRACE-level e2e latency metrics

2020-08-13 Thread GitBox


cadonna commented on a change in pull request #9094:
URL: https://github.com/apache/kafka/pull/9094#discussion_r469781319



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetrics.java
##
@@ -289,6 +294,25 @@ public static Sensor processorAtSourceSensorOrForwardSensor(final String threadI
 return processAtSourceSensor(threadId, taskId, processorNodeId, streamsMetrics);
 }
 
+public static Sensor e2ELatencySensor(final String threadId,
+  final String taskId,
+  final String processorNodeId,
+  final StreamsMetricsImpl streamsMetrics) {
+final String sensorName = processorNodeId + "-" + RECORD_E2E_LATENCY;
+final Sensor sensor = streamsMetrics.taskLevelSensor(threadId, taskId, sensorName, RecordingLevel.INFO);

Review comment:
   I do not think that using `StreamsMetrics#nodeLevelSensor()` is a breaking change. Sensors are not exposed publicly. They are merely containers for metrics. Metrics are exposed publicly. The full sensor name is just the key to retrieve sensors in the metrics map. It is also stored in `StreamsMetricsImpl` in the `*LevelSensor` data structures to know which sensors need to be removed from the metrics map when `removeAll*LevelSensors()` is called. So, the full sensor name is used for internal housekeeping and is hidden from the users. I think it is fine to change the `fullSensorName`.
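The housekeeping scheme described above can be sketched as follows. This is an illustrative sketch only, not the actual `StreamsMetricsImpl` code: the key format and method signatures are assumptions made for the example, but it shows why the full sensor name is purely an internal map key that can change without breaking users.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of sensor housekeeping: the full sensor name is only the key under
// which a sensor is stored and later bulk-removed; it is never exposed to users.
public class SensorRegistrySketch {
    private final Map<String, Object> sensors = new HashMap<>();

    // Hypothetical key scheme; the real format is an internal detail.
    static String fullSensorName(final String threadId, final String taskId, final String name) {
        return threadId + "." + taskId + "." + name;
    }

    public Object taskLevelSensor(final String threadId, final String taskId, final String name) {
        // Create the sensor on first use; later lookups reuse the same instance.
        return sensors.computeIfAbsent(fullSensorName(threadId, taskId, name), k -> new Object());
    }

    public void removeAllTaskLevelSensors(final String threadId, final String taskId) {
        // The stored full names tell us exactly which entries belong to this task.
        sensors.keySet().removeIf(key -> key.startsWith(threadId + "." + taskId + "."));
    }

    public int size() {
        return sensors.size();
    }
}
```

Changing the key format only affects this internal bookkeeping, not the metric names that monitoring systems see.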





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #9094: KAFKA-10054: KIP-613, add TRACE-level e2e latency metrics

2020-07-31 Thread GitBox


cadonna commented on a change in pull request #9094:
URL: https://github.com/apache/kafka/pull/9094#discussion_r463514179



##
File path: streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
##
@@ -668,6 +671,9 @@ private void checkKeyValueStoreMetrics(final String group0100To24,
 checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_SIZE_CURRENT, 0);
 checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_SIZE_AVG, 0);
 checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_SIZE_MAX, 0);
+checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_AVG, expectedNumberofE2ELatencyMetrics);
+checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_MIN, expectedNumberofE2ELatencyMetrics);
+checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_MAX, expectedNumberofE2ELatencyMetrics);

Review comment:
   You can use the following to get it right without the need to check for the e2e latency before filtering:
   
   ```
   .filter(m -> m.metricName().tags().containsKey(tagKey) &&
       (m.metricName().group().equals(group0100To24) || m.metricName().group().equals(STATE_STORE_LEVEL_GROUP))
   ).collect(Collectors.toList());
   ```
   
   The reason for the difference between the KV store and the window store is that they are used in different tests with different numbers of state stores.
   
   The test that uses the KV stores tests three different types of KV stores, namely in-memory, RocksDB, and in-memory-lru-cache. For each of these types the old group name changes. That is also the reason we need to pass the parameter `group0100To24` to `checkKeyValueStoreMetrics()`.
   
   In `checkWindowStoreAndSuppressionBufferMetrics()` we need to filter for four groups, because the corresponding test uses suppression and a window state store. Suppression buffers had their own group in the old version; in the new version they moved into the state store group. Those groups are `BUFFER_LEVEL_GROUP_0100_TO_24` and `STATE_STORE_LEVEL_GROUP`. The window state stores had their own group in the old version, i.e., `STATE_STORE_LEVEL_GROUP_ROCKSDB_WINDOW_STORE_0100_TO_24` (we are only using RocksDB-based window stores in the test). Finally, during the implementation of KIP-444, we discovered that we had named a group incorrectly. That is why we also filter for the group `stream-rocksdb-window-metrics`.
   
   So to sum up, it is hard to compare the verifications for KV stores and window stores, because they are used in different tests. Sorry, I should have been clearer on that before.
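The multi-group filter suggested above can be sketched in isolation. This is an illustrative sketch using plain strings in place of Kafka's `MetricName`; the value of `STATE_STORE_LEVEL_GROUP` mirrors the state-store group named in the discussion but is an assumption for the example.

```java
import java.util.List;
import java.util.stream.Collectors;

// Sketch of filtering metrics by group: keep metrics in either the old
// (versioned, per-store-type) group or the new shared state-store group,
// so newly added metrics like record-e2e-latency pass the filter too.
public class GroupFilterSketch {
    static final String STATE_STORE_LEVEL_GROUP = "stream-state-metrics";

    static List<String> filterByGroups(final List<String> groups, final String group0100To24) {
        return groups.stream()
            .filter(g -> g.equals(group0100To24) || g.equals(STATE_STORE_LEVEL_GROUP))
            .collect(Collectors.toList());
    }
}
```

With this disjunction there is no need for a separate pre-filter check on the e2e latency metrics: they land in the new group and are accepted by the second branch.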













[GitHub] [kafka] cadonna commented on a change in pull request #9094: KAFKA-10054: KIP-613, add TRACE-level e2e latency metrics

2020-07-30 Thread GitBox


cadonna commented on a change in pull request #9094:
URL: https://github.com/apache/kafka/pull/9094#discussion_r462883347



##
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetricsTest.java
##
@@ -327,6 +327,38 @@ public void shouldGetExpiredWindowRecordDropSensor() {
 assertThat(sensor, is(expectedSensor));
 }
 
+@Test
+public void shouldGetRecordE2ELatencySensor() {
+final String metricName = "record-e2e-latency";
+
+final String e2eLatencyDescription =
+"end-to-end latency of a record, measuring by comparing the record timestamp with the "
++ "system time when it has been fully processed by the node";
+final String descriptionOfAvg = "The average " + e2eLatencyDescription;
+final String descriptionOfMin = "The minimum " + e2eLatencyDescription;
+final String descriptionOfMax = "The maximum " + e2eLatencyDescription;
+
+expect(streamsMetrics.storeLevelSensor(THREAD_ID, TASK_ID, STORE_NAME, metricName, RecordingLevel.TRACE))
+.andReturn(expectedSensor);
+expect(streamsMetrics.storeLevelTagMap(THREAD_ID, TASK_ID, STORE_TYPE, STORE_NAME)).andReturn(storeTagMap);
+StreamsMetricsImpl.addAvgAndMinAndMaxToSensor(
+expectedSensor,
+STORE_LEVEL_GROUP,

Review comment:
   I just realized that we should not put new metrics into old groups. Your code is fine. Do not use the instance variable `storeLevelGroup`! Sorry for the confusion.









[GitHub] [kafka] cadonna commented on a change in pull request #9094: KAFKA-10054: KIP-613, add TRACE-level e2e latency metrics

2020-07-30 Thread GitBox


cadonna commented on a change in pull request #9094:
URL: https://github.com/apache/kafka/pull/9094#discussion_r462882926



##
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java
##
@@ -443,6 +447,25 @@ public static Sensor suppressionBufferSizeSensor(final String threadId,
 );
 }
 
+public static Sensor e2ELatencySensor(final String threadId,
+  final String taskId,
+  final String storeType,
+  final String storeName,
+  final StreamsMetricsImpl streamsMetrics) {
+final Sensor sensor = streamsMetrics.storeLevelSensor(threadId, taskId, storeName, RECORD_E2E_LATENCY, RecordingLevel.TRACE);
+final Map<String, String> tagMap = streamsMetrics.storeLevelTagMap(threadId, taskId, storeType, storeName);
+addAvgAndMinAndMaxToSensor(
+sensor,
+STATE_STORE_LEVEL_GROUP,

Review comment:
   I just realized that we should not put new metrics into old groups. Your code is fine. Do not use `stateStoreLevelGroup()`! Sorry for the confusion.









[GitHub] [kafka] cadonna commented on a change in pull request #9094: KAFKA-10054: KIP-613, add TRACE-level e2e latency metrics

2020-07-30 Thread GitBox


cadonna commented on a change in pull request #9094:
URL: https://github.com/apache/kafka/pull/9094#discussion_r462881933



##
File path: streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
##
@@ -668,6 +671,9 @@ private void checkKeyValueStoreMetrics(final String group0100To24,
 checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_SIZE_CURRENT, 0);
 checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_SIZE_AVG, 0);
 checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_SIZE_MAX, 0);
+checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_AVG, expectedNumberofE2ELatencyMetrics);
+checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_MIN, expectedNumberofE2ELatencyMetrics);
+checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_MAX, expectedNumberofE2ELatencyMetrics);

Review comment:
   Sorry, I made a mistake here. We should not give new metrics old groups. I think that to fix this test you need to adapt the filter on line 618 to let all metrics with groups that relate to KV state stores pass. See `checkWindowStoreAndSuppressionBufferMetrics()` for an example.









[GitHub] [kafka] cadonna commented on a change in pull request #9094: KAFKA-10054: KIP-613, add TRACE-level e2e latency metrics

2020-07-29 Thread GitBox


cadonna commented on a change in pull request #9094:
URL: https://github.com/apache/kafka/pull/9094#discussion_r462327050



##
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##
@@ -227,6 +234,14 @@ protected Bytes keyBytes(final K key) {
 return byteEntries;
 }
 
+private void maybeRecordE2ELatency() {
+if (e2eLatencySensor.shouldRecord() && e2eLatencySensor.hasMetrics()) {

Review comment:
   I think you do not need to check for metrics with `e2eLatencySensor.hasMetrics()`. There should always be metrics within this sensor.
   `hasMetrics()` is used in `StreamsMetricsImpl#maybeMeasureLatency()` because some sensors may not contain any metrics due to the built-in metrics version. For instance, the destroy sensor exists for built-in metrics versions 0.10.0-2.4 but not for latest. To avoid version checks in the record-processing code, we just create an empty sensor and call record on it, effectively not recording any metrics for this sensor for version latest. We do not hide newly added metrics if the built-in version is set to an older version.
   The same applies to the other uses of `hasMetrics()` introduced in this PR.
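The empty-sensor pattern described above can be illustrated with a minimal hand-rolled sensor in place of `org.apache.kafka.common.metrics.Sensor` (names and behavior here are assumptions for the example, not Kafka's actual implementation). It shows why the `hasMetrics()` check is redundant for a sensor that always carries metrics: recording on an empty sensor is simply a no-op, so the recording-level check alone is safe.

```java
// Sketch: an empty sensor absorbs record() calls without effect, which is how
// version-specific sensors can be "disabled" without version checks at the call site.
public class SensorGuardSketch {
    static class SimpleSensor {
        private final boolean shouldRecord;
        private final int metricCount;
        private int recordedValues;

        SimpleSensor(final boolean shouldRecord, final int metricCount) {
            this.shouldRecord = shouldRecord;
            this.metricCount = metricCount;
        }

        boolean shouldRecord() { return shouldRecord; }
        boolean hasMetrics() { return metricCount > 0; }

        void record(final double value) {
            // An empty sensor (no metrics registered) records nothing.
            if (metricCount > 0) {
                recordedValues++;
            }
        }

        int recordedValues() { return recordedValues; }
    }

    static void maybeRecordE2ELatency(final SimpleSensor sensor, final long recordTimestamp, final long now) {
        // The recording-level check alone suffices; no hasMetrics() guard needed.
        if (sensor.shouldRecord()) {
            sensor.record(now - recordTimestamp);
        }
    }
}
```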

##
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
##
@@ -248,4 +253,12 @@ public void close() {
 private Bytes keyBytes(final K key) {
 return Bytes.wrap(serdes.rawKey(key));
 }
+
+private void maybeRecordE2ELatency() {
+if (e2eLatencySensor.shouldRecord() && e2eLatencySensor.hasMetrics()) {

Review comment:
   Your approach makes sense to me. I agree that the latency should refer to the update in the state store and not to the record itself. If a record updates the state more than once, then the latency should be measured each time.

##
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java
##
@@ -443,6 +447,25 @@ public static Sensor suppressionBufferSizeSensor(final String threadId,
 );
 }
 
+public static Sensor e2ELatencySensor(final String threadId,
+  final String taskId,
+  final String storeType,
+  final String storeName,
+  final StreamsMetricsImpl streamsMetrics) {
+final Sensor sensor = streamsMetrics.storeLevelSensor(threadId, taskId, storeName, RECORD_E2E_LATENCY, RecordingLevel.TRACE);
+final Map<String, String> tagMap = streamsMetrics.storeLevelTagMap(threadId, taskId, storeType, storeName);
+addAvgAndMinAndMaxToSensor(
+sensor,
+STATE_STORE_LEVEL_GROUP,

Review comment:
   You need to use `stateStoreLevelGroup()` here instead of `STATE_STORE_LEVEL_GROUP` because the group name depends on the version and the store type.

##
File path: streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
##
@@ -668,6 +671,9 @@ private void checkKeyValueStoreMetrics(final String group0100To24,
 checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_SIZE_CURRENT, 0);
 checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_SIZE_AVG, 0);
 checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_SIZE_MAX, 0);
+checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_AVG, expectedNumberofE2ELatencyMetrics);
+checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_MIN, expectedNumberofE2ELatencyMetrics);
+checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_MAX, expectedNumberofE2ELatencyMetrics);

Review comment:
   I agree with you, it should always be 1. It is the group of the metrics. See my comment in `StateStoreMetrics`. I am glad this test served its purpose, because I did not notice this in the unit tests!

##
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##
@@ -468,38 +468,44 @@ public void shouldRecordE2ELatencyOnProcessForSourceNodes() {
 }
 
 @Test
-public void shouldRecordE2ELatencyMinAndMax() {
+public void shouldRecordE2ELatencyAvgAndMinAndMax() {
 time = new MockTime(0L, 0L, 0L);
 metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.INFO), time);
 task = createStatelessTask(createConfig(false, "0"), StreamsConfig.METRICS_LATEST);
 
 final String sourceNode = source1.name();
 
-final Metric maxMetric = getProcessorMetric("record-e2e-latency", "%s-max", task.id().toString(), sourceNode, StreamsConfig.METRICS_LATEST);
+final Metric avgMetric = getProcessorMetric("record-e2e-latency", "%s-avg", task.id().toString(), sourceNode, StreamsConfig.METRICS_LATEST);
 final Metric minMetric = getProcessorMetric("record-e2e-latency", "%s-min",