[GitHub] [kafka] rodesai commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams

2021-08-25 Thread GitBox


rodesai commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r696290766



##
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
##
@@ -1925,6 +1926,56 @@ public void testCommittedAuthenticationFailure() {
 assertThrows(AuthenticationException.class, () -> 
consumer.committed(Collections.singleton(tp0)).get(tp0));
 }
 
+@Test
+public void testMeasureCommitSyncDuration() {
+// use a consumer that will throw to ensure we return quickly

Review comment:
   Done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rodesai commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams

2021-08-25 Thread GitBox


rodesai commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r696290580



##
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
##
@@ -1925,6 +1926,56 @@ public void testCommittedAuthenticationFailure() {
 assertThrows(AuthenticationException.class, () -> 
consumer.committed(Collections.singleton(tp0)).get(tp0));
 }
 
+@Test
+public void testMeasureCommitSyncDuration() {
+// use a consumer that will throw to ensure we return quickly
+Time time = new MockTime(Duration.ofSeconds(1).toMillis());
+SubscriptionState subscription = new SubscriptionState(new 
LogContext(), OffsetResetStrategy.EARLIEST);
+ConsumerMetadata metadata = createMetadata(subscription);
+MockClient client = new MockClient(time, metadata);
+initMetadata(client, singletonMap(topic, 1));
+Node node = metadata.fetch().nodes().get(0);
+ConsumerPartitionAssignor assignor = new RangeAssignor();
+client.createPendingAuthenticationError(node, 0);
+final KafkaConsumer consumer
+= newConsumer(time, client, subscription, metadata, assignor, 
false, groupInstanceId);

Review comment:
   refactored








[GitHub] [kafka] rodesai commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams

2021-08-25 Thread GitBox


rodesai commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r696282952



##
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
##
@@ -1925,6 +1926,56 @@ public void testCommittedAuthenticationFailure() {
 assertThrows(AuthenticationException.class, () -> 
consumer.committed(Collections.singleton(tp0)).get(tp0));
 }
 
+@Test
+public void testMeasureCommitSyncDuration() {
+// use a consumer that will throw to ensure we return quickly

Review comment:
   Yeah. There are no tests for that path, and I lost steam trying to pay 
that debt just to implement this metric.








[GitHub] [kafka] rodesai commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams

2021-08-25 Thread GitBox


rodesai commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r696280437



##
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
##
@@ -1925,6 +1926,56 @@ public void testCommittedAuthenticationFailure() {
 assertThrows(AuthenticationException.class, () -> 
consumer.committed(Collections.singleton(tp0)).get(tp0));
 }
 
+@Test
+public void testMeasureCommitSyncDuration() {
+// use a consumer that will throw to ensure we return quickly
+Time time = new MockTime(Duration.ofSeconds(1).toMillis());
+SubscriptionState subscription = new SubscriptionState(new 
LogContext(), OffsetResetStrategy.EARLIEST);
+ConsumerMetadata metadata = createMetadata(subscription);
+MockClient client = new MockClient(time, metadata);
+initMetadata(client, singletonMap(topic, 1));
+Node node = metadata.fetch().nodes().get(0);
+ConsumerPartitionAssignor assignor = new RangeAssignor();
+client.createPendingAuthenticationError(node, 0);
+final KafkaConsumer consumer
+= newConsumer(time, client, subscription, metadata, assignor, 
false, groupInstanceId);

Review comment:
   it doesn't set a tick on the mock time.








[GitHub] [kafka] rodesai commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams

2021-08-24 Thread GitBox


rodesai commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r695342570



##
File path: 
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
##
@@ -905,6 +971,57 @@ public void testSendTxnOffsetsWithGroupId() {
 }
 }
 
+private double getAndAssertDuration(KafkaProducer producer, String 
name, double floor) {
+double value = getMetricValue(producer, name);
+assertTrue(value > floor);
+return value;
+}
+
+@Test
+public void testMeasureTransactionDurations() {
+Map configs = new HashMap<>();
+configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some.id");
+configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 1);
+configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+Time time = new MockTime(1);
+MetadataResponse initialUpdateResponse = 
RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
+ProducerMetadata metadata = newMetadata(0, Long.MAX_VALUE);
+
+MockClient client = new MockClient(time, metadata);
+client.updateMetadata(initialUpdateResponse);
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
"some.id", host1));
+client.prepareResponse(initProducerIdResponse(1L, (short) 5, 
Errors.NONE));
+
+try (KafkaProducer producer = kafkaProducer(configs, 
new StringSerializer(),
+new StringSerializer(), metadata, client, null, time)) {
+producer.initTransactions();
+assertTrue(getMetricValue(producer, "txn-init-time-total") > 
99);

Review comment:
   I'm verifying that something was measured and that it's at least 1 tick 
of the clock. The clock is shared between multiple threads (e.g. the io 
threads), so the number of ticks depends on which threads get scheduled while 
we're in `initTransactions`.
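
A minimal sketch of why the assertion can only be a lower bound, assuming a hypothetical auto-ticking clock. `AutoTickClock` and `BlockedTimeSketch` are illustrative names, not Kafka's `MockTime` or producer code; the only idea borrowed from the test is that every read of the shared clock advances it by one tick.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for an auto-ticking mock clock: every read advances time by one tick.
final class AutoTickClock {
    private final AtomicLong nanos = new AtomicLong();
    private final long tickNanos;

    AutoTickClock(long tickNanos) {
        this.tickNanos = tickNanos;
    }

    // Returns the current time, then advances the clock by one tick.
    long nanoseconds() {
        return nanos.getAndAdd(tickNanos);
    }
}

final class BlockedTimeSketch {
    // Reads the clock before and after the call and returns the difference.
    // Any extra clock reads that happen in between (for example from other
    // threads sharing the clock) add more ticks to the measured value.
    static long measure(AutoTickClock clock, Runnable blockingCall) {
        long start = clock.nanoseconds();
        blockingCall.run();
        return clock.nanoseconds() - start;
    }
}
```

With no other readers the measured value is exactly one tick; each additional read of the shared clock during the call adds one more, which is why the test asserts a floor rather than an exact value.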








[GitHub] [kafka] rodesai commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams

2021-08-24 Thread GitBox


rodesai commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r695341815



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/KafkaConsumerMetrics.java
##
@@ -63,6 +66,18 @@ public KafkaConsumerMetrics(Metrics metrics, String 
metricGrpPrefix) {
 metricGroupName,
 "The average fraction of time the consumer's poll() is idle as 
opposed to waiting for the user code to process records."),
 new Avg());
+
+this.commitSyncSensor = metrics.sensor("commit-sync-time-total");
+this.commitSyncSensor.add(
+metrics.metricName("commit-sync-time-total", metricGroupName),
+new CumulativeSum()
+);
+
+this.committedSensor = metrics.sensor("committed-time-total");
+this.committedSensor.add(
+metrics.metricName("committed-time-total", metricGroupName),
+new CumulativeSum()
+);

Review comment:
   ack - will defer to a follow-up PR








[GitHub] [kafka] rodesai commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams

2021-08-24 Thread GitBox


rodesai commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r694522888



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
##
@@ -67,22 +69,35 @@
 private final Map eosV2ProducerConfigs;
 private final KafkaClientSupplier clientSupplier;
 private final StreamThread.ProcessingMode processingMode;
+private final Time time;
 
 private Producer producer;
 private boolean transactionInFlight = false;
 private boolean transactionInitialized = false;
+private double oldProducerTotalBlockedTime = 0;
 
 public StreamsProducer(final StreamsConfig config,
final String threadId,
final KafkaClientSupplier clientSupplier,
final TaskId taskId,
final UUID processId,
final LogContext logContext) {
+this(config, threadId, clientSupplier, taskId, processId, logContext, 
Time.SYSTEM);
+}

Review comment:
   it's only meant for testing. I'll make it package-private and annotate it








[GitHub] [kafka] rodesai commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams

2021-08-18 Thread GitBox


rodesai commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r691775378



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java
##
@@ -1121,4 +1125,60 @@ public void 
shouldResetTransactionInitializedOnResetProducer() {
 verify(mockedProducer);
 }
 
+@Test
+public void shouldComputeTotalBlockedTime() {
+setProducerMetrics(nonEosMockProducer, 1, 2, 3, 4, 5, 6, 7);
+
+final double expectedTotalBlocked = 1 + 2 + 3 + 4 + 5 + 6 + 7;
+assertThat(nonEosStreamsProducer.totalBlockedTime(), 
equalTo(expectedTotalBlocked));
+}
+
+@Test
+public void shouldComputeTotalBlockedTimeAfterReset() {
+setProducerMetrics(eosBetaMockProducer, 1, 2, 3, 4, 5, 6, 7);
+eosBetaStreamsProducer.resetProducer();
+
+final double expectedTotalBlocked = 1 + 2 + 3 + 4 + 5 + 6 + 7;
+assertThat(eosBetaStreamsProducer.totalBlockedTime(), greaterThan(2 * 
expectedTotalBlocked));

Review comment:
   ah somehow I thought we couldn't use the hamcrest matchers. thanks!








[GitHub] [kafka] rodesai commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams

2021-08-18 Thread GitBox


rodesai commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r691768579



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsThreadTotalBlockedTime.java
##
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.Map;
+import java.util.function.Supplier;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+
+public class StreamsThreadTotalBlockedTime {
+final Consumer consumer;
+final Consumer restoreConsumer;
+final Supplier producerTotalBlockedTime;
+
+StreamsThreadTotalBlockedTime(
+final Consumer consumer,
+final Consumer restoreConsumer,
+final Supplier producerTotalBlockedTime) {
+this.consumer = consumer;
+this.restoreConsumer = restoreConsumer;
+this.producerTotalBlockedTime = producerTotalBlockedTime;
+}
+
+final double getMetricValue(

Review comment:
   typo - i'll fix








[GitHub] [kafka] rodesai commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams

2021-08-18 Thread GitBox


rodesai commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r691767731



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##
@@ -200,6 +201,30 @@ public RocksDBMetricsRecordingTrigger 
rocksDBMetricsRecordingTrigger() {
 }
 }
 
+public <T> void addThreadLevelImmutableMetric(final String name,
+final String description,
+final String threadId,
+final T value) {
+final MetricName metricName = metrics.metricName(
+name, THREAD_LEVEL_GROUP, description, 
threadLevelTagMap(threadId));
+synchronized (threadLevelMetrics) {

Review comment:
   Ah, I wasn't aware of the external vs internal sensor names. Now that I 
read through this again, it seems that some external caller with a 
reference to `StreamsMetrics` can add their own sensors, which don't get 
cleaned up when the thread goes away. In this case we don't have external 
callers adding any thread-level metrics to the map, so we don't really need the 
prefix. Happy to include it to keep things consistent.








[GitHub] [kafka] rodesai commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams

2021-08-12 Thread GitBox


rodesai commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r688255651



##
File path: 
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
##
@@ -730,6 +732,41 @@ public void testFlushCompleteSendOfInflightBatches() {
 }
 }
 
+private static Double getMetricValue(final KafkaProducer producer, 
final String name) {
+Metrics metrics = producer.metrics;
+Metric metric =  metrics.metric(metrics.metricName(name, 
"producer-metrics"));
+return (Double) metric.metricValue();
+}
+
+@Test
+public void testFlushMeasureLatency() {
+Map configs = new HashMap<>();
+configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+
+Time time = new MockTime(1);
+MetadataResponse initialUpdateResponse = 
RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
+ProducerMetadata metadata = newMetadata(0, Long.MAX_VALUE);
+
+MockClient client = new MockClient(time, metadata);
+client.updateMetadata(initialUpdateResponse);
+
+try (KafkaProducer producer = kafkaProducer(
+configs,
+new StringSerializer(),
+new StringSerializer(),
+metadata,
+client,
+null,
+time
+)) {
+producer.flush();
+double first = getMetricValue(producer, "flush-time-total");
+assertTrue(first > 99.0);

Review comment:
   Ah, actually this doesn't work because the mock time is passed to and 
used from the other client threads, so the value is not predictable. The 
best we can do is assert that at least one tick (100 nanoseconds) has 
passed. I'll update the test.








[GitHub] [kafka] rodesai commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams

2021-08-12 Thread GitBox


rodesai commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r688245686



##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
##
@@ -590,9 +593,11 @@ else if (acks != -1)
 public void initTransactions() {
 throwIfNoTransactionManager();
 throwIfProducerClosed();
+long now = time.nanoseconds();

Review comment:
   Actually, now I remember why: the bufferpool and selector total blocked 
times are all measured in nanos and use the suffix `time-total`, so I chose 
the naming convention and unit accordingly.
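
A minimal sketch of that convention, assuming a hypothetical accumulator: a `*-time-total` style value that only ever grows and is kept in nanoseconds. `TimeTotalSketch` is an illustrative name and not part of the Kafka metrics API; the clock is injected as a `LongSupplier` so a test can control it.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Hypothetical accumulator for a "*-time-total" style metric, kept in nanoseconds.
final class TimeTotalSketch {
    private final AtomicLong totalNanos = new AtomicLong();
    private final LongSupplier nanoClock;

    TimeTotalSketch(LongSupplier nanoClock) {
        this.nanoClock = nanoClock;
    }

    // Runs the call and adds its duration to the running total, even if it throws.
    void timed(Runnable blockingCall) {
        long start = nanoClock.getAsLong();
        try {
            blockingCall.run();
        } finally {
            totalNanos.addAndGet(nanoClock.getAsLong() - start);
        }
    }

    // Cumulative time spent in timed calls, in nanoseconds.
    double value() {
        return totalNanos.get();
    }
}
```

In production code the supplier would typically be `System::nanoTime`; keeping the unit in nanoseconds end to end avoids mixing units when this total is added to other nanosecond totals.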








[GitHub] [kafka] rodesai commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams

2021-08-12 Thread GitBox


rodesai commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r68825



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsThreadTotalBlockedTime.java
##
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.Map;
+import java.util.function.Supplier;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+
+public class StreamsThreadTotalBlockedTime {
+final Consumer consumer;
+final Consumer restoreConsumer;
+final Supplier producerTotalBlockedTime;
+
+StreamsThreadTotalBlockedTime(
+final Consumer consumer,
+final Consumer restoreConsumer,
+final Supplier producerTotalBlockedTime
+) {
+this.consumer = consumer;
+this.restoreConsumer = restoreConsumer;
+this.producerTotalBlockedTime = producerTotalBlockedTime;
+}
+
+final double getMetricValue(

Review comment:
   I tried doing it this way at first, but found it hard to loop over the 
producers in `TaskManager/Tasks/ActiveTaskCreator` without breaking those 
abstractions by adding methods to return the producers so we could get the 
metrics out. So I went the route of having the total blocked time metric 
implementation ask `TaskManager` for its total blocked time component.
   
   > we can also use this in unit test e.g. 
https://github.com/apache/kafka/pull/11149/files#diff-599de0f96fbd5ba6b3d919881426269fc72fe8bbe8e2436fab87d9abe84e8dbaR735

   What do you mean here? This is the producer's unit test, and this method 
computes total blocked time for a streams app.
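
A minimal sketch of the supplier-based design described above, under stated assumptions: the thread-level total is modelled as the sum of three components, each hidden behind a `Supplier<Double>`. In the PR the consumer and restore consumer are passed as `Consumer` objects and only the producer side is a supplier; using suppliers for all three here is a simplification, and `TotalBlockedTimeSketch` is a hypothetical name.

```java
import java.util.function.Supplier;

// Hypothetical sketch: the caller never sees the producers themselves, only a
// supplier that reports their combined blocked time when asked.
final class TotalBlockedTimeSketch {
    private final Supplier<Double> consumerBlockedTime;
    private final Supplier<Double> restoreConsumerBlockedTime;
    private final Supplier<Double> producerTotalBlockedTime;

    TotalBlockedTimeSketch(Supplier<Double> consumerBlockedTime,
                           Supplier<Double> restoreConsumerBlockedTime,
                           Supplier<Double> producerTotalBlockedTime) {
        this.consumerBlockedTime = consumerBlockedTime;
        this.restoreConsumerBlockedTime = restoreConsumerBlockedTime;
        this.producerTotalBlockedTime = producerTotalBlockedTime;
    }

    // Sums the three components at the moment the metric is read.
    double compute() {
        return consumerBlockedTime.get()
            + restoreConsumerBlockedTime.get()
            + producerTotalBlockedTime.get();
    }
}
```

The supplier keeps the owner of the producers responsible for aggregating over them, so no accessor that returns the producers has to be added to the intermediate abstractions.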








[GitHub] [kafka] rodesai commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams

2021-08-12 Thread GitBox


rodesai commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r688242729



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##
@@ -200,6 +201,30 @@ public RocksDBMetricsRecordingTrigger 
rocksDBMetricsRecordingTrigger() {
 }
 }
 
+public <T> void addThreadLevelImmutableMetric(final String name,
+final String description,
+final String threadId,
+final T value) {
+final MetricName metricName = metrics.metricName(
+name, THREAD_LEVEL_GROUP, description, 
threadLevelTagMap(threadId));
+synchronized (threadLevelMetrics) {

Review comment:
   not sure I follow the question here - we are using `threadLevelMetrics` 
to track the metrics for each thread so they can be cleaned up later on when 
the thread exits. What's wrong with using the thread id for that?
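
A minimal sketch of the bookkeeping being described, assuming a plain set as a stand-in for the metrics registry. This is not the `StreamsMetricsImpl` code; `ThreadLevelMetricsSketch` and its method names are hypothetical. Metric names are remembered per thread id so that everything registered for a thread can be removed when that thread exits.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of per-thread metric tracking with cleanup on thread exit.
final class ThreadLevelMetricsSketch {
    // Stand-in for the shared metrics registry.
    private final Set<String> registry = new HashSet<>();
    // Thread id -> metric names registered on behalf of that thread.
    private final Map<String, Deque<String>> threadLevelMetrics = new HashMap<>();

    // Registers a metric and remembers it under the owning thread id.
    void addThreadLevelMetric(String threadId, String name) {
        String fullName = threadId + "." + name;
        synchronized (threadLevelMetrics) {
            registry.add(fullName);
            threadLevelMetrics.computeIfAbsent(threadId, id -> new ArrayDeque<>()).push(fullName);
        }
    }

    // Removes every metric that was registered for the given thread id.
    void removeAllThreadLevelMetrics(String threadId) {
        synchronized (threadLevelMetrics) {
            Deque<String> names = threadLevelMetrics.remove(threadId);
            if (names != null) {
                names.forEach(registry::remove);
            }
        }
    }

    // Number of metrics currently held by the stand-in registry.
    int registeredCount() {
        synchronized (threadLevelMetrics) {
            return registry.size();
        }
    }
}
```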








[GitHub] [kafka] rodesai commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams

2021-08-12 Thread GitBox


rodesai commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r688240994



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
##
@@ -178,12 +180,39 @@ public void resetProducer() {
 throw new IllegalStateException("Expected eos-v2 to be enabled, 
but the processing mode was " + processingMode);
 }
 
+final long start = Time.SYSTEM.nanoseconds();
 producer.close();
+final long closeTime = Time.SYSTEM.nanoseconds() - start;
+
+oldProducerTotalBlockedTime += closeTime + totalBlockedTime(producer);
 
 producer = clientSupplier.getProducer(eosV2ProducerConfigs);
 transactionInitialized = false;
 }
 
+private static double getMetricValue(final Map metrics,
+ final String name) {
+return metrics.keySet().stream()
+.filter(n -> n.name().equals(name))
+.findFirst()

Review comment:
   yeah it should always be one








[GitHub] [kafka] rodesai commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams

2021-08-12 Thread GitBox


rodesai commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r688240676



##
File path: 
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
##
@@ -730,6 +732,41 @@ public void testFlushCompleteSendOfInflightBatches() {
 }
 }
 
+private static Double getMetricValue(final KafkaProducer producer, 
final String name) {
+Metrics metrics = producer.metrics;
+Metric metric =  metrics.metric(metrics.metricName(name, 
"producer-metrics"));
+return (Double) metric.metricValue();
+}
+
+@Test
+public void testFlushMeasureLatency() {
+Map configs = new HashMap<>();
+configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+
+Time time = new MockTime(1);
+MetadataResponse initialUpdateResponse = 
RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
+ProducerMetadata metadata = newMetadata(0, Long.MAX_VALUE);
+
+MockClient client = new MockClient(time, metadata);
+client.updateMetadata(initialUpdateResponse);
+
+try (KafkaProducer producer = kafkaProducer(
+configs,
+new StringSerializer(),
+new StringSerializer(),
+metadata,
+client,
+null,
+time
+)) {
+producer.flush();
+double first = getMetricValue(producer, "flush-time-total");
+assertTrue(first > 99.0);

Review comment:
   It's using mock time, so the value here is well-known (should be 1 
second). I'm using > rather than equalTo because I don't want the test to fail 
spuriously on floating point rounding errors. It would probably be better to 
use 
[isCloseTo](http://hamcrest.org/JavaHamcrest/javadoc/1.3/org/hamcrest/number/IsCloseTo.html)
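
A minimal sketch of the tolerance comparison mentioned above, written in plain Java so it needs no test library. The linked Hamcrest `IsCloseTo` class exposes this idea as the `closeTo(operand, error)` matcher; the helper below (`CloseToSketch.isCloseTo`, a hypothetical name) only mirrors its semantics and is not the Hamcrest implementation.

```java
// Hypothetical helper mirroring a "close to" check: passes when the values
// differ by no more than the allowed error.
final class CloseToSketch {
    static boolean isCloseTo(double actual, double expected, double error) {
        return Math.abs(actual - expected) <= error;
    }
}
```

For example, `0.1 + 0.2 == 0.3` is false in floating point arithmetic, while `CloseToSketch.isCloseTo(0.1 + 0.2, 0.3, 1e-9)` is true, which is the kind of spurious failure the comment wants to avoid.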








[GitHub] [kafka] rodesai commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams

2021-08-12 Thread GitBox


rodesai commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r688239443



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
##
@@ -178,12 +180,39 @@ public void resetProducer() {
 throw new IllegalStateException("Expected eos-v2 to be enabled, 
but the processing mode was " + processingMode);
 }
 
+final long start = Time.SYSTEM.nanoseconds();

Review comment:
   ah good call!








[GitHub] [kafka] rodesai commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams

2021-08-12 Thread GitBox


rodesai commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r688236582



##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
##
@@ -590,9 +593,11 @@ else if (acks != -1)
 public void initTransactions() {
 throwIfNoTransactionManager();
 throwIfProducerClosed();
+long now = time.nanoseconds();

Review comment:
   Ah, I thought we were measuring in nanos. Not sure where I got that 
impression. I'll change to millis.



