Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14620:
URL: https://github.com/apache/kafka/pull/14620#discussion_r1390718727

## clients/src/main/java/org/apache/kafka/common/telemetry/internals/LastValueTracker.java:

@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import java.time.Instant;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * A LastValueTracker uses a ConcurrentMap to maintain historic values for a given key, and return

Review Comment:
   @mjsax I think you meant why `ConcurrentMap` in the sense of why not just `Map`, or why not mention `ConcurrentHashMap`. `ConcurrentMap` was used because the code called `putIfAbsent`, which is declared on the `ConcurrentMap` API. Since I have removed the `AtomicReference` and moved from `putIfAbsent` to `put`, I have also moved from `ConcurrentMap` to `Map`; the implementation is still `ConcurrentHashMap`. I have corrected the comment accordingly.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
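The change described in the comment (a field declared as plain `Map` but backed by `ConcurrentHashMap`, with `put` returning the previous mapping instead of `putIfAbsent` plus an `AtomicReference`) can be sketched roughly as follows. This is a simplified illustration, not the actual `LastValueTracker` code; the class and method names are assumptions.

```java
import java.time.Instant;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical simplified sketch of a last-value tracker keyed by metric key.
public class LastValueTrackerSketch<K> {

    // Pairs the value of a metric with the instant it was observed.
    public static class InstantAndValue<T> {
        private final Instant intervalStart;
        private final T value;

        public InstantAndValue(Instant intervalStart, T value) {
            this.intervalStart = intervalStart;
            this.value = value;
        }

        public Instant intervalStart() { return intervalStart; }
        public T value() { return value; }
    }

    // Declared as Map (the interface the code actually needs), implemented by
    // ConcurrentHashMap; plain put() no longer requires the ConcurrentMap type.
    private final Map<K, InstantAndValue<Double>> counters = new ConcurrentHashMap<>();

    // Record the latest value and return the previous one (if any), so the
    // caller can compute a delta for the elapsed interval.
    public Optional<InstantAndValue<Double>> getAndSet(K key, Instant now, double value) {
        InstantAndValue<Double> previous = counters.put(key, new InstantAndValue<>(now, value));
        return Optional.ofNullable(previous);
    }
}
```

Because `ConcurrentHashMap.put` is atomic, the read-previous-and-replace step needs no extra synchronization, which is what makes the `AtomicReference` removable.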
Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14620:
URL: https://github.com/apache/kafka/pull/14620#discussion_r1390714375

## clients/src/main/java/org/apache/kafka/common/telemetry/internals/KafkaMetricsCollector.java:

@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricValueProvider;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.CumulativeCount;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+import org.apache.kafka.common.metrics.stats.Frequencies;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.apache.kafka.common.metrics.stats.Percentiles;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.SimpleRate;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
+import org.apache.kafka.common.telemetry.internals.LastValueTracker.InstantAndValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.time.Clock;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * All metrics implement the {@link MetricValueProvider} interface. They are divided into
+ * two base types:
+ *
+ *   - {@link Gauge}
+ *   - {@link Measurable}
+ *
+ * {@link Gauge Gauges} can have any value, but we only collect metrics with number values.
+ * {@link Measurable Measurables} are divided into simple types with single values
+ * ({@link Avg}, {@link CumulativeCount}, {@link Min}, {@link Max}, {@link Rate},
+ * {@link SimpleRate}, and {@link CumulativeSum}) and compound types ({@link Frequencies},
+ * {@link Meter}, and {@link Percentiles}).
+ *
+ * We can safely assume that a {@link CumulativeCount count} always increases in steady state. It
+ * should be a bug if a count metric decreases.
+ *
+ * Total and Sum are treated as monotonically increasing counters. The javadoc for the Total metric
+ * type says "An un-windowed cumulative total maintained over all time." The standalone Total
+ * metrics in the codebase appear to be cumulative metrics that always increase. The Total metric
+ * underlying the Meter type is mostly a Total of a Count metric. We can assume that a Total metric
+ * always increases (though this is not guaranteed, as the sample values might be either negative or
+ * positive). For now, Total is converted to CUMULATIVE_DOUBLE unless we find a valid
+ * counter-example.
+ *
+ * Sum is a sample sum and not a cumulative metric, so it is converted to GAUGE_DOUBLE.
+ *
+ * The compound metrics are virtual metrics. They are composed of simple types or anonymous
+ * measurable types, which are what get reported. A compound metric is never reported as-is.
+ *
+ * A Meter metric is always created with, and reported as, 2 KafkaExporter metrics: a rate and a
+ * count. For example, org.apache.kafka.common.network.Selector has a Meter metric for
+ * "connection-close", but it has to be created with a "connection-close-rate" metric of type rate
+ * and a "connection-close-total" metric of type total. So we will never get a KafkaExporter metric
+ * with type Meter.
+ *
+ * Frequencies is created with an array of Frequency objects. When a Frequencies metric is
+ * registered, each member Frequency object is converted into an anonymous Measurable and
+ * registered. So a Frequencies metric is reported as a set of measurables with
+ * name = Frequency.name(). As there is no way to figure out the compound type, each component
+ * measurable is converted to a GAUGE_DOUBLE.
+ *
+ * Percentiles work the
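The conversion rules laid out in the javadoc above can be sketched as follows. This is a hedged illustration only: the class and method names are hypothetical (not the actual KafkaMetricsCollector code), and the mapping is keyed by the measurable's simple class name purely for brevity.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the measurable-type -> exported-type mapping
// described in the javadoc; not the real collector implementation.
public class MetricTypeMappingSketch {

    enum ExportedType { CUMULATIVE_DOUBLE, GAUGE_DOUBLE }

    // Cumulative counts and totals are assumed monotonic, so they map to a
    // cumulative counter; everything else (Avg, Min, Max, Rate, a sample Sum,
    // and the per-Frequency component gauges) is exported as a gauge.
    static ExportedType exportedTypeFor(String measurableType) {
        switch (measurableType) {
            case "CumulativeCount":
            case "CumulativeSum": // "Total" in the javadoc's terminology
                return ExportedType.CUMULATIVE_DOUBLE;
            default:
                return ExportedType.GAUGE_DOUBLE;
        }
    }

    // A Meter is never exported as-is: it registers two simple metrics, e.g.
    // "connection-close-rate" and "connection-close-total" in Selector.
    static List<String> meterComponents(String baseName) {
        return Arrays.asList(baseName + "-rate", baseName + "-total");
    }
}
```

The design point the javadoc makes is that only simple, single-valued measurables ever reach the exporter; compound types dissolve into their components before collection.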
Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14620:
URL: https://github.com/apache/kafka/pull/14620#discussion_r1390713867

## clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricsProvider.java:

@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import io.opentelemetry.proto.resource.v1.Resource;
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.metrics.MetricsContext;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Provides specification which are used to collect metrics.
+ */
+public interface MetricsProvider extends Configurable {

Review Comment:
   I had kept the class so that implementations of `TelemetryReporter` could use it, but it makes sense to remove it, as the clients code does not need that abstraction and can instead have a class directly providing the telemetry metrics specifications, i.e. domain, labels, etc. I have removed the interface.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
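A plain class directly providing the telemetry metrics specifications (domain, labels) mentioned in the comment could look roughly like the sketch below. All names here are assumptions for illustration; they are not the actual replacement class in the PR.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical concrete holder of telemetry metric specifications, standing
// in for the removed MetricsProvider interface: a fixed domain plus an
// immutable set of resource labels.
public class TelemetryMetricsSpec {
    private final String domain;
    private final Map<String, String> labels;

    public TelemetryMetricsSpec(String domain, Map<String, String> labels) {
        this.domain = domain;
        // Defensive copy so callers cannot mutate the labels afterwards.
        this.labels = Collections.unmodifiableMap(new LinkedHashMap<>(labels));
    }

    public String domain() { return domain; }
    public Map<String, String> labels() { return labels; }
}
```

A concrete class like this keeps the same information the interface exposed, while sparing client code an abstraction it never needed to vary.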
[jira] [Updated] (KAFKA-15785) Flaky Test - testTransactionAfterTransactionIdExpiresButProducerIdRemains
[ https://issues.apache.org/jira/browse/KAFKA-15785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apoorv Mittal updated KAFKA-15785:
----------------------------------
    Summary: Flaky Test - testTransactionAfterTransactionIdExpiresButProducerIdRemains  (was: Investigate new test case failure - testTransactionAfterTransactionIdExpiresButProducerIdRemains)

> Flaky Test - testTransactionAfterTransactionIdExpiresButProducerIdRemains
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-15785
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15785
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Apoorv Mittal
>            Priority: Major
>
> PR [https://github.com/apache/kafka/pull/14621] has 7 new test case failures which are not related to the PR. This Jira tracks these test failures for investigation into whether the current changes somehow impact the tests.
> CI: [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14621/12/tests/]
>
> Failed tests:
> Build / JDK 17 and Scala 2.13 / testTransactionAfterTransactionIdExpiresButProducerIdRemains(String).quorum=kraft – kafka.api.ProducerIdExpirationTest (8s)
> Build / JDK 8 and Scala 2.12 / testBumpTransactionalEpoch(String).quorum=kraft – kafka.api.TransactionsTest (1m 20s)
> Build / JDK 11 and Scala 2.13 / testReplicateSourceDefault() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest (2m 15s)
> Build / JDK 11 and Scala 2.13 / testReplicateSourceDefault() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest (1m 51s)
> Build / JDK 11 and Scala 2.13 / testDeleteCmdNonExistingGroup(String).quorum=kraft – kafka.admin.DeleteConsumerGroupsTest (11s)
> Build / JDK 11 and Scala 2.13 / testTimeouts() – org.apache.kafka.controller.QuorumControllerTest (<1s)
> Build / JDK 11 and Scala 2.13 / testHighWaterMarkAfterPartitionReassignment(String).quorum=kraft – org.apache.kafka.tools.reassign.ReassignPartitionsIntegrationTest

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Updated] (KAFKA-15785) Investigate new test case failure - testTransactionAfterTransactionIdExpiresButProducerIdRemains
[ https://issues.apache.org/jira/browse/KAFKA-15785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apoorv Mittal updated KAFKA-15785:
----------------------------------
    Parent: (was: KAFKA-15601)
    Issue Type: Bug  (was: Sub-task)

> Investigate new test case failure - testTransactionAfterTransactionIdExpiresButProducerIdRemains
> -------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-15785
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15785
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Apoorv Mittal
>            Priority: Major
>
> PR [https://github.com/apache/kafka/pull/14621] has 7 new test case failures which are not related to the PR. This Jira tracks these test failures for investigation into whether the current changes somehow impact the tests.
> CI: [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14621/12/tests/]
>
> Failed tests:
> Build / JDK 17 and Scala 2.13 / testTransactionAfterTransactionIdExpiresButProducerIdRemains(String).quorum=kraft – kafka.api.ProducerIdExpirationTest (8s)
> Build / JDK 8 and Scala 2.12 / testBumpTransactionalEpoch(String).quorum=kraft – kafka.api.TransactionsTest (1m 20s)
> Build / JDK 11 and Scala 2.13 / testReplicateSourceDefault() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest (2m 15s)
> Build / JDK 11 and Scala 2.13 / testReplicateSourceDefault() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest (1m 51s)
> Build / JDK 11 and Scala 2.13 / testDeleteCmdNonExistingGroup(String).quorum=kraft – kafka.admin.DeleteConsumerGroupsTest (11s)
> Build / JDK 11 and Scala 2.13 / testTimeouts() – org.apache.kafka.controller.QuorumControllerTest (<1s)
> Build / JDK 11 and Scala 2.13 / testHighWaterMarkAfterPartitionReassignment(String).quorum=kraft – org.apache.kafka.tools.reassign.ReassignPartitionsIntegrationTest

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Updated] (KAFKA-15786) Investigate new test case failure - testBumpTransactionalEpoch
[ https://issues.apache.org/jira/browse/KAFKA-15786?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apoorv Mittal updated KAFKA-15786:
----------------------------------
    Parent: (was: KAFKA-15601)
    Issue Type: Bug  (was: Sub-task)

> Investigate new test case failure - testBumpTransactionalEpoch
> --------------------------------------------------------------
>
>                 Key: KAFKA-15786
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15786
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Apoorv Mittal
>            Priority: Major
>
> PR [https://github.com/apache/kafka/pull/14621] has 7 new test case failures which are not related to the PR. This Jira tracks these test failures for investigation into whether the current changes somehow impact the tests.
> CI: [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14621/12/tests/]
>
> Failed tests:
> Build / JDK 17 and Scala 2.13 / testTransactionAfterTransactionIdExpiresButProducerIdRemains(String).quorum=kraft – kafka.api.ProducerIdExpirationTest (8s)
> Build / JDK 8 and Scala 2.12 / testBumpTransactionalEpoch(String).quorum=kraft – kafka.api.TransactionsTest (1m 20s)
> Build / JDK 11 and Scala 2.13 / testReplicateSourceDefault() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest (2m 15s)
> Build / JDK 11 and Scala 2.13 / testReplicateSourceDefault() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest (1m 51s)
> Build / JDK 11 and Scala 2.13 / testDeleteCmdNonExistingGroup(String).quorum=kraft – kafka.admin.DeleteConsumerGroupsTest (11s)
> Build / JDK 11 and Scala 2.13 / testTimeouts() – org.apache.kafka.controller.QuorumControllerTest (<1s)
> Build / JDK 11 and Scala 2.13 / testHighWaterMarkAfterPartitionReassignment(String).quorum=kraft – org.apache.kafka.tools.reassign.ReassignPartitionsIntegrationTest

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Updated] (KAFKA-15786) Flaky Test - testBumpTransactionalEpoch
[ https://issues.apache.org/jira/browse/KAFKA-15786?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apoorv Mittal updated KAFKA-15786:
----------------------------------
    Summary: Flaky Test - testBumpTransactionalEpoch  (was: Investigate new test case failure - testBumpTransactionalEpoch)

> Flaky Test - testBumpTransactionalEpoch
> ---------------------------------------
>
>                 Key: KAFKA-15786
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15786
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Apoorv Mittal
>            Priority: Major
>
> PR [https://github.com/apache/kafka/pull/14621] has 7 new test case failures which are not related to the PR. This Jira tracks these test failures for investigation into whether the current changes somehow impact the tests.
> CI: [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14621/12/tests/]
>
> Failed tests:
> Build / JDK 17 and Scala 2.13 / testTransactionAfterTransactionIdExpiresButProducerIdRemains(String).quorum=kraft – kafka.api.ProducerIdExpirationTest (8s)
> Build / JDK 8 and Scala 2.12 / testBumpTransactionalEpoch(String).quorum=kraft – kafka.api.TransactionsTest (1m 20s)
> Build / JDK 11 and Scala 2.13 / testReplicateSourceDefault() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest (2m 15s)
> Build / JDK 11 and Scala 2.13 / testReplicateSourceDefault() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest (1m 51s)
> Build / JDK 11 and Scala 2.13 / testDeleteCmdNonExistingGroup(String).quorum=kraft – kafka.admin.DeleteConsumerGroupsTest (11s)
> Build / JDK 11 and Scala 2.13 / testTimeouts() – org.apache.kafka.controller.QuorumControllerTest (<1s)
> Build / JDK 11 and Scala 2.13 / testHighWaterMarkAfterPartitionReassignment(String).quorum=kraft – org.apache.kafka.tools.reassign.ReassignPartitionsIntegrationTest

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Updated] (KAFKA-15788) Investigate new test case failure - testDeleteCmdNonExistingGroup
[ https://issues.apache.org/jira/browse/KAFKA-15788?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apoorv Mittal updated KAFKA-15788:
----------------------------------
    Parent: (was: KAFKA-15601)
    Issue Type: Bug  (was: Sub-task)

> Investigate new test case failure - testDeleteCmdNonExistingGroup
> -----------------------------------------------------------------
>
>                 Key: KAFKA-15788
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15788
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Apoorv Mittal
>            Priority: Major
>
> PR [https://github.com/apache/kafka/pull/14621] has 7 new test case failures which are not related to the PR. This Jira tracks these test failures for investigation into whether the current changes somehow impact the tests.
> CI: [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14621/12/tests/]
>
> Failed tests:
> Build / JDK 17 and Scala 2.13 / testTransactionAfterTransactionIdExpiresButProducerIdRemains(String).quorum=kraft – kafka.api.ProducerIdExpirationTest (8s)
> Build / JDK 8 and Scala 2.12 / testBumpTransactionalEpoch(String).quorum=kraft – kafka.api.TransactionsTest (1m 20s)
> Build / JDK 11 and Scala 2.13 / testReplicateSourceDefault() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest (2m 15s)
> Build / JDK 11 and Scala 2.13 / testReplicateSourceDefault() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest (1m 51s)
> Build / JDK 11 and Scala 2.13 / testDeleteCmdNonExistingGroup(String).quorum=kraft – kafka.admin.DeleteConsumerGroupsTest (11s)
> Build / JDK 11 and Scala 2.13 / testTimeouts() – org.apache.kafka.controller.QuorumControllerTest (<1s)
> Build / JDK 11 and Scala 2.13 / testHighWaterMarkAfterPartitionReassignment(String).quorum=kraft – org.apache.kafka.tools.reassign.ReassignPartitionsIntegrationTest

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Updated] (KAFKA-15788) Flaky Test - testDeleteCmdNonExistingGroup
[ https://issues.apache.org/jira/browse/KAFKA-15788?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apoorv Mittal updated KAFKA-15788:
----------------------------------
    Summary: Flaky Test - testDeleteCmdNonExistingGroup  (was: Investigate new test case failure - testDeleteCmdNonExistingGroup)

> Flaky Test - testDeleteCmdNonExistingGroup
> ------------------------------------------
>
>                 Key: KAFKA-15788
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15788
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Apoorv Mittal
>            Priority: Major
>
> PR [https://github.com/apache/kafka/pull/14621] has 7 new test case failures which are not related to the PR. This Jira tracks these test failures for investigation into whether the current changes somehow impact the tests.
> CI: [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14621/12/tests/]
>
> Failed tests:
> Build / JDK 17 and Scala 2.13 / testTransactionAfterTransactionIdExpiresButProducerIdRemains(String).quorum=kraft – kafka.api.ProducerIdExpirationTest (8s)
> Build / JDK 8 and Scala 2.12 / testBumpTransactionalEpoch(String).quorum=kraft – kafka.api.TransactionsTest (1m 20s)
> Build / JDK 11 and Scala 2.13 / testReplicateSourceDefault() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest (2m 15s)
> Build / JDK 11 and Scala 2.13 / testReplicateSourceDefault() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest (1m 51s)
> Build / JDK 11 and Scala 2.13 / testDeleteCmdNonExistingGroup(String).quorum=kraft – kafka.admin.DeleteConsumerGroupsTest (11s)
> Build / JDK 11 and Scala 2.13 / testTimeouts() – org.apache.kafka.controller.QuorumControllerTest (<1s)
> Build / JDK 11 and Scala 2.13 / testHighWaterMarkAfterPartitionReassignment(String).quorum=kraft – org.apache.kafka.tools.reassign.ReassignPartitionsIntegrationTest

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Updated] (KAFKA-15790) Flaky Test - testHighWaterMarkAfterPartitionReassignment
[ https://issues.apache.org/jira/browse/KAFKA-15790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apoorv Mittal updated KAFKA-15790:
----------------------------------
    Summary: Flaky Test - testHighWaterMarkAfterPartitionReassignment  (was: Investigate new test case failure - testHighWaterMarkAfterPartitionReassignment)

> Flaky Test - testHighWaterMarkAfterPartitionReassignment
> --------------------------------------------------------
>
>                 Key: KAFKA-15790
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15790
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Apoorv Mittal
>            Priority: Major
>
> PR [https://github.com/apache/kafka/pull/14621] has 7 new test case failures which are not related to the PR. This Jira tracks these test failures for investigation into whether the current changes somehow impact the tests.
> CI: [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14621/12/tests/]
>
> Failed tests:
> Build / JDK 17 and Scala 2.13 / testTransactionAfterTransactionIdExpiresButProducerIdRemains(String).quorum=kraft – kafka.api.ProducerIdExpirationTest (8s)
> Build / JDK 8 and Scala 2.12 / testBumpTransactionalEpoch(String).quorum=kraft – kafka.api.TransactionsTest (1m 20s)
> Build / JDK 11 and Scala 2.13 / testReplicateSourceDefault() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest (2m 15s)
> Build / JDK 11 and Scala 2.13 / testReplicateSourceDefault() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest (1m 51s)
> Build / JDK 11 and Scala 2.13 / testDeleteCmdNonExistingGroup(String).quorum=kraft – kafka.admin.DeleteConsumerGroupsTest (11s)
> Build / JDK 11 and Scala 2.13 / testTimeouts() – org.apache.kafka.controller.QuorumControllerTest (<1s)
> Build / JDK 11 and Scala 2.13 / testHighWaterMarkAfterPartitionReassignment(String).quorum=kraft – org.apache.kafka.tools.reassign.ReassignPartitionsIntegrationTest

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Updated] (KAFKA-15790) Investigate new test case failure - testHighWaterMarkAfterPartitionReassignment
[ https://issues.apache.org/jira/browse/KAFKA-15790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apoorv Mittal updated KAFKA-15790:
----------------------------------
    Parent: (was: KAFKA-15601)
    Issue Type: Bug  (was: Sub-task)

> Investigate new test case failure - testHighWaterMarkAfterPartitionReassignment
> -------------------------------------------------------------------------------
>
>                 Key: KAFKA-15790
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15790
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Apoorv Mittal
>            Priority: Major
>
> PR [https://github.com/apache/kafka/pull/14621] has 7 new test case failures which are not related to the PR. This Jira tracks these test failures for investigation into whether the current changes somehow impact the tests.
> CI: [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14621/12/tests/]
>
> Failed tests:
> Build / JDK 17 and Scala 2.13 / testTransactionAfterTransactionIdExpiresButProducerIdRemains(String).quorum=kraft – kafka.api.ProducerIdExpirationTest (8s)
> Build / JDK 8 and Scala 2.12 / testBumpTransactionalEpoch(String).quorum=kraft – kafka.api.TransactionsTest (1m 20s)
> Build / JDK 11 and Scala 2.13 / testReplicateSourceDefault() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest (2m 15s)
> Build / JDK 11 and Scala 2.13 / testReplicateSourceDefault() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest (1m 51s)
> Build / JDK 11 and Scala 2.13 / testDeleteCmdNonExistingGroup(String).quorum=kraft – kafka.admin.DeleteConsumerGroupsTest (11s)
> Build / JDK 11 and Scala 2.13 / testTimeouts() – org.apache.kafka.controller.QuorumControllerTest (<1s)
> Build / JDK 11 and Scala 2.13 / testHighWaterMarkAfterPartitionReassignment(String).quorum=kraft – org.apache.kafka.tools.reassign.ReassignPartitionsIntegrationTest

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Updated] (KAFKA-15791) Flaky test - MirrorConnectorsWithCustomForwardingAdminIntegrationTest - MirrorConnectorsIntegrationBaseTest
[ https://issues.apache.org/jira/browse/KAFKA-15791?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apoorv Mittal updated KAFKA-15791:
----------------------------------
    Summary: Flaky test - MirrorConnectorsWithCustomForwardingAdminIntegrationTest - MirrorConnectorsIntegrationBaseTest  (was: Investigate new test case failure - MirrorConnectorsWithCustomForwardingAdminIntegrationTest - MirrorConnectorsIntegrationBaseTest)

> Flaky test - MirrorConnectorsWithCustomForwardingAdminIntegrationTest - MirrorConnectorsIntegrationBaseTest
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-15791
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15791
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Apoorv Mittal
>            Priority: Major
>
> PR [https://github.com/apache/kafka/pull/14621] has 7 new test case failures which are not related to the PR. This Jira tracks these test failures for investigation into whether the current changes somehow impact the tests.
> CI: [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14621/12/tests/]
>
> Failed tests:
> Build / JDK 17 and Scala 2.13 / testTransactionAfterTransactionIdExpiresButProducerIdRemains(String).quorum=kraft – kafka.api.ProducerIdExpirationTest (8s)
> Build / JDK 8 and Scala 2.12 / testBumpTransactionalEpoch(String).quorum=kraft – kafka.api.TransactionsTest (1m 20s)
> Build / JDK 11 and Scala 2.13 / testReplicateSourceDefault() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest (2m 15s)
> Build / JDK 11 and Scala 2.13 / testReplicateSourceDefault() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest (1m 51s)
> Build / JDK 11 and Scala 2.13 / testDeleteCmdNonExistingGroup(String).quorum=kraft – kafka.admin.DeleteConsumerGroupsTest (11s)
> Build / JDK 11 and Scala 2.13 / testTimeouts() – org.apache.kafka.controller.QuorumControllerTest (<1s)
> Build / JDK 11 and Scala 2.13 / testHighWaterMarkAfterPartitionReassignment(String).quorum=kraft – org.apache.kafka.tools.reassign.ReassignPartitionsIntegrationTest

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Updated] (KAFKA-15791) Investigate new test case failure - MirrorConnectorsWithCustomForwardingAdminIntegrationTest - MirrorConnectorsIntegrationBaseTest
[ https://issues.apache.org/jira/browse/KAFKA-15791?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apoorv Mittal updated KAFKA-15791:
----------------------------------
    Parent: (was: KAFKA-15601)
    Issue Type: Bug  (was: Sub-task)

> Investigate new test case failure - MirrorConnectorsWithCustomForwardingAdminIntegrationTest - MirrorConnectorsIntegrationBaseTest
> ----------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-15791
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15791
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Apoorv Mittal
>            Priority: Major
>
> PR [https://github.com/apache/kafka/pull/14621] has 7 new test case failures which are not related to the PR. This Jira tracks these test failures for investigation into whether the current changes somehow impact the tests.
> CI: [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14621/12/tests/]
>
> Failed tests:
> Build / JDK 17 and Scala 2.13 / testTransactionAfterTransactionIdExpiresButProducerIdRemains(String).quorum=kraft – kafka.api.ProducerIdExpirationTest (8s)
> Build / JDK 8 and Scala 2.12 / testBumpTransactionalEpoch(String).quorum=kraft – kafka.api.TransactionsTest (1m 20s)
> Build / JDK 11 and Scala 2.13 / testReplicateSourceDefault() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest (2m 15s)
> Build / JDK 11 and Scala 2.13 / testReplicateSourceDefault() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest (1m 51s)
> Build / JDK 11 and Scala 2.13 / testDeleteCmdNonExistingGroup(String).quorum=kraft – kafka.admin.DeleteConsumerGroupsTest (11s)
> Build / JDK 11 and Scala 2.13 / testTimeouts() – org.apache.kafka.controller.QuorumControllerTest (<1s)
> Build / JDK 11 and Scala 2.13 / testHighWaterMarkAfterPartitionReassignment(String).quorum=kraft – org.apache.kafka.tools.reassign.ReassignPartitionsIntegrationTest

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Updated] (KAFKA-15789) Flaky Test - testTimeouts - QuorumControllerTest
[ https://issues.apache.org/jira/browse/KAFKA-15789?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apoorv Mittal updated KAFKA-15789:
----------------------------------
    Summary: Flaky Test - testTimeouts - QuorumControllerTest  (was: Investigate new test case failure - testTimeouts - QuorumControllerTest)

> Flaky Test - testTimeouts - QuorumControllerTest
> ------------------------------------------------
>
>                 Key: KAFKA-15789
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15789
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Apoorv Mittal
>            Priority: Major
>
> PR [https://github.com/apache/kafka/pull/14621] has 7 new test case failures which are not related to the PR. This Jira tracks these test failures for investigation into whether the current changes somehow impact the tests.
> CI: [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14621/12/tests/]
>
> Failed tests:
> Build / JDK 17 and Scala 2.13 / testTransactionAfterTransactionIdExpiresButProducerIdRemains(String).quorum=kraft – kafka.api.ProducerIdExpirationTest (8s)
> Build / JDK 8 and Scala 2.12 / testBumpTransactionalEpoch(String).quorum=kraft – kafka.api.TransactionsTest (1m 20s)
> Build / JDK 11 and Scala 2.13 / testReplicateSourceDefault() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest (2m 15s)
> Build / JDK 11 and Scala 2.13 / testReplicateSourceDefault() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest (1m 51s)
> Build / JDK 11 and Scala 2.13 / testDeleteCmdNonExistingGroup(String).quorum=kraft – kafka.admin.DeleteConsumerGroupsTest (11s)
> Build / JDK 11 and Scala 2.13 / testTimeouts() – org.apache.kafka.controller.QuorumControllerTest (<1s)
> Build / JDK 11 and Scala 2.13 / testHighWaterMarkAfterPartitionReassignment(String).quorum=kraft – org.apache.kafka.tools.reassign.ReassignPartitionsIntegrationTest

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Updated] (KAFKA-15789) Investigate new test case failure - testTimeouts - QuorumControllerTest
[ https://issues.apache.org/jira/browse/KAFKA-15789?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apoorv Mittal updated KAFKA-15789: -- Parent: (was: KAFKA-15601) Issue Type: Bug (was: Sub-task) > Investigate new test case failure - testTimeouts - QuorumControllerTest > --- > > Key: KAFKA-15789 > URL: https://issues.apache.org/jira/browse/KAFKA-15789 > Project: Kafka > Issue Type: Bug > Reporter: Apoorv Mittal > Priority: Major (issue description omitted; identical to the issue body quoted in the previous message) -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14620: URL: https://github.com/apache/kafka/pull/14620#discussion_r1390693105 ## clients/src/main/java/org/apache/kafka/common/telemetry/internals/KafkaMetricsCollector.java: ## @@ -0,0 +1,341 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.telemetry.internals; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.metrics.Gauge; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.Measurable; +import org.apache.kafka.common.metrics.MetricValueProvider; +import org.apache.kafka.common.metrics.stats.Avg; +import org.apache.kafka.common.metrics.stats.CumulativeCount; +import org.apache.kafka.common.metrics.stats.CumulativeSum; +import org.apache.kafka.common.metrics.stats.Frequencies; +import org.apache.kafka.common.metrics.stats.Max; +import org.apache.kafka.common.metrics.stats.Meter; +import org.apache.kafka.common.metrics.stats.Min; +import org.apache.kafka.common.metrics.stats.Percentiles; +import org.apache.kafka.common.metrics.stats.Rate; +import org.apache.kafka.common.metrics.stats.SimpleRate; +import org.apache.kafka.common.metrics.stats.WindowedCount; +import org.apache.kafka.common.telemetry.internals.LastValueTracker.InstantAndValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.Field; +import java.time.Clock; +import java.time.Instant; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * All metrics implement the {@link MetricValueProvider} interface. They are divided into + * two base types: + * + * + * {@link Gauge} + * {@link Measurable} + * + * + * {@link Gauge Gauges} can have any value but we only collect metrics with number values. + * {@link Measurable Measurables} are divided into simple types with single values + * ({@link Avg}, {@link CumulativeCount}, {@link Min}, {@link Max}, {@link Rate}, + * {@link SimpleRate}, and {@link CumulativeSum}) and compound types ({@link Frequencies}, + * {@link Meter}, and {@link Percentiles}). 
+ * + * + * + * We can safely assume that a {@link CumulativeCount count} always increases in steady state. It + * should be a bug if a count metric decreases. + * + * + * + * Total and Sum are treated as monotonically increasing counters. The javadocs for the Total metric type + * say "An un-windowed cumulative total maintained over all time.". The standalone Total metrics in + * the codebase seem to be cumulative metrics that will always increase. The Total metric underlying the + * Meter type is mostly a Total of a Count metric. + * We can assume that a Total metric always increases (but it is not guaranteed, as the sample values might be either + * negative or positive). + * For now, Total is converted to CUMULATIVE_DOUBLE unless we find a valid counter-example. + * + * + * + * Sum, as it is a sample sum, is not a cumulative metric. It is converted to GAUGE_DOUBLE. + * + * + * + * The compound metrics are virtual metrics. They are composed of simple types or anonymous measurable types + * which are reported. A compound metric is never reported as-is. + * + * + * + * A Meter metric is always created with and reported as 2 KafkaExporter metrics: a rate and a + * count. For example, org.apache.kafka.common.network.Selector has a Meter metric for "connection-close" but it + * has to be created with a "connection-close-rate" metric of type rate and a "connection-close-total" + * metric of type total. So, we will never get a KafkaExporter metric with type Meter. + * + * + * + * Frequencies is created with an array of Frequency objects. When a Frequencies metric is registered, each + * member Frequency object is converted into an anonymous Measurable and registered. So, a Frequencies metric + * is reported with a set of measurables with name = Frequency.name(). As there is no way to figure out the + * compound type, each component measurable is converted to a GAUGE_DOUBLE. + * + * + * + * Percentiles work the
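The mapping the quoted javadoc describes — cumulative stats exported as monotonic sums, sampled values exported as gauges — amounts to a simple type dispatch. A minimal sketch of that idea; the nested classes below are illustrative stand-ins, not Kafka's actual stat classes:

```java
public class MetricTypeMappingSketch {
    // Illustrative stand-ins for the Kafka stat classes named in the javadoc.
    interface Measurable {}
    static class CumulativeSum implements Measurable {}
    static class CumulativeCount extends CumulativeSum {}
    static class Avg implements Measurable {}

    // Cumulative stats are exported as monotonic CUMULATIVE_DOUBLE;
    // every other measurable is exported as GAUGE_DOUBLE.
    static String exportedType(Measurable m) {
        return (m instanceof CumulativeSum) ? "CUMULATIVE_DOUBLE" : "GAUGE_DOUBLE";
    }

    public static void main(String[] args) {
        System.out.println(exportedType(new CumulativeCount())); // CUMULATIVE_DOUBLE
        System.out.println(exportedType(new Avg()));             // GAUGE_DOUBLE
    }
}
```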
Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14620: URL: https://github.com/apache/kafka/pull/14620#discussion_r1390692901 ## clients/src/main/java/org/apache/kafka/common/telemetry/internals/KafkaMetricsCollector.java: ## @@ -0,0 +1,341 @@ (quoted diff omitted; identical to the excerpt in the previous message)
Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14620: URL: https://github.com/apache/kafka/pull/14620#discussion_r1390692648 ## clients/src/main/java/org/apache/kafka/common/telemetry/internals/SinglePointMetric.java: ## @@ -16,22 +16,135 @@ */ package org.apache.kafka.common.telemetry.internals; +import io.opentelemetry.proto.common.v1.AnyValue; +import io.opentelemetry.proto.common.v1.KeyValue; +import io.opentelemetry.proto.metrics.v1.AggregationTemporality; +import io.opentelemetry.proto.metrics.v1.Metric; +import io.opentelemetry.proto.metrics.v1.NumberDataPoint; + +import java.time.Instant; +import java.util.Map; +import java.util.concurrent.TimeUnit; + /** - * This class represents a metric that does not yet contain resource tags. + * This class represents a telemetry metric that does not yet contain resource tags. * These additional resource tags will be added before emitting metrics by the telemetry reporter. */ public class SinglePointMetric implements MetricKeyable { private final MetricKey key; +private final Metric.Builder metricBuilder; -private SinglePointMetric(MetricKey key) { +private SinglePointMetric(MetricKey key, Metric.Builder metricBuilder) { this.key = key; +this.metricBuilder = metricBuilder; } @Override public MetricKey key() { return key; } -// TODO: Implement methods for serializing/deserializing metrics in required format. +public Metric.Builder builder() { +return metricBuilder; +} + +public static SinglePointMetric create(MetricKey metricKey, Metric.Builder metric) { +return new SinglePointMetric(metricKey, metric); +} + +/* +Methods to construct gauge metric type. 
+ */ +public static SinglePointMetric gauge(MetricKey metricKey, Number value, Instant timestamp) { +NumberDataPoint.Builder point = point(timestamp, value); +return gauge(metricKey, point); +} + +public static SinglePointMetric gauge(MetricKey metricKey, double value, Instant timestamp) { +NumberDataPoint.Builder point = point(timestamp, value); +return gauge(metricKey, point); +} + +/* +Methods to construct sum metric type. + */ + +public static SinglePointMetric sum(MetricKey metricKey, double value, boolean monotonic, Instant timestamp) { +return sum(metricKey, value, monotonic, timestamp, null); +} + +public static SinglePointMetric sum(MetricKey metricKey, double value, boolean monotonic, Instant timestamp, +Instant startTimestamp) { +NumberDataPoint.Builder point = point(timestamp, value); +if (startTimestamp != null) { +point.setStartTimeUnixNano(toTimeUnixNanos(startTimestamp)); +} + +return sum(metricKey, AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE, monotonic, point); +} + +public static SinglePointMetric deltaSum(MetricKey metricKey, double value, boolean monotonic, +Instant timestamp, Instant startTimestamp) { +NumberDataPoint.Builder point = point(timestamp, value) +.setStartTimeUnixNano(toTimeUnixNanos(startTimestamp)); + +return sum(metricKey, AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA, monotonic, point); +} + +/* +Helper methods to support metric construction. 
+ */ +private static SinglePointMetric sum(MetricKey metricKey, AggregationTemporality aggregationTemporality, +boolean monotonic, NumberDataPoint.Builder point) { +point.addAllAttributes(asAttributes(metricKey.tags())); + +Metric.Builder metric = Metric.newBuilder().setName(metricKey.name()); +metric +.getSumBuilder() +.setAggregationTemporality(aggregationTemporality) +.setIsMonotonic(monotonic) +.addDataPoints(point); +return create(metricKey, metric); +} + +private static SinglePointMetric gauge(MetricKey metricKey, NumberDataPoint.Builder point) { +point.addAllAttributes(asAttributes(metricKey.tags())); + +Metric.Builder metric = Metric.newBuilder().setName(metricKey.name()); +metric.getGaugeBuilder().addDataPoints(point); +return create(metricKey, metric); +} + +private static NumberDataPoint.Builder point(Instant timestamp, Number value) { +if (value instanceof Long || value instanceof Integer) { +return point(timestamp, value.longValue()); +} else { Review Comment: Done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
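The point(...) overloads in the quoted diff dispatch on the runtime type of the Number: Long and Integer values are recorded via longValue(), everything else via doubleValue(). A standalone sketch of that dispatch with hypothetical names and no OpenTelemetry dependency:

```java
// Hypothetical sketch of the Number-type dispatch used when building a
// data point; names are illustrative, not the actual SinglePointMetric API.
public class NumberPointSketch {

    static String pointType(Number value) {
        if (value instanceof Long || value instanceof Integer) {
            // Mirrors point(timestamp, value.longValue()) in the quoted diff.
            return "as_int:" + value.longValue();
        } else {
            // Everything else (Double, Float, BigDecimal, ...) is stored as a double.
            return "as_double:" + value.doubleValue();
        }
    }

    public static void main(String[] args) {
        System.out.println(pointType(42));    // as_int:42
        System.out.println(pointType(42.5));  // as_double:42.5
    }
}
```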
Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14620: URL: https://github.com/apache/kafka/pull/14620#discussion_r1390692468 ## clients/src/main/java/org/apache/kafka/common/telemetry/internals/KafkaMetricsCollector.java: ## @@ -0,0 +1,341 @@ (quoted diff omitted; identical to the excerpt in the earlier message)
Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14620: URL: https://github.com/apache/kafka/pull/14620#discussion_r1390691654 ## clients/src/main/java/org/apache/kafka/common/telemetry/internals/KafkaMetricsCollector.java: ## @@ -0,0 +1,341 @@ (quoted diff omitted; identical to the excerpt in the earlier message) Review Comment: My bad, removed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
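The javadoc quoted earlier in this thread treats Count/Total metrics as monotonically increasing counters; converting such a cumulative value into per-interval deltas is the kind of bookkeeping the LastValueTracker discussion is about. A minimal, hypothetical sketch of that idea using a plain HashMap (illustrative names, not the collector's actual API):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical last-value bookkeeping for a monotonic counter: remember the
// previous observation per key and emit the increase since then.
public class DeltaSketch {
    private final Map<String, Double> lastValues = new HashMap<>();

    // Returns the increase since the previous observation; on first sight
    // the counter is treated as starting from 0, so the full value is returned.
    double delta(String metricKey, double cumulativeValue) {
        Double previous = lastValues.put(metricKey, cumulativeValue);
        return previous == null ? cumulativeValue : cumulativeValue - previous;
    }

    public static void main(String[] args) {
        DeltaSketch tracker = new DeltaSketch();
        System.out.println(tracker.delta("connection-close-total", 5.0)); // 5.0
        System.out.println(tracker.delta("connection-close-total", 8.0)); // 3.0
    }
}
```

Note this sketch is single-threaded; the PR discussion in the head explains why the real implementation backs its Map with a ConcurrentHashMap.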
Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14620: URL: https://github.com/apache/kafka/pull/14620#discussion_r1390691332 ## clients/src/main/java/org/apache/kafka/common/telemetry/internals/KafkaMetricsCollector.java: ## @@ -0,0 +1,341 @@ (quoted diff omitted; identical to the excerpt in the earlier message)
Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14620: URL: https://github.com/apache/kafka/pull/14620#discussion_r1390690505 ## clients/src/main/java/org/apache/kafka/common/telemetry/internals/KafkaMetricsCollector.java: ## @@ -0,0 +1,347 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.telemetry.internals; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.metrics.Gauge; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.Measurable; +import org.apache.kafka.common.metrics.MetricValueProvider; +import org.apache.kafka.common.metrics.stats.Avg; +import org.apache.kafka.common.metrics.stats.CumulativeCount; +import org.apache.kafka.common.metrics.stats.CumulativeSum; +import org.apache.kafka.common.metrics.stats.Frequencies; +import org.apache.kafka.common.metrics.stats.Max; +import org.apache.kafka.common.metrics.stats.Meter; +import org.apache.kafka.common.metrics.stats.Min; +import org.apache.kafka.common.metrics.stats.Percentiles; +import org.apache.kafka.common.metrics.stats.Rate; +import org.apache.kafka.common.metrics.stats.SimpleRate; +import org.apache.kafka.common.metrics.stats.WindowedCount; +import org.apache.kafka.common.telemetry.internals.LastValueTracker.InstantAndValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.Field; +import java.time.Clock; +import java.time.Instant; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * All metrics implement the {@link MetricValueProvider} interface. They are divided into + * two base types: + * + * + * {@link Gauge} + * {@link Measurable} + * + * + * {@link Gauge Gauges} can have any value but we only collect metrics with number values. 
+ * {@link Measurable Measurables} are divided into simple types with single values + * ({@link Avg}, {@link CumulativeCount}, {@link Min}, {@link Max}, {@link Rate}, + * {@link SimpleRate}, and {@link CumulativeSum}) and compound types ({@link Frequencies}, + * {@link Meter}, and {@link Percentiles}). + * + * + * + * We can safely assume that a {@link CumulativeCount count} always increases in steady state. It + * should be a bug if a count metric decreases. + * + * + * + * Total and Sum are treated as monotonically increasing counters. The javadocs for the Total metric type + * say "An un-windowed cumulative total maintained over all time." The standalone Total metrics in + * the codebase seem to be cumulative metrics that will always increase. The Total metric underlying the + * Meter type is mostly a Total of a Count metric. + * We can assume that a Total metric always increases (but it is not guaranteed, as the sample values might be + * negative or positive). + * For now, Total is converted to CUMULATIVE_DOUBLE unless we find a valid counter-example. + * + * + * + * Sum, as it is a sample sum and not a cumulative metric, is converted to GAUGE_DOUBLE. + * + * + * + * The compound metrics are virtual metrics. They are composed of simple types or anonymous measurable types + * which are reported. A compound metric is never reported as-is. + * + * + * + * A Meter metric is always created with and reported as 2 KafkaExporter metrics: a rate and a + * count. For example, org.apache.kafka.common.network.Selector has a Meter metric for "connection-close", but it + * has to be created with a "connection-close-rate" metric of type rate and a "connection-close-total" + * metric of type total. So, we will never get a KafkaExporter metric with type Meter. + * + * + * + * Frequencies is created with an array of Frequency objects. When a Frequencies metric is registered, each + * member Frequency object is converted into an anonymous Measurable and registered.
So, a Frequencies metric + * is reported with a set of measurables with name = Frequency.name(). As there is no way to figure out the + * compound type, each component measurable is converted to a GAUGE_DOUBLE.
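The conversion rules spelled out in this javadoc can be condensed into a small sketch. This is an illustrative stand-in, not the actual KafkaMetricsCollector logic; the class, method, and enum names below are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the mapping described above: cumulative counts and
// totals become monotonic cumulative sums (CUMULATIVE_DOUBLE); sampled stats
// (Avg, Min, Max, Rate, sample Sum, and the Frequency/Percentile components)
// become gauges (GAUGE_DOUBLE).
class MeasurableTypeMapping {
    enum OtlpType { CUMULATIVE_DOUBLE, GAUGE_DOUBLE }

    static OtlpType map(String statClassName) {
        // Assumed rule set following the javadoc narrative.
        List<String> cumulative = Arrays.asList("CumulativeCount", "CumulativeSum");
        return cumulative.contains(statClassName) ? OtlpType.CUMULATIVE_DOUBLE
                                                  : OtlpType.GAUGE_DOUBLE;
    }
}
```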
Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14620: URL: https://github.com/apache/kafka/pull/14620#discussion_r1390690071 ## clients/src/main/java/org/apache/kafka/common/telemetry/internals/LastValueTracker.java: ## @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.telemetry.internals; + +import java.time.Instant; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicReference; + +/** + * A LastValueTracker uses a ConcurrentMap to maintain historic values for a given key, and returns + * a previous value and an Instant for that value. + * + * @param <T> The type of the value. + */ +public class LastValueTracker<T> { +private final ConcurrentMap<MetricKey, AtomicReference<InstantAndValue<T>>> counters = new ConcurrentHashMap<>(); + +/** + * Return the last instant/value for the given MetricKey, or Optional.empty if there isn't one. + * + * @param metricKey the key for which to calculate a getAndSet. + * @param now the timestamp for the new value. + * @param value the current value. + * @return the timestamp of the previous entry and its value.
If there + * isn't a previous entry, then this method returns {@link Optional#empty()} + */ +public Optional<InstantAndValue<T>> getAndSet(MetricKey metricKey, Instant now, T value) { +InstantAndValue<T> instantAndValue = new InstantAndValue<>(now, value); +AtomicReference<InstantAndValue<T>> valueOrNull = counters Review Comment: I have removed AtomicReference, done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
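As a minimal sketch of the getAndSet semantics under discussion, after the review change (a plain Map backed by ConcurrentHashMap using put, rather than putIfAbsent on a ConcurrentMap with AtomicReference values). Names are simplified: a String key stands in for MetricKey, and this is not the actual Kafka class.

```java
import java.time.Instant;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Simplified sketch of the tracker discussed above: put returns the previous
// mapping (or null), which is exactly the "get and set" the class needs.
class LastValueTrackerSketch<T> {

    public static class InstantAndValue<T> {
        private final Instant intervalStart;
        private final T value;

        public InstantAndValue(Instant intervalStart, T value) {
            this.intervalStart = intervalStart;
            this.value = value;
        }

        public Instant intervalStart() { return intervalStart; }
        public T value() { return value; }
    }

    private final Map<String, InstantAndValue<T>> counters = new ConcurrentHashMap<>();

    // Records (now, value) for the key and returns the previous entry, if any.
    public Optional<InstantAndValue<T>> getAndSet(String key, Instant now, T value) {
        InstantAndValue<T> previous = counters.put(key, new InstantAndValue<>(now, value));
        return Optional.ofNullable(previous);
    }
}
```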
Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14620: URL: https://github.com/apache/kafka/pull/14620#discussion_r1390688746 ## clients/src/main/java/org/apache/kafka/common/telemetry/internals/SinglePointMetric.java: ## @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.telemetry.internals; + +import io.opentelemetry.proto.common.v1.AnyValue; +import io.opentelemetry.proto.common.v1.KeyValue; +import io.opentelemetry.proto.metrics.v1.AggregationTemporality; +import io.opentelemetry.proto.metrics.v1.Metric; +import io.opentelemetry.proto.metrics.v1.NumberDataPoint; + +import java.time.Instant; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.TimeUnit; + +/** + * This class represents a telemetry metric that does not yet contain resource tags. + * These additional resource tags will be added before emitting metrics by the telemetry reporter. 
+ */ +public class SinglePointMetric implements MetricKeyable { + +private final MetricKey key; +private final Metric.Builder metricBuilder; + +private SinglePointMetric(MetricKey key, Metric.Builder metricBuilder) { +this.key = key; +this.metricBuilder = metricBuilder; +} + +@Override +public MetricKey key() { +return key; +} + +public Metric.Builder metric() { +return metricBuilder; +} + +public static SinglePointMetric create(MetricKey metricKey, Metric.Builder metric) { +return new SinglePointMetric(metricKey, metric); +} + +/* +Methods to construct gauge metric type. + */ +public static SinglePointMetric gauge(MetricKey metricKey, Number value, Instant timestamp) { +NumberDataPoint.Builder point = point(timestamp, value); +return gauge(metricKey, point); +} + +public static SinglePointMetric gauge(MetricKey metricKey, double value, Instant timestamp) { +NumberDataPoint.Builder point = point(timestamp, value); +return gauge(metricKey, point); +} + +/* +Methods to construct sum metric type. + */ + +public static SinglePointMetric sum(MetricKey metricKey, double value, boolean monotonic, Instant timestamp) { +return sum(metricKey, value, monotonic, timestamp, null); +} + +public static SinglePointMetric sum(MetricKey metricKey, double value, boolean monotonic, Instant timestamp, +Instant startTimestamp) { +NumberDataPoint.Builder point = point(timestamp, value); +if (startTimestamp != null) { +point.setStartTimeUnixNano(toTimeUnixNanos(startTimestamp)); +} + +return sum(metricKey, AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE, monotonic, point); +} + +public static SinglePointMetric deltaSum(MetricKey metricKey, double value, boolean monotonic, +Instant timestamp, Instant startTimestamp) { +NumberDataPoint.Builder point = point(timestamp, value) Review Comment: Resolving the comment. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
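The SinglePointMetric snippet above calls toTimeUnixNanos(startTimestamp) when populating a data point. OTLP NumberDataPoint timestamps (setTimeUnixNano/setStartTimeUnixNano) are nanoseconds since the Unix epoch; the conversion body below is assumed rather than copied from the PR.

```java
import java.time.Instant;
import java.util.concurrent.TimeUnit;

// Assumed implementation of the Instant -> epoch-nanoseconds conversion.
// Like any epoch-nanos long, this overflows for instants past the year 2262.
class TimeConversion {
    static long toTimeUnixNanos(Instant t) {
        return TimeUnit.SECONDS.toNanos(t.getEpochSecond()) + t.getNano();
    }
}
```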
[jira] [Created] (KAFKA-15812) Add support to send only sparse metrics, only non-zero values
Apoorv Mittal created KAFKA-15812: - Summary: Add support to send only sparse metrics, only non-zero values Key: KAFKA-15812 URL: https://issues.apache.org/jira/browse/KAFKA-15812 Project: Kafka Issue Type: Sub-task Reporter: Apoorv Mittal Assignee: Apoorv Mittal -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15811) implement capturing client port information from Socket Server
Apoorv Mittal created KAFKA-15811: - Summary: implement capturing client port information from Socket Server Key: KAFKA-15811 URL: https://issues.apache.org/jira/browse/KAFKA-15811 Project: Kafka Issue Type: Sub-task Reporter: Apoorv Mittal Assignee: Apoorv Mittal -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15810) Is there a solution to build Kafka-2.7 with gradle-8.4 and jdk-21?
rain.liang created KAFKA-15810: -- Summary: Is there a solution to build Kafka-2.7 with gradle-8.4 and jdk-21? Key: KAFKA-15810 URL: https://issues.apache.org/jira/browse/KAFKA-15810 Project: Kafka Issue Type: Wish Affects Versions: 2.7.2 Reporter: rain.liang Hello, I tried to migrate Kafka-2.7 from jdk-11 to jdk-21 to try the virtual threads of jdk-21, and ran into trouble. First, the Gradle syntax changed a lot from gradle-6.6 to gradle-8.4. Second, the versions of the dependencies also changed. Third, new dependencies may be needed since the JDK is updated. I wonder if there are existing solutions to this problem. Thanks a lot. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15809) Adding TS to broker's features list and updating broker's metadata schema to include TS enable status
[ https://issues.apache.org/jira/browse/KAFKA-15809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Phuc Hong Tran updated KAFKA-15809: --- Fix Version/s: 3.7.0 > Adding TS to broker's features list and updating broker's metadata schema to > include TS enable status > - > > Key: KAFKA-15809 > URL: https://issues.apache.org/jira/browse/KAFKA-15809 > Project: Kafka > Issue Type: Sub-task >Reporter: Phuc Hong Tran >Assignee: Phuc Hong Tran >Priority: Critical > Labels: KIP-405 > Fix For: 3.7.0 > > > Currently controller doesn't have the visibility of all brokers's TS enable > status. As mentioned in KAFKA-15341, we need to add metadata about TS enable > status of brokers so that controller can check for these status before > enabling TS per topic -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-15809) Adding TS to broker's features list and updating broker's metadata schema to include TS enable status
[ https://issues.apache.org/jira/browse/KAFKA-15809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17785379#comment-17785379 ] Phuc Hong Tran edited comment on KAFKA-15809 at 11/13/23 6:23 AM: -- [~divijvaidya] [~showuon], since we're updating the metadata.version to the version that supports Tiered Storage, do we need to change the feature level of version IBP_3_5_IV0 to 1 and change other versions' feature levels accordingly? What does the feature level field indicate? was (Author: JIRAUSER301295): [~divijvaidya], since we're updating the metadata.version to the version that supports Tiered Storage, do we need to change the feature level of version IBP_3_5_IV0 to 1 and change other versions' feature levels accordingly? What does the feature level field indicate? > Adding TS to broker's features list and updating broker's metadata schema to > include TS enable status > - > > Key: KAFKA-15809 > URL: https://issues.apache.org/jira/browse/KAFKA-15809 > Project: Kafka > Issue Type: Sub-task >Reporter: Phuc Hong Tran >Assignee: Phuc Hong Tran >Priority: Critical > Labels: KIP-405 > > Currently controller doesn't have the visibility of all brokers's TS enable > status. As mentioned in KAFKA-15341, we need to add metadata about TS enable > status of brokers so that controller can check for these status before > enabling TS per topic -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15809) Adding TS to broker's features list and updating broker's metadata schema to include TS enable status
[ https://issues.apache.org/jira/browse/KAFKA-15809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Phuc Hong Tran updated KAFKA-15809: --- Labels: KIP-405 (was: ) > Adding TS to broker's features list and updating broker's metadata schema to > include TS enable status > - > > Key: KAFKA-15809 > URL: https://issues.apache.org/jira/browse/KAFKA-15809 > Project: Kafka > Issue Type: Sub-task >Reporter: Phuc Hong Tran >Assignee: Phuc Hong Tran >Priority: Critical > Labels: KIP-405 > > Currently controller doesn't have the visibility of all brokers's TS enable > status. As mentioned in KAFKA-15341, we need to add metadata about TS enable > status of brokers so that controller can check for these status before > enabling TS per topic -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15809) Adding TS to broker's features list and updating broker's metadata schema to include TS enable status
[ https://issues.apache.org/jira/browse/KAFKA-15809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17785379#comment-17785379 ] Phuc Hong Tran commented on KAFKA-15809: [~divijvaidya], since we're updating the metadata.version to the version that supports Tiered Storage, do we need to change the feature level of version IBP_3_5_IV0 to 1 and change other versions' feature levels accordingly? What does the feature level field indicate? > Adding TS to broker's features list and updating broker's metadata schema to > include TS enable status > - > > Key: KAFKA-15809 > URL: https://issues.apache.org/jira/browse/KAFKA-15809 > Project: Kafka > Issue Type: Sub-task >Reporter: Phuc Hong Tran >Assignee: Phuc Hong Tran >Priority: Critical > > Currently controller doesn't have the visibility of all brokers's TS enable > status. As mentioned in KAFKA-15341, we need to add metadata about TS enable > status of brokers so that controller can check for these status before > enabling TS per topic -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (KAFKA-15686) Consumer should be able to detect network problem
[ https://issues.apache.org/jira/browse/KAFKA-15686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiahongchao closed KAFKA-15686. --- > Consumer should be able to detect network problem > - > > Key: KAFKA-15686 > URL: https://issues.apache.org/jira/browse/KAFKA-15686 > Project: Kafka > Issue Type: New Feature > Components: clients, consumer >Affects Versions: 3.5.0 >Reporter: Jiahongchao >Assignee: Philip Nee >Priority: Minor > > When we call the poll method in the consumer, it will return normally even if some > partitions do not have a leader. > What should we do to detect such failures? Currently we have to check the logs to > find out about broker connection problems. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15062: Adding ppc64le build stage [kafka]
Vaibhav-Nazare commented on PR #13817: URL: https://github.com/apache/kafka/pull/13817#issuecomment-1807487012 Hi @mimaison Can we rerun the job now, jdk_21_latest has been installed on the shared power nodes by ASF infra. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15774: refactor windowed stores to use StoreFactory [kafka]
ableegoldman commented on PR #14708: URL: https://github.com/apache/kafka/pull/14708#issuecomment-1807479243 Ah shoot, sorry everyone, I just clicked immediately on the test failure tab and glossed over the build report on the landing page. Wish we still had the individual version builds reported directly on the PR :/ but that's my mistake. Appreciate the quick action and fix. I think the patch that was merged makes sense but I'll double check with the original PR author. Thanks for jumping on this! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR; Fix nanosecond elapsed time [kafka]
github-actions[bot] commented on PR #14168: URL: https://github.com/apache/kafka/pull/14168#issuecomment-1807425026 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch) If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [MINOR] Addressing NPE when broker's initialCatchUpFuture fails [kafka]
github-actions[bot] commented on PR #14109: URL: https://github.com/apache/kafka/pull/14109#issuecomment-1807425064 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch) If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1390590490 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -181,33 +256,465 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) { public void transitionToFenced() { resetEpoch(); transitionTo(MemberState.FENCED); + +// Release assignment +CompletableFuture<Void> callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment(); +callbackResult.whenComplete((result, error) -> { +if (error != null) { +log.debug("OnPartitionsLost callback invocation failed while releasing assignment " + +"after member got fenced. Member will rejoin the group anyway.", error); +} +subscriptions.assignFromSubscribed(Collections.emptySet()); +transitionToJoining(); +}); } /** * {@inheritDoc} */ @Override -public void transitionToFailed() { -log.error("Member {} transitioned to {} state", memberId, MemberState.FAILED); -transitionTo(MemberState.FAILED); +public void transitionToFatal() { +log.error("Member {} transitioned to {} state", memberId, MemberState.FATAL); + +// Update epoch to indicate that the member is not in the group anymore, so that +// onPartitionsLost is called to release the assignment. +memberEpoch = LEAVE_GROUP_EPOCH; +invokeOnPartitionsRevokedOrLostToReleaseAssignment(); + +transitionTo(MemberState.FATAL); } +/** + * {@inheritDoc} + */ @Override -public boolean shouldSendHeartbeat() { -return state() != MemberState.FAILED; +public void transitionToJoining() { +resetEpoch(); +transitionTo(MemberState.JOINING); } /** - * Transition to {@link MemberState#STABLE} only if there are no target assignments left to - * reconcile. Transition to {@link MemberState#RECONCILING} otherwise.
+ * {@inheritDoc} + */ +@Override +public CompletableFuture<Void> leaveGroup() { +transitionTo(MemberState.LEAVING); + +CompletableFuture<Void> callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment(); +callbackResult.whenComplete((result, error) -> { + +// Clear the subscription, no matter if the callback execution failed or succeeded. +subscriptions.assignFromSubscribed(Collections.emptySet()); + +// Transition to ensure that a heartbeat request is sent out to effectively leave the +// group (even in the case where the member had no assignment to release or when the +// callback execution failed.) +transitionToSendingLeaveGroup(); + +}); + +// Return callback future to indicate that the leave group is done when the callbacks +// complete, without waiting for the heartbeat to be sent out. (Best effort to send it +// but do not hold the leave group operation for it) +return callbackResult; +} + +/** + * Release member assignment by calling the user defined callbacks for onPartitionsRevoked or + * onPartitionsLost. + * + * If the member is part of the group (epoch > 0), this will invoke onPartitionsRevoked. + * This will be the case when releasing assignment because the member is intentionally + * leaving the group (after a call to unsubscribe) + * + * If the member is not part of the group (epoch <= 0), this will invoke onPartitionsLost. + * This will be the case when releasing assignment after being fenced. + * + * + * @return Future that will complete when the callback execution completes. + */ +private CompletableFuture<Void> invokeOnPartitionsRevokedOrLostToReleaseAssignment() { +SortedSet<TopicPartition> droppedPartitions = new TreeSet<>(COMPARATOR); +droppedPartitions.addAll(subscriptions.assignedPartitions()); + +CompletableFuture<Void> callbackResult; +if (droppedPartitions.isEmpty()) { +// No assignment to release +callbackResult = CompletableFuture.completedFuture(null); +} else { +// Release assignment +if (memberEpoch > 0) { +// Member is part of the group.
Invoke onPartitionsRevoked. +callbackResult = revokePartitions(droppedPartitions); +} else { +// Member is not part of the group anymore. Invoke onPartitionsLost. +callbackResult = invokeOnPartitionsLostCallback(droppedPartitions); +} +} +return callbackResult; +} + +/** + * Reset member epoch to the value required for the leave group heartbeat request, and + * transition to the {@link MemberState#SENDING_LEAVE_REQUEST} state so
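The leaveGroup flow in the diff above (run the revoke/lost callbacks, then clear the assignment and transition regardless of callback outcome, while returning the callback future to the caller) reduces to this pattern. It is a simplified sketch with Runnable stand-ins for the real member methods, not the Kafka implementation.

```java
import java.util.concurrent.CompletableFuture;

// Simplified sketch of the pattern: whenComplete runs whether the callbacks
// succeeded or failed, so the assignment is always cleared and the leave-group
// transition always happens. The returned future tracks only the callback
// completion, not the heartbeat send.
class LeaveGroupSketch {
    static CompletableFuture<Void> leaveGroup(CompletableFuture<Void> callbackResult,
                                              Runnable clearAssignment,
                                              Runnable transitionToLeaving) {
        callbackResult.whenComplete((result, error) -> {
            clearAssignment.run();       // clear even if the callbacks failed
            transitionToLeaving.run();   // then send the leave heartbeat
        });
        return callbackResult;
    }
}
```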
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1390583612 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java: ## @@ -17,57 +17,111 @@ package org.apache.kafka.clients.consumer.internals; +import org.apache.kafka.common.protocol.Errors; import java.util.ArrayList; import java.util.Arrays; import java.util.List; public enum MemberState { /** - * Member has not joined a consumer group yet, or has been fenced and needs to re-join. + * Member has a group id, but it is not subscribed to any topic to receive automatic + * assignments. This will be the state when the member has never subscribed, or when it has + * unsubscribed from all topics. While in this state the member can commit offsets but won't + * be an active member of the consumer group (no heartbeats sent). */ -UNJOINED, +UNSUBSCRIBED, + +/** + * Member is attempting to join a consumer group. While in this state, the member will send + * heartbeat requests on the interval, with epoch 0, until it gets a response with an epoch > 0 + * or a fatal failure. A member transitions to this state when it tries to join the group for + * the first time with a call to subscribe, or when it has been fenced and tries to re-join. + */ +JOINING, /** * Member has received a new target assignment (partitions could have been assigned or - * revoked), and it is processing it. While in this state, the member will - * invoke the user callbacks for onPartitionsAssigned or onPartitionsRevoked, and then make - * the new assignment effective. + * revoked), and it is processing it. While in this state, the member will continue to send + * heartbeat on the interval, and reconcile the assignment (it will commit offsets if + * needed, invoke the user callbacks for onPartitionsAssigned or onPartitionsRevoked, and make + * the new assignment effective). 
Note that while in this state the member may be trying to + * resolve metadata for the target assignment, or triggering commits/callbacks if topic names + * already resolved. */ -// TODO: determine if separate state will be needed for assign/revoke (not for now) RECONCILING, /** - * Member is active in a group (heartbeating) and has processed all assignments received. + * Member has completed reconciling an assignment received, and stays in this state only until + * the next heartbeat request is sent out to acknowledge the assignment to the server. This + * state indicates that the next heartbeat request must be sent without waiting for the + * heartbeat interval to expire. Note that once the ack is sent, the member could go back to + * {@link #RECONCILING} if it still has assignment waiting to be reconciled (assignments + * waiting for metadata, assignments for which metadata was resolved, or new assignments + * received from the broker) + */ +ACKNOWLEDGING, + +/** + * Member is active in a group and has processed all assignments received. While in this + * state, the member will send heartbeats on the interval. */ STABLE, /** - * Member transitions to this state when it receives a - * {@link org.apache.kafka.common.protocol.Errors#UNKNOWN_MEMBER_ID} or - * {@link org.apache.kafka.common.protocol.Errors#FENCED_MEMBER_EPOCH} error from the - * broker. This is a recoverable state, where the member - * gives up its partitions by invoking the user callbacks for onPartitionsLost, and then - * transitions to {@link #UNJOINED} to rejoin the group as a new member. + * Member transitions to this state when it receives a {@link Errors#UNKNOWN_MEMBER_ID} or + * {@link Errors#FENCED_MEMBER_EPOCH} error from the broker, indicating that it has been + * left out of the group. 
While in this state, the member will stop sending heartbeats, it + * will give up its partitions by invoking the user callbacks for onPartitionsLost, and then + * transition to {@link #JOINING} to re-join the group as a new member. */ FENCED, /** - * The member failed with an unrecoverable error + * The member transitions to this state after a call to unsubscribe. While in this state, the + * member will stop sending heartbeats, will commit offsets if needed and release its + * assignment (calling user's callback for partitions revoked or lost). When all these + * actions complete, the member will transition out of this state into {@link #LEAVING} to + * effectively leave the group. + */ +PREPARE_LEAVING, + +/** + * Member has committed offsets and releases its assignment, so it stays in this state until Review Comment: Yes, similar to the ACKNOWLEDGING in the sense that they are just a way to indicate that a
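The lifecycle described across these state javadocs can be condensed into a sketch. The enum names follow the diff; the transition edges below are an illustrative subset inferred from the javadoc narrative, not Kafka's actual transition-validation table.

```java
// Illustrative sketch of the member state machine discussed above.
enum MemberStateSketch {
    UNSUBSCRIBED, JOINING, RECONCILING, ACKNOWLEDGING, STABLE,
    FENCED, PREPARE_LEAVING, LEAVING, FATAL;

    // A few representative legal transitions from the javadoc narrative:
    // join from unsubscribed or after being fenced; acknowledge only after
    // reconciling; leave only after the prepare-leaving callbacks ran.
    static boolean isValidTransition(MemberStateSketch from, MemberStateSketch to) {
        switch (to) {
            case JOINING:        return from == UNSUBSCRIBED || from == FENCED;
            case ACKNOWLEDGING:  return from == RECONCILING;
            case STABLE:         return from == ACKNOWLEDGING;
            case LEAVING:        return from == PREPARE_LEAVING;
            default:             return true; // edges not modeled in this sketch
        }
    }
}
```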
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1390582429 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java: ## @@ -96,7 +96,7 @@ public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { * @param topic to be requested. If empty, return the metadata for all topics. * @return the future of the metadata request. */ -public CompletableFuture>> requestTopicMetadata(final Optional topic) { +public CompletableFuture>> requestTopicMetadata(final Optional topic) { Review Comment: You're right, not needed anymore (all metadata interaction is now based on the centralized metadata cache). All removed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15708: KRaft support in FetchRequestDownConversionConfigTest [kafka]
linzihao1999 commented on PR #14715: URL: https://github.com/apache/kafka/pull/14715#issuecomment-1807392262 Passed the CI build. https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14715/2/pipeline -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15741: KRaft support in DescribeConsumerGroupTest [kafka]
linzihao1999 commented on PR #14668: URL: https://github.com/apache/kafka/pull/14668#issuecomment-1807388570 @dengziming Hi, I re-trigger the CI build and passed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (KAFKA-15341) Enabling TS for a topic during rolling restart causes problems
[ https://issues.apache.org/jira/browse/KAFKA-15341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17785337#comment-17785337 ] Phuc Hong Tran edited comment on KAFKA-15341 at 11/12/23 11:27 PM: --- [~divijvaidya] I have created subtask KAFKA-15809 for adding broker's TS enable status to metadata and adding TS as a feature to broker was (Author: JIRAUSER301295): [~divijvaidya] I have created subtask [KAFKA-15809|https://issues.apache.org/jira/browse/KAFKA-15809] to add broker's TS enable status to metadata > Enabling TS for a topic during rolling restart causes problems > -- > > Key: KAFKA-15341 > URL: https://issues.apache.org/jira/browse/KAFKA-15341 > Project: Kafka > Issue Type: Bug >Reporter: Divij Vaidya >Assignee: Phuc Hong Tran >Priority: Major > Labels: KIP-405 > Fix For: 3.7.0 > > > When we are in a rolling restart to enable TS at system level, some brokers > have TS enabled on them and some don't. We send an alter config call to > enable TS for a topic, it hits a broker which has TS enabled, this broker > forwards it to the controller and controller will send the config update to > all brokers. When another broker which doesn't have TS enabled (because it > hasn't undergone the restart yet) gets this config change, it "should" fail > to apply it. But failing now is too late since alterConfig has already > succeeded since controller->broker config propagation is done async. > With this JIRA, we want to have controller check if TS is enabled on all > brokers before applying alter config to turn on TS for a topic. > Context: https://github.com/apache/kafka/pull/14176#discussion_r1291265129 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15809) Adding TS to broker's features list and updating broker's metadata schema to include TS enable status
Phuc Hong Tran created KAFKA-15809: -- Summary: Adding TS to broker's features list and updating broker's metadata schema to include TS enable status Key: KAFKA-15809 URL: https://issues.apache.org/jira/browse/KAFKA-15809 Project: Kafka Issue Type: Sub-task Reporter: Phuc Hong Tran Assignee: Phuc Hong Tran Currently the controller doesn't have visibility into each broker's TS enable status. As mentioned in KAFKA-15341, we need to add metadata about the TS enable status of brokers so that the controller can check these statuses before enabling TS per topic.
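The controller-side check this subtask calls for can be sketched as follows. This is an illustrative standalone example, not Kafka's controller code; the names `TieredStorageGuard` and `canEnableTieredStorage` are hypothetical, and the broker-id-to-status map stands in for the metadata this Jira proposes to add:

```java
import java.util.Map;

// Hypothetical sketch of the KAFKA-15341/KAFKA-15809 guard: reject a per-topic
// tiered-storage (TS) enable request unless every registered broker reports
// TS support, so alterConfig cannot "succeed" mid-rolling-restart.
public class TieredStorageGuard {

    public static boolean canEnableTieredStorage(Map<Integer, Boolean> brokerTsEnabled) {
        // Every broker must advertise TS; an empty registration map cannot enable it.
        return !brokerTsEnabled.isEmpty()
            && brokerTsEnabled.values().stream().allMatch(Boolean::booleanValue);
    }

    public static void main(String[] args) {
        // Mid-rolling-restart: broker 2 has not yet been restarted with TS enabled.
        Map<Integer, Boolean> brokers = Map.of(1, true, 2, false, 3, true);
        System.out.println(canEnableTieredStorage(brokers)); // prints false: reject alterConfig
    }
}
```

With such a check in the controller, the alterConfig call fails synchronously instead of being propagated asynchronously to brokers that cannot apply it.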
[PR] KAFKA-15795: Support fetch(fromKey, toKey, from, to) to WindowRangeQuery and unify WindowKeyQuery and WindowRangeQuery [kafka]
hanyuzheng7 opened a new pull request, #14745: URL: https://github.com/apache/kafka/pull/14745 KIP-997: https://cwiki.apache.org/confluence/display/KAFKA/KIP-997%3A++Support+fetch%28fromKey%2C+toKey%2C+from%2C+to%29+to+WindowRangeQuery+and+unify+WindowKeyQuery+and+WindowRangeQuery We aim to enhance WindowRangeQuery by supporting a new method: fetch(keyFrom, keyTo, from, to). Currently, WindowRangeQuery relies on KeyValueIterator<Windowed<K>, V> fetchAll(Instant timeFrom, Instant timeTo) to retrieve all key-value pairs within a specified time range. We propose to use KeyValueIterator<Windowed<K>, V> fetch(K keyFrom, K keyTo, Instant timeFrom, Instant timeTo) instead. This new method provides a KeyValueIterator<Windowed<K>, V> that lets users iterate over windowed key-value pairs <Windowed<K>, value> spanning the entire time range. With this method, users can retrieve window sessions for specific keys rather than all keys, which enables a more targeted query. This is an improvement over fetchAll, which only allows retrieval of every key's window sessions without the ability to specify a range of keys. Additionally, this enhancement allows WindowRangeQuery to cover WindowKeyQuery functionality: users seeking window sessions for a single key can set keyFrom and keyTo to be equal. This dual functionality provides more flexibility and efficiency in querying windowed keys. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
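The range semantics proposed above can be sketched with a self-contained example. This is plain Java, not the Kafka Streams API; `WindowedEntry` and `fetch` are hypothetical stand-ins for the KIP's fetch(keyFrom, keyTo, timeFrom, timeTo):

```java
import java.time.Instant;
import java.util.List;
import java.util.stream.Collectors;

// Sketch of the KIP-997 semantics: return windowed entries whose key lies in
// [keyFrom, keyTo] and whose window start lies in [from, to]. Setting
// keyFrom == keyTo degenerates to a single-key WindowKeyQuery.
public class WindowRangeSketch {

    public record WindowedEntry(String key, Instant windowStart, long value) {}

    public static List<WindowedEntry> fetch(List<WindowedEntry> store,
                                            String keyFrom, String keyTo,
                                            Instant from, Instant to) {
        return store.stream()
            .filter(e -> e.key().compareTo(keyFrom) >= 0 && e.key().compareTo(keyTo) <= 0)
            .filter(e -> !e.windowStart().isBefore(from) && !e.windowStart().isAfter(to))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<WindowedEntry> store = List.of(
            new WindowedEntry("a", Instant.ofEpochMilli(0), 1),
            new WindowedEntry("b", Instant.ofEpochMilli(100), 2),
            new WindowedEntry("c", Instant.ofEpochMilli(200), 3));

        // Keys "a" and "b" fall in the range; "c" does not.
        System.out.println(fetch(store, "a", "b",
            Instant.ofEpochMilli(0), Instant.ofEpochMilli(300)));
    }
}
```

The single-key case is then just `fetch(store, k, k, from, to)`, which is how the KIP unifies WindowKeyQuery with WindowRangeQuery.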
Re: [PR] KAFKA-15552 Fix Producer ID ZK migration [kafka]
jolshan commented on PR #14506: URL: https://github.com/apache/kafka/pull/14506#issuecomment-1807174506 Thanks @showuon for taking a look. Makes sense to me. I also saw the conversation here. https://github.com/apache/kafka/pull/14741
[PR] MINOR: Fix MetricsTest.testBrokerTopicMetricsBytesInOut assertion [kafka]
runom opened a new pull request, #14744: URL: https://github.com/apache/kafka/pull/14744 This PR fixes the position of the assertion that checks that `BytesOut` doesn't include replication. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] MINOR: Improve printing topic name when created topic in TopicCommand [kafka]
runom commented on code in PR #14661: URL: https://github.com/apache/kafka/pull/14661#discussion_r1390407748 ## tools/src/main/java/org/apache/kafka/tools/TopicCommand.java: ## @@ -469,7 +469,7 @@ public void createTopic(CommandTopicPartition topic) throws Exception { CreateTopicsResult createResult = adminClient.createTopics(Collections.singleton(newTopic), new CreateTopicsOptions().retryOnQuotaViolation(false)); createResult.all().get(); -System.out.println("Created topic " + topic.name + "."); +System.out.println("Created topic " + topic.name.get() + "."); Review Comment: I changed its type, and it seems to be working.
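For context, the underlying issue in the diff above is how an `Optional` renders in string concatenation. A minimal sketch (the `OptionalPrintDemo` class is hypothetical, not the TopicCommand code):

```java
import java.util.Optional;

// Concatenating an Optional calls its toString(), which wraps the value in
// "Optional[...]"; the success message must unwrap it first.
public class OptionalPrintDemo {
    public static void main(String[] args) {
        Optional<String> name = Optional.of("my-topic");
        System.out.println("Created topic " + name + ".");       // prints: Created topic Optional[my-topic].
        System.out.println("Created topic " + name.get() + "."); // prints: Created topic my-topic.
    }
}
```

This is why the patch switched to `topic.name.get()` (or, per the review, changed the field's type so no unwrapping is needed).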
Re: [PR] MINOR: Improve printing topic name when created topic in TopicCommand [kafka]
runom commented on PR #14661: URL: https://github.com/apache/kafka/pull/14661#issuecomment-1807117335 I've updated the pull request. `TopicCommandTest` and `TopicCommandIntegrationTest` are passing.
[jira] [Comment Edited] (KAFKA-15341) Enabling TS for a topic during rolling restart causes problems
[ https://issues.apache.org/jira/browse/KAFKA-15341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17785136#comment-17785136 ] Phuc Hong Tran edited comment on KAFKA-15341 at 11/12/23 9:49 AM: -- [~divijvaidya] The controller doesn't know whether an individual broker has TS enabled or not. It's better for the controller to know the TS enable status of brokers than for brokers to reject the per-topic TS enablement call, since reaching out to all those brokers (which don't have tiered storage enabled) only to get rejected would be inefficient. was (Author: JIRAUSER301295): [~divijvaidya] The controller doesn't know whether an individual broker has TS enabled or not. It's better for the controller to know the TS enable status of brokers than for brokers to reject the per-topic TS enablement call, since reaching out to all those brokers only to get rejected would be inefficient.
[PR] ignore [kafka]
dajac opened a new pull request, #14743: URL: https://github.com/apache/kafka/pull/14743 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] POC: run group coordinator state machine on request handler pool [kafka]
dajac commented on PR #14728: URL: https://github.com/apache/kafka/pull/14728#issuecomment-1807054468 Thanks for the PR. I believe that isolating this workload from the produce/fetch workload (running in the api handlers) is actually a good thing, but we would need to measure and compare both approaches. Would it be possible to implement a new CoordinatorEventProcessor for this instead of hijacking the runtime? We made it pluggable for this reason. We wanted to have some flexibility with the threading model.
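The pluggability being suggested can be illustrated with a generic sketch. The `EventProcessor` interface and both implementations here are hypothetical stand-ins, not Kafka's actual `CoordinatorEventProcessor`; the point is that the same coordinator events can run on a dedicated pool (isolated from request handlers) or synchronously on the calling request-handler thread:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Two interchangeable threading models behind one event-processor abstraction.
public class ProcessorDemo {

    interface EventProcessor extends AutoCloseable {
        void enqueue(Runnable event);
        default void close() {}
    }

    // Isolated model: coordinator events run on dedicated threads, shielded
    // from the produce/fetch workload on the api handler threads.
    static final class DedicatedPoolProcessor implements EventProcessor {
        private final ExecutorService pool = Executors.newFixedThreadPool(2);
        public void enqueue(Runnable event) { pool.execute(event); }
        public void close() { pool.shutdown(); }
    }

    // Caller-runs model: events execute synchronously on the request-handler
    // thread itself, as the POC in this PR proposes.
    static final class CallerRunsProcessor implements EventProcessor {
        public void enqueue(Runnable event) { event.run(); }
    }

    public static void main(String[] args) throws Exception {
        CountDownLatch done = new CountDownLatch(2);
        try (EventProcessor pooled = new DedicatedPoolProcessor();
             EventProcessor inline = new CallerRunsProcessor()) {
            pooled.enqueue(done::countDown);
            inline.enqueue(done::countDown);
            done.await(); // both models process the same event, unchanged
        }
        System.out.println("both events processed");
    }
}
```

Swapping implementations behind such an interface is what lets the two approaches be benchmarked against each other without touching the coordinator logic.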
Re: [PR] Adding this-escape for MaterializedStoreFactory implementors [kafka]
vamossagar12 commented on PR #14742: URL: https://github.com/apache/kafka/pull/14742#issuecomment-1807053171 Yes, I realised it later on :D
Re: [PR] MINOR: Fix ClusterConnectionStatesTest.testSingleIP [kafka]
vamossagar12 commented on PR #14741: URL: https://github.com/apache/kafka/pull/14741#issuecomment-1807052792 > @showuon , I noticed the JDK21 build failed with `[this-escape] possible 'this' escape before subclass is fully initialized`. I created [this PR](https://github.com/apache/kafka/pull/14742) to get around that issue. The classes in question in Streams merged yesterday. Let me know what you think. Actually ignore, found that [this](https://github.com/apache/kafka/pull/14708) already fixes it.
Re: [PR] Adding this-escape for MaterializedStoreFactory implementors [kafka]
vamossagar12 closed pull request #14742: Adding this-escape for MaterializedStoreFactory implementors URL: https://github.com/apache/kafka/pull/14742
Re: [PR] Adding this-escape for MaterializedStoreFactory implementors [kafka]
dajac commented on PR #14742: URL: https://github.com/apache/kafka/pull/14742#issuecomment-1807052588 Thanks for looking into this but I just merged https://github.com/apache/kafka/pull/14740 to fix it.
Re: [PR] MINOR: Fix ClusterConnectionStatesTest.testSingleIP [kafka]
vamossagar12 commented on PR #14741: URL: https://github.com/apache/kafka/pull/14741#issuecomment-1807052287 @showuon , I noticed the JDK21 build failed with `[this-escape] possible 'this' escape before subclass is fully initialized`. I created [this PR](https://github.com/apache/kafka/pull/14742) to get around that issue. The classes in question in Streams merged yesterday. Let me know what you think.
[PR] Adding this-escape for MaterializedStoreFactory implementors [kafka]
vamossagar12 opened a new pull request, #14742: URL: https://github.com/apache/kafka/pull/14742 Noticed the latest https://github.com/apache/kafka/pull/14741 [failed compileJava on JDK 21](https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14741/1/pipeline/) due to =>

```
> Task :streams:compileJava
/home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-14741/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowStoreMaterializer.java:46: warning: [this-escape] possible 'this' escape before subclass is fully initialized
    retentionPeriod = retentionPeriod();
                      ^
error: warnings found and -Werror specified
```

This seems to be happening because this-escape is [a new linter added in JDK 21](https://bugs.openjdk.org/browse/JDK-8015831), which warns about the constructor calling other methods. This PR is an attempt to check if this suppression helps in getting around the issue.
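The lint can be reproduced with a minimal, self-contained example. The `BaseStore`/`CustomStore` classes are hypothetical, mimicking the `WindowStoreMaterializer` pattern; the example also shows why javac warns, since the overridable call runs before subclass fields are initialized:

```java
// JDK 21 "this-escape" in miniature: a constructor that invokes an overridable
// method lets `this` escape before any subclass finishes initializing, so the
// override can observe default (zero/null) field values.
public class ThisEscapeDemo {

    static class BaseStore {
        protected final long retentionPeriod;

        @SuppressWarnings("this-escape") // acknowledges the escape instead of tripping -Werror
        BaseStore() {
            retentionPeriod = retentionPeriod(); // overridable call from a constructor
        }

        protected long retentionPeriod() {
            return 1000L;
        }
    }

    static class CustomStore extends BaseStore {
        // Assigned only AFTER BaseStore's constructor has already run.
        private long configured = 5000L;

        @Override
        protected long retentionPeriod() {
            return configured;
        }
    }

    public static void main(String[] args) {
        // The hazard the lint flags: the override ran before `configured` was
        // assigned, so the final field captured the default value 0, not 5000.
        System.out.println(new CustomStore().retentionPeriod); // prints 0
    }
}
```

`@SuppressWarnings("this-escape")` (the approach this PR tried) silences the warning per constructor; the alternative fix is restructuring so constructors never call overridable methods.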