Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]

2023-11-12 Thread via GitHub


apoorvmittal10 commented on code in PR #14620:
URL: https://github.com/apache/kafka/pull/14620#discussion_r1390718727


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/LastValueTracker.java:
##
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import java.time.Instant;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * A LastValueTracker uses a ConcurrentMap to maintain historic values for a given key, and return

Review Comment:
   @mjsax I think you meant why `ConcurrentMap` in the context of why not just 
`Map`, or why not mention `ConcurrentHashMap`. The reason `ConcurrentMap` was 
used is that the `putIfAbsent` method was needed, which is not part of the 
`Map` API.
   As I have removed `AtomicReference` and moved from `putIfAbsent` to `put`, I 
have also moved from `ConcurrentMap` to `Map`; the implementation is still the 
same `ConcurrentHashMap`. I have corrected the comment accordingly.
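The change described in this comment (dropping `AtomicReference` and `putIfAbsent` in favour of a plain `put` on a field typed as `Map` but backed by `ConcurrentHashMap`) can be sketched roughly as follows. All class and method names here are illustrative, not the PR's actual code:

```java
import java.time.Instant;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: the field is typed as the Map interface, while the
// implementation remains a thread-safe ConcurrentHashMap.
public class SimpleLastValueTracker<K> {

    public static class InstantAndValue {
        private final Instant instant;
        private final double value;

        public InstantAndValue(Instant instant, double value) {
            this.instant = instant;
            this.value = value;
        }

        public Instant instant() { return instant; }
        public double value() { return value; }
    }

    // Typed as Map (not ConcurrentMap) since only put/get are needed now.
    private final Map<K, InstantAndValue> values = new ConcurrentHashMap<>();

    // Record the new value and return the previous one, if any.
    public Optional<InstantAndValue> getAndSet(K key, Instant now, double value) {
        InstantAndValue previous = values.put(key, new InstantAndValue(now, value));
        return Optional.ofNullable(previous);
    }
}
```

With `put` there is no compare-and-set loop: the tracker simply overwrites the stored value and hands back the previous one, which is all the delta computation needs.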



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]

2023-11-12 Thread via GitHub


apoorvmittal10 commented on code in PR #14620:
URL: https://github.com/apache/kafka/pull/14620#discussion_r1390714375


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/KafkaMetricsCollector.java:
##
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricValueProvider;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.CumulativeCount;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+import org.apache.kafka.common.metrics.stats.Frequencies;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.apache.kafka.common.metrics.stats.Percentiles;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.SimpleRate;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
+import org.apache.kafka.common.telemetry.internals.LastValueTracker.InstantAndValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.time.Clock;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * All metrics implement the {@link MetricValueProvider} interface. They are divided into
+ * two base types:
+ *
+ * <ul>
+ * <li>{@link Gauge}</li>
+ * <li>{@link Measurable}</li>
+ * </ul>
+ *
+ * {@link Gauge Gauges} can have any value, but we only collect metrics with number values.
+ * {@link Measurable Measurables} are divided into simple types with single values
+ * ({@link Avg}, {@link CumulativeCount}, {@link Min}, {@link Max}, {@link Rate},
+ * {@link SimpleRate}, and {@link CumulativeSum}) and compound types ({@link Frequencies},
+ * {@link Meter}, and {@link Percentiles}).
+ *
+ * <p>
+ *
+ * We can safely assume that a {@link CumulativeCount count} always increases in steady state. It
+ * should be a bug if a count metric decreases.
+ *
+ * <p>
+ *
+ * Total and Sum are treated as monotonically increasing counters. The javadocs for the Total metric type
+ * say "An un-windowed cumulative total maintained over all time.". The standalone Total metrics in
+ * the codebase seem to be cumulative metrics that will always increase. The Total metric underlying
+ * the Meter type is mostly a Total of a Count metric.
+ * We can assume that a Total metric always increases (but it is not guaranteed, as the sample values
+ * might be either negative or positive).
+ * For now, Total is converted to CUMULATIVE_DOUBLE unless we find a valid counter-example.
+ *
+ * <p>
+ *
+ * The Sum is a sample sum, which is not a cumulative metric. It is converted to GAUGE_DOUBLE.
+ *
+ * <p>
+ *
+ * The compound metrics are virtual metrics. They are composed of simple types or anonymous measurable
+ * types, which are reported. A compound metric is never reported as-is.
+ *
+ * <p>
+ *
+ * A Meter metric is always created with, and reported as, 2 KafkaExporter metrics: a rate and a
+ * count. For example, org.apache.kafka.common.network.Selector has a Meter metric for "connection-close",
+ * but it has to be created with a "connection-close-rate" metric of type rate and a "connection-close-total"
+ * metric of type total. So, we will never get a KafkaExporter metric with type Meter.
+ *
+ * <p>
+ *
+ * Frequencies is created with an array of Frequency objects. When a Frequencies metric is registered, each
+ * member Frequency object is converted into an anonymous Measurable and registered. So, a Frequencies metric
+ * is reported as a set of measurables with name = Frequency.name(). As there is no way to figure out the
+ * compound type, each component measurable is converted to a GAUGE_DOUBLE.
+ *
+ * <p>
+ *
+ * Percentiles work the 
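The Meter decomposition described in this javadoc (a Meter is never reported as-is; the collector only ever sees its constituent rate and total metrics) can be illustrated with a small, hypothetical sketch. The class and helper names below are illustrative only, not the PR's code:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration: a Meter with base name "connection-close" is
// backed by two Kafka metrics, a rate and a total, and those are what the
// metrics collector actually observes.
public class MeterConstituents {
    // For a base name like "connection-close", the constituent metric names.
    static List<String> constituents(String baseName) {
        return Arrays.asList(baseName + "-rate", baseName + "-total");
    }
}
```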

Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]

2023-11-12 Thread via GitHub


apoorvmittal10 commented on code in PR #14620:
URL: https://github.com/apache/kafka/pull/14620#discussion_r1390713867


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricsProvider.java:
##
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import io.opentelemetry.proto.resource.v1.Resource;
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.metrics.MetricsContext;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Provides specifications which are used to collect metrics.
+ */
+public interface MetricsProvider extends Configurable {

Review Comment:
   I had kept the interface so that implementations of `TelemetryReporter` can 
use it, but it makes sense to remove it as the client code does not need that 
abstraction; it can instead directly have a class providing the telemetry 
metrics specifications, i.e. domain, labels, etc. I have removed the interface.
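A hedged sketch of the alternative mentioned in this comment: instead of a `MetricsProvider` interface, a plain class that directly supplies the telemetry metrics specifications (domain, labels). All names here are hypothetical, not the PR's code:

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical plain holder for telemetry metric specifications, replacing
// the interface abstraction that the client code does not need.
public class ClientTelemetrySpec {
    private final String domain;
    private final Map<String, String> labels;

    public ClientTelemetrySpec(String domain, Map<String, String> labels) {
        this.domain = domain;
        this.labels = Collections.unmodifiableMap(labels);
    }

    public String domain() { return domain; }
    public Map<String, String> labels() { return labels; }
}
```

A concrete class like this keeps the specification in one place without forcing every client to implement a `Configurable` interface it never customizes.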






[jira] [Updated] (KAFKA-15785) Flaky Test - testTransactionAfterTransactionIdExpiresButProducerIdRemains

2023-11-12 Thread Apoorv Mittal (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apoorv Mittal updated KAFKA-15785:
--
Summary: Flaky Test - 
testTransactionAfterTransactionIdExpiresButProducerIdRemains  (was: Investigate 
new test case failure - 
testTransactionAfterTransactionIdExpiresButProducerIdRemains)

> Flaky Test - testTransactionAfterTransactionIdExpiresButProducerIdRemains
> -
>
> Key: KAFKA-15785
> URL: https://issues.apache.org/jira/browse/KAFKA-15785
> Project: Kafka
>  Issue Type: Bug
>Reporter: Apoorv Mittal
>Priority: Major
>
> PR - [https://github.com/apache/kafka/pull/14621] has 7 new test case failures 
> which are not related to the PR. This Jira tracks these failures for 
> investigation into whether the current changes somehow impact the tests.
> CI: 
> [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14621/12/tests/]
> Failed tests
> Build / JDK 17 and Scala 2.13 / 
> testTransactionAfterTransactionIdExpiresButProducerIdRemains(String).quorum=kraft
>  – kafka.api.ProducerIdExpirationTest
> 8s
> Build / JDK 8 and Scala 2.12 / 
> testBumpTransactionalEpoch(String).quorum=kraft – kafka.api.TransactionsTest
> 1m 20s
> Build / JDK 11 and Scala 2.13 / testReplicateSourceDefault() – 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest
> 2m 15s
> Build / JDK 11 and Scala 2.13 / testReplicateSourceDefault() – 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest
> 1m 51s
> Build / JDK 11 and Scala 2.13 / 
> testDeleteCmdNonExistingGroup(String).quorum=kraft – 
> kafka.admin.DeleteConsumerGroupsTest
> 11s
> Build / JDK 11 and Scala 2.13 / testTimeouts() – 
> org.apache.kafka.controller.QuorumControllerTest
> <1s
> Build / JDK 11 and Scala 2.13 / 
> testHighWaterMarkAfterPartitionReassignment(String).quorum=kraft – 
> org.apache.kafka.tools.reassign.ReassignPartitionsIntegrationTest



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15785) Investigate new test case failure - testTransactionAfterTransactionIdExpiresButProducerIdRemains

2023-11-12 Thread Apoorv Mittal (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apoorv Mittal updated KAFKA-15785:
--
Parent: (was: KAFKA-15601)
Issue Type: Bug  (was: Sub-task)

> Investigate new test case failure - 
> testTransactionAfterTransactionIdExpiresButProducerIdRemains
> 
>
> Key: KAFKA-15785
> URL: https://issues.apache.org/jira/browse/KAFKA-15785
> Project: Kafka
>  Issue Type: Bug
>Reporter: Apoorv Mittal
>Priority: Major
>
> PR - [https://github.com/apache/kafka/pull/14621] has 7 new test case failures 
> which are not related to the PR. This Jira tracks these failures for 
> investigation into whether the current changes somehow impact the tests.
> CI: 
> [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14621/12/tests/]
> Failed tests
> Build / JDK 17 and Scala 2.13 / 
> testTransactionAfterTransactionIdExpiresButProducerIdRemains(String).quorum=kraft
>  – kafka.api.ProducerIdExpirationTest
> 8s
> Build / JDK 8 and Scala 2.12 / 
> testBumpTransactionalEpoch(String).quorum=kraft – kafka.api.TransactionsTest
> 1m 20s
> Build / JDK 11 and Scala 2.13 / testReplicateSourceDefault() – 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest
> 2m 15s
> Build / JDK 11 and Scala 2.13 / testReplicateSourceDefault() – 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest
> 1m 51s
> Build / JDK 11 and Scala 2.13 / 
> testDeleteCmdNonExistingGroup(String).quorum=kraft – 
> kafka.admin.DeleteConsumerGroupsTest
> 11s
> Build / JDK 11 and Scala 2.13 / testTimeouts() – 
> org.apache.kafka.controller.QuorumControllerTest
> <1s
> Build / JDK 11 and Scala 2.13 / 
> testHighWaterMarkAfterPartitionReassignment(String).quorum=kraft – 
> org.apache.kafka.tools.reassign.ReassignPartitionsIntegrationTest





[jira] [Updated] (KAFKA-15786) Investigate new test case failure - testBumpTransactionalEpoch

2023-11-12 Thread Apoorv Mittal (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15786?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apoorv Mittal updated KAFKA-15786:
--
Parent: (was: KAFKA-15601)
Issue Type: Bug  (was: Sub-task)

> Investigate new test case failure - testBumpTransactionalEpoch
> --
>
> Key: KAFKA-15786
> URL: https://issues.apache.org/jira/browse/KAFKA-15786
> Project: Kafka
>  Issue Type: Bug
>Reporter: Apoorv Mittal
>Priority: Major
>
> PR - [https://github.com/apache/kafka/pull/14621] has 7 new test case failures 
> which are not related to the PR. This Jira tracks these failures for 
> investigation into whether the current changes somehow impact the tests.
> CI: 
> [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14621/12/tests/]
> Failed tests
> Build / JDK 17 and Scala 2.13 / 
> testTransactionAfterTransactionIdExpiresButProducerIdRemains(String).quorum=kraft
>  – kafka.api.ProducerIdExpirationTest
> 8s
> Build / JDK 8 and Scala 2.12 / 
> testBumpTransactionalEpoch(String).quorum=kraft – kafka.api.TransactionsTest
> 1m 20s
> Build / JDK 11 and Scala 2.13 / testReplicateSourceDefault() – 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest
> 2m 15s
> Build / JDK 11 and Scala 2.13 / testReplicateSourceDefault() – 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest
> 1m 51s
> Build / JDK 11 and Scala 2.13 / 
> testDeleteCmdNonExistingGroup(String).quorum=kraft – 
> kafka.admin.DeleteConsumerGroupsTest
> 11s
> Build / JDK 11 and Scala 2.13 / testTimeouts() – 
> org.apache.kafka.controller.QuorumControllerTest
> <1s
> Build / JDK 11 and Scala 2.13 / 
> testHighWaterMarkAfterPartitionReassignment(String).quorum=kraft – 
> org.apache.kafka.tools.reassign.ReassignPartitionsIntegrationTest





[jira] [Updated] (KAFKA-15786) Flaky Test - testBumpTransactionalEpoch

2023-11-12 Thread Apoorv Mittal (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15786?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apoorv Mittal updated KAFKA-15786:
--
Summary: Flaky Test - testBumpTransactionalEpoch  (was: Investigate new 
test case failure - testBumpTransactionalEpoch)

> Flaky Test - testBumpTransactionalEpoch
> ---
>
> Key: KAFKA-15786
> URL: https://issues.apache.org/jira/browse/KAFKA-15786
> Project: Kafka
>  Issue Type: Bug
>Reporter: Apoorv Mittal
>Priority: Major
>
> PR - [https://github.com/apache/kafka/pull/14621] has 7 new test case failures 
> which are not related to the PR. This Jira tracks these failures for 
> investigation into whether the current changes somehow impact the tests.
> CI: 
> [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14621/12/tests/]
> Failed tests
> Build / JDK 17 and Scala 2.13 / 
> testTransactionAfterTransactionIdExpiresButProducerIdRemains(String).quorum=kraft
>  – kafka.api.ProducerIdExpirationTest
> 8s
> Build / JDK 8 and Scala 2.12 / 
> testBumpTransactionalEpoch(String).quorum=kraft – kafka.api.TransactionsTest
> 1m 20s
> Build / JDK 11 and Scala 2.13 / testReplicateSourceDefault() – 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest
> 2m 15s
> Build / JDK 11 and Scala 2.13 / testReplicateSourceDefault() – 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest
> 1m 51s
> Build / JDK 11 and Scala 2.13 / 
> testDeleteCmdNonExistingGroup(String).quorum=kraft – 
> kafka.admin.DeleteConsumerGroupsTest
> 11s
> Build / JDK 11 and Scala 2.13 / testTimeouts() – 
> org.apache.kafka.controller.QuorumControllerTest
> <1s
> Build / JDK 11 and Scala 2.13 / 
> testHighWaterMarkAfterPartitionReassignment(String).quorum=kraft – 
> org.apache.kafka.tools.reassign.ReassignPartitionsIntegrationTest





[jira] [Updated] (KAFKA-15788) Investigate new test case failure - testDeleteCmdNonExistingGroup

2023-11-12 Thread Apoorv Mittal (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15788?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apoorv Mittal updated KAFKA-15788:
--
Parent: (was: KAFKA-15601)
Issue Type: Bug  (was: Sub-task)

> Investigate new test case failure - testDeleteCmdNonExistingGroup
> -
>
> Key: KAFKA-15788
> URL: https://issues.apache.org/jira/browse/KAFKA-15788
> Project: Kafka
>  Issue Type: Bug
>Reporter: Apoorv Mittal
>Priority: Major
>
> PR - [https://github.com/apache/kafka/pull/14621] has 7 new test case failures 
> which are not related to the PR. This Jira tracks these failures for 
> investigation into whether the current changes somehow impact the tests.
> CI: 
> [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14621/12/tests/]
> Failed tests
> Build / JDK 17 and Scala 2.13 / 
> testTransactionAfterTransactionIdExpiresButProducerIdRemains(String).quorum=kraft
>  – kafka.api.ProducerIdExpirationTest
> 8s
> Build / JDK 8 and Scala 2.12 / 
> testBumpTransactionalEpoch(String).quorum=kraft – kafka.api.TransactionsTest
> 1m 20s
> Build / JDK 11 and Scala 2.13 / testReplicateSourceDefault() – 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest
> 2m 15s
> Build / JDK 11 and Scala 2.13 / testReplicateSourceDefault() – 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest
> 1m 51s
> Build / JDK 11 and Scala 2.13 / 
> testDeleteCmdNonExistingGroup(String).quorum=kraft – 
> kafka.admin.DeleteConsumerGroupsTest
> 11s
> Build / JDK 11 and Scala 2.13 / testTimeouts() – 
> org.apache.kafka.controller.QuorumControllerTest
> <1s
> Build / JDK 11 and Scala 2.13 / 
> testHighWaterMarkAfterPartitionReassignment(String).quorum=kraft – 
> org.apache.kafka.tools.reassign.ReassignPartitionsIntegrationTest





[jira] [Updated] (KAFKA-15788) Flaky Test - testDeleteCmdNonExistingGroup

2023-11-12 Thread Apoorv Mittal (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15788?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apoorv Mittal updated KAFKA-15788:
--
Summary: Flaky Test - testDeleteCmdNonExistingGroup  (was: Investigate new 
test case failure - testDeleteCmdNonExistingGroup)

> Flaky Test - testDeleteCmdNonExistingGroup
> --
>
> Key: KAFKA-15788
> URL: https://issues.apache.org/jira/browse/KAFKA-15788
> Project: Kafka
>  Issue Type: Bug
>Reporter: Apoorv Mittal
>Priority: Major
>
> PR - [https://github.com/apache/kafka/pull/14621] has 7 new test case failures 
> which are not related to the PR. This Jira tracks these failures for 
> investigation into whether the current changes somehow impact the tests.
> CI: 
> [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14621/12/tests/]
> Failed tests
> Build / JDK 17 and Scala 2.13 / 
> testTransactionAfterTransactionIdExpiresButProducerIdRemains(String).quorum=kraft
>  – kafka.api.ProducerIdExpirationTest
> 8s
> Build / JDK 8 and Scala 2.12 / 
> testBumpTransactionalEpoch(String).quorum=kraft – kafka.api.TransactionsTest
> 1m 20s
> Build / JDK 11 and Scala 2.13 / testReplicateSourceDefault() – 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest
> 2m 15s
> Build / JDK 11 and Scala 2.13 / testReplicateSourceDefault() – 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest
> 1m 51s
> Build / JDK 11 and Scala 2.13 / 
> testDeleteCmdNonExistingGroup(String).quorum=kraft – 
> kafka.admin.DeleteConsumerGroupsTest
> 11s
> Build / JDK 11 and Scala 2.13 / testTimeouts() – 
> org.apache.kafka.controller.QuorumControllerTest
> <1s
> Build / JDK 11 and Scala 2.13 / 
> testHighWaterMarkAfterPartitionReassignment(String).quorum=kraft – 
> org.apache.kafka.tools.reassign.ReassignPartitionsIntegrationTest





[jira] [Updated] (KAFKA-15790) Flaky Test - testHighWaterMarkAfterPartitionReassignment

2023-11-12 Thread Apoorv Mittal (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apoorv Mittal updated KAFKA-15790:
--
Summary: Flaky Test - testHighWaterMarkAfterPartitionReassignment  (was: 
Investigate new test case failure - testHighWaterMarkAfterPartitionReassignment)

> Flaky Test - testHighWaterMarkAfterPartitionReassignment
> 
>
> Key: KAFKA-15790
> URL: https://issues.apache.org/jira/browse/KAFKA-15790
> Project: Kafka
>  Issue Type: Bug
>Reporter: Apoorv Mittal
>Priority: Major
>
> PR - [https://github.com/apache/kafka/pull/14621] has 7 new test case failures 
> which are not related to the PR. This Jira tracks these failures for 
> investigation into whether the current changes somehow impact the tests.
> CI: 
> [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14621/12/tests/]
> Failed tests
> Build / JDK 17 and Scala 2.13 / 
> testTransactionAfterTransactionIdExpiresButProducerIdRemains(String).quorum=kraft
>  – kafka.api.ProducerIdExpirationTest
> 8s
> Build / JDK 8 and Scala 2.12 / 
> testBumpTransactionalEpoch(String).quorum=kraft – kafka.api.TransactionsTest
> 1m 20s
> Build / JDK 11 and Scala 2.13 / testReplicateSourceDefault() – 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest
> 2m 15s
> Build / JDK 11 and Scala 2.13 / testReplicateSourceDefault() – 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest
> 1m 51s
> Build / JDK 11 and Scala 2.13 / 
> testDeleteCmdNonExistingGroup(String).quorum=kraft – 
> kafka.admin.DeleteConsumerGroupsTest
> 11s
> Build / JDK 11 and Scala 2.13 / testTimeouts() – 
> org.apache.kafka.controller.QuorumControllerTest
> <1s
> Build / JDK 11 and Scala 2.13 / 
> testHighWaterMarkAfterPartitionReassignment(String).quorum=kraft – 
> org.apache.kafka.tools.reassign.ReassignPartitionsIntegrationTest





[jira] [Updated] (KAFKA-15790) Investigate new test case failure - testHighWaterMarkAfterPartitionReassignment

2023-11-12 Thread Apoorv Mittal (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apoorv Mittal updated KAFKA-15790:
--
Parent: (was: KAFKA-15601)
Issue Type: Bug  (was: Sub-task)

> Investigate new test case failure - 
> testHighWaterMarkAfterPartitionReassignment
> ---
>
> Key: KAFKA-15790
> URL: https://issues.apache.org/jira/browse/KAFKA-15790
> Project: Kafka
>  Issue Type: Bug
>Reporter: Apoorv Mittal
>Priority: Major
>
> PR - [https://github.com/apache/kafka/pull/14621] has 7 new test case failures 
> which are not related to the PR. This Jira tracks these failures for 
> investigation into whether the current changes somehow impact the tests.
> CI: 
> [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14621/12/tests/]
> Failed tests
> Build / JDK 17 and Scala 2.13 / 
> testTransactionAfterTransactionIdExpiresButProducerIdRemains(String).quorum=kraft
>  – kafka.api.ProducerIdExpirationTest
> 8s
> Build / JDK 8 and Scala 2.12 / 
> testBumpTransactionalEpoch(String).quorum=kraft – kafka.api.TransactionsTest
> 1m 20s
> Build / JDK 11 and Scala 2.13 / testReplicateSourceDefault() – 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest
> 2m 15s
> Build / JDK 11 and Scala 2.13 / testReplicateSourceDefault() – 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest
> 1m 51s
> Build / JDK 11 and Scala 2.13 / 
> testDeleteCmdNonExistingGroup(String).quorum=kraft – 
> kafka.admin.DeleteConsumerGroupsTest
> 11s
> Build / JDK 11 and Scala 2.13 / testTimeouts() – 
> org.apache.kafka.controller.QuorumControllerTest
> <1s
> Build / JDK 11 and Scala 2.13 / 
> testHighWaterMarkAfterPartitionReassignment(String).quorum=kraft – 
> org.apache.kafka.tools.reassign.ReassignPartitionsIntegrationTest





[jira] [Updated] (KAFKA-15791) Flaky test - MirrorConnectorsWithCustomForwardingAdminIntegrationTest - MirrorConnectorsIntegrationBaseTest

2023-11-12 Thread Apoorv Mittal (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15791?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apoorv Mittal updated KAFKA-15791:
--
Summary: Flaky test - 
MirrorConnectorsWithCustomForwardingAdminIntegrationTest - 
MirrorConnectorsIntegrationBaseTest  (was: Investigate new test case failure - 
MirrorConnectorsWithCustomForwardingAdminIntegrationTest - 
MirrorConnectorsIntegrationBaseTest)

> Flaky test - MirrorConnectorsWithCustomForwardingAdminIntegrationTest - 
> MirrorConnectorsIntegrationBaseTest
> ---
>
> Key: KAFKA-15791
> URL: https://issues.apache.org/jira/browse/KAFKA-15791
> Project: Kafka
>  Issue Type: Bug
>Reporter: Apoorv Mittal
>Priority: Major
>
> PR - [https://github.com/apache/kafka/pull/14621] has 7 new test case failures 
> which are not related to the PR. This Jira tracks these failures for 
> investigation into whether the current changes somehow impact the tests.
> CI: 
> [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14621/12/tests/]
> Failed tests
> Build / JDK 17 and Scala 2.13 / 
> testTransactionAfterTransactionIdExpiresButProducerIdRemains(String).quorum=kraft
>  – kafka.api.ProducerIdExpirationTest
> 8s
> Build / JDK 8 and Scala 2.12 / 
> testBumpTransactionalEpoch(String).quorum=kraft – kafka.api.TransactionsTest
> 1m 20s
> Build / JDK 11 and Scala 2.13 / testReplicateSourceDefault() – 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest
> 2m 15s
> Build / JDK 11 and Scala 2.13 / testReplicateSourceDefault() – 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest
> 1m 51s
> Build / JDK 11 and Scala 2.13 / 
> testDeleteCmdNonExistingGroup(String).quorum=kraft – 
> kafka.admin.DeleteConsumerGroupsTest
> 11s
> Build / JDK 11 and Scala 2.13 / testTimeouts() – 
> org.apache.kafka.controller.QuorumControllerTest
> <1s
> Build / JDK 11 and Scala 2.13 / 
> testHighWaterMarkAfterPartitionReassignment(String).quorum=kraft – 
> org.apache.kafka.tools.reassign.ReassignPartitionsIntegrationTest





[jira] [Updated] (KAFKA-15791) Investigate new test case failure - MirrorConnectorsWithCustomForwardingAdminIntegrationTest - MirrorConnectorsIntegrationBaseTest

2023-11-12 Thread Apoorv Mittal (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15791?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apoorv Mittal updated KAFKA-15791:
--
Parent: (was: KAFKA-15601)
Issue Type: Bug  (was: Sub-task)

> Investigate new test case failure - 
> MirrorConnectorsWithCustomForwardingAdminIntegrationTest - 
> MirrorConnectorsIntegrationBaseTest
> --
>
> Key: KAFKA-15791
> URL: https://issues.apache.org/jira/browse/KAFKA-15791
> Project: Kafka
>  Issue Type: Bug
>Reporter: Apoorv Mittal
>Priority: Major
>
> PR - [https://github.com/apache/kafka/pull/14621] has 7 new test case failures 
> which are not related to the PR. This Jira tracks these failures for 
> investigation into whether the current changes somehow impact the tests.
> CI: 
> [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14621/12/tests/]
> Failed tests
> Build / JDK 17 and Scala 2.13 / 
> testTransactionAfterTransactionIdExpiresButProducerIdRemains(String).quorum=kraft
>  – kafka.api.ProducerIdExpirationTest
> 8s
> Build / JDK 8 and Scala 2.12 / 
> testBumpTransactionalEpoch(String).quorum=kraft – kafka.api.TransactionsTest
> 1m 20s
> Build / JDK 11 and Scala 2.13 / testReplicateSourceDefault() – 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest
> 2m 15s
> Build / JDK 11 and Scala 2.13 / testReplicateSourceDefault() – 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest
> 1m 51s
> Build / JDK 11 and Scala 2.13 / 
> testDeleteCmdNonExistingGroup(String).quorum=kraft – 
> kafka.admin.DeleteConsumerGroupsTest
> 11s
> Build / JDK 11 and Scala 2.13 / testTimeouts() – 
> org.apache.kafka.controller.QuorumControllerTest
> <1s
> Build / JDK 11 and Scala 2.13 / 
> testHighWaterMarkAfterPartitionReassignment(String).quorum=kraft – 
> org.apache.kafka.tools.reassign.ReassignPartitionsIntegrationTest





[jira] [Updated] (KAFKA-15789) Flaky Test - testTimeouts - QuorumControllerTest

2023-11-12 Thread Apoorv Mittal (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15789?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apoorv Mittal updated KAFKA-15789:
--
Summary: Flaky Test - testTimeouts - QuorumControllerTest  (was: 
Investigate new test case failure - testTimeouts - QuorumControllerTest)

> Flaky Test - testTimeouts - QuorumControllerTest
> 
>
> Key: KAFKA-15789
> URL: https://issues.apache.org/jira/browse/KAFKA-15789
> Project: Kafka
>  Issue Type: Bug
>Reporter: Apoorv Mittal
>Priority: Major
>
> PR - [https://github.com/apache/kafka/pull/14621] has 7 new test case failures 
> which are not related to the PR. This Jira tracks these failures for 
> investigation into whether the current changes somehow impact the tests.
> CI: 
> [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14621/12/tests/]
> Failed tests
> Build / JDK 17 and Scala 2.13 / 
> testTransactionAfterTransactionIdExpiresButProducerIdRemains(String).quorum=kraft
>  – kafka.api.ProducerIdExpirationTest
> 8s
> Build / JDK 8 and Scala 2.12 / 
> testBumpTransactionalEpoch(String).quorum=kraft – kafka.api.TransactionsTest
> 1m 20s
> Build / JDK 11 and Scala 2.13 / testReplicateSourceDefault() – 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest
> 2m 15s
> Build / JDK 11 and Scala 2.13 / testReplicateSourceDefault() – 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest
> 1m 51s
> Build / JDK 11 and Scala 2.13 / 
> testDeleteCmdNonExistingGroup(String).quorum=kraft – 
> kafka.admin.DeleteConsumerGroupsTest
> 11s
> Build / JDK 11 and Scala 2.13 / testTimeouts() – 
> org.apache.kafka.controller.QuorumControllerTest
> <1s
> Build / JDK 11 and Scala 2.13 / 
> testHighWaterMarkAfterPartitionReassignment(String).quorum=kraft – 
> org.apache.kafka.tools.reassign.ReassignPartitionsIntegrationTest





[jira] [Updated] (KAFKA-15789) Investigate new test case failure - testTimeouts - QuorumControllerTest

2023-11-12 Thread Apoorv Mittal (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15789?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apoorv Mittal updated KAFKA-15789:
--
Parent: (was: KAFKA-15601)
Issue Type: Bug  (was: Sub-task)

> Investigate new test case failure - testTimeouts - QuorumControllerTest
> ---
>
> Key: KAFKA-15789
> URL: https://issues.apache.org/jira/browse/KAFKA-15789
> Project: Kafka
>  Issue Type: Bug
>Reporter: Apoorv Mittal
>Priority: Major
>
> PR - [https://github.com/apache/kafka/pull/14621] has 7 new test case 
> failures, which do not appear to be related to the PR itself. This Jira 
> tracks these failures to investigate whether the current changes somehow 
> impact the tests.
> CI: 
> [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14621/12/tests/]
> Failed tests
> Build / JDK 17 and Scala 2.13 / 
> testTransactionAfterTransactionIdExpiresButProducerIdRemains(String).quorum=kraft
>  – kafka.api.ProducerIdExpirationTest
> 8s
> Build / JDK 8 and Scala 2.12 / 
> testBumpTransactionalEpoch(String).quorum=kraft – kafka.api.TransactionsTest
> 1m 20s
> Build / JDK 11 and Scala 2.13 / testReplicateSourceDefault() – 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest
> 2m 15s
> Build / JDK 11 and Scala 2.13 / testReplicateSourceDefault() – 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest
> 1m 51s
> Build / JDK 11 and Scala 2.13 / 
> testDeleteCmdNonExistingGroup(String).quorum=kraft – 
> kafka.admin.DeleteConsumerGroupsTest
> 11s
> Build / JDK 11 and Scala 2.13 / testTimeouts() – 
> org.apache.kafka.controller.QuorumControllerTest
> <1s
> Build / JDK 11 and Scala 2.13 / 
> testHighWaterMarkAfterPartitionReassignment(String).quorum=kraft – 
> org.apache.kafka.tools.reassign.ReassignPartitionsIntegrationTest





Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]

2023-11-12 Thread via GitHub


apoorvmittal10 commented on code in PR #14620:
URL: https://github.com/apache/kafka/pull/14620#discussion_r1390693105


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/KafkaMetricsCollector.java:
##
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricValueProvider;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.CumulativeCount;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+import org.apache.kafka.common.metrics.stats.Frequencies;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.apache.kafka.common.metrics.stats.Percentiles;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.SimpleRate;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
+import 
org.apache.kafka.common.telemetry.internals.LastValueTracker.InstantAndValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.time.Clock;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * All metrics implement the {@link MetricValueProvider} interface. They are 
divided into
+ * two base types:
+ *
+ * 
+ * {@link Gauge}
+ * {@link Measurable}
+ * 
+ *
+ * {@link Gauge Gauges} can have any value but we only collect metrics with 
number values.
+ * {@link Measurable Measurables} are divided into simple types with single 
values
+ * ({@link Avg}, {@link CumulativeCount}, {@link Min}, {@link Max}, {@link 
Rate},
+ * {@link SimpleRate}, and {@link CumulativeSum}) and compound types ({@link 
Frequencies},
+ * {@link Meter}, and {@link Percentiles}).
+ *
+ * 
+ *
+ * We can safely assume that a {@link CumulativeCount count} always increases 
in steady state. It
+ * should be a bug if a count metric decreases.
+ *
+ * 
+ *
+ * Total and Sum are treated as a monotonically increasing counter. The 
javadocs for Total metric type
+ * say "An un-windowed cumulative total maintained over all time.". The 
standalone Total metrics in
+ * the codebase seem to be cumulative metrics that will always increase. The 
Total metric underlying
+ * Meter type is mostly a Total of a Count metric.
+ * We can assume that a Total metric always increases (but it is not 
guaranteed as the sample values might be both
+ * negative or positive).
+ * For now, Total is converted to CUMULATIVE_DOUBLE unless we find a valid 
counter-example.
+ *
+ * 
+ *
+ * The Sum is a sample sum, not a cumulative metric. It is 
converted to GAUGE_DOUBLE.
+ *
+ * 
+ *
+ * The compound metrics are virtual metrics. They are composed of simple types 
or anonymous measurable types
+ * which are reported. A compound metric is never reported as-is.
+ *
+ * 
+ *
+ * A Meter metric is always created with and reported as 2 KafkaExporter 
metrics: a rate and a
+ * count. For example, org.apache.kafka.common.network.Selector has a Meter metric 
for "connection-close" but it
+ * has to be created with a "connection-close-rate" metric of type rate and a 
"connection-close-total"
+ * metric of type total. So, we will never get a KafkaExporter metric with 
type Meter.
+ *
+ * 
+ *
+ * Frequencies is created with an array of Frequency objects. When a 
Frequencies metric is registered, each
+ * member Frequency object is converted into an anonymous Measurable and 
registered. So, a Frequencies metric
+ * is reported with a set of measurables with name = Frequency.name(). As 
there is no way to figure out the
+ * compound type, each component measurable is converted to a GAUGE_DOUBLE.
+ *
+ * 
+ *
+ * Percentiles work the 
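
The type-conversion rules spelled out in the quoted javadoc can be summarized in a small standalone sketch. This is a hypothetical summary table, not code from the PR; the keys follow the stat classes imported above, and the compound-type entries paraphrase the javadoc's decomposition rules:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class MetricTypeMappingSketch {
    // Hypothetical summary of the conversion rules described in the javadoc:
    // keys are Kafka measurable stat types, values the emitted point kind.
    static Map<String, String> mapping() {
        Map<String, String> m = new LinkedHashMap<>();
        m.put("CumulativeCount", "SUM (cumulative, monotonic)");
        m.put("CumulativeSum", "SUM (cumulative, monotonic)"); // "Total"
        m.put("Sum (windowed sample sum)", "GAUGE_DOUBLE");    // not cumulative
        m.put("Rate", "GAUGE_DOUBLE");
        m.put("Avg", "GAUGE_DOUBLE");
        m.put("Min", "GAUGE_DOUBLE");
        m.put("Max", "GAUGE_DOUBLE");
        // Compound types are never reported as-is; only their components are:
        m.put("Meter", "decomposed into a rate and a total component metric");
        m.put("Frequencies", "each member Frequency reported as GAUGE_DOUBLE");
        return m;
    }

    public static void main(String[] args) {
        if (!mapping().get("Rate").equals("GAUGE_DOUBLE")) throw new AssertionError();
        if (!mapping().get("CumulativeSum").startsWith("SUM")) throw new AssertionError();
        System.out.println("ok");
    }
}
```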

Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]

2023-11-12 Thread via GitHub


apoorvmittal10 commented on code in PR #14620:
URL: https://github.com/apache/kafka/pull/14620#discussion_r1390692901


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/KafkaMetricsCollector.java:
##
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricValueProvider;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.CumulativeCount;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+import org.apache.kafka.common.metrics.stats.Frequencies;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.apache.kafka.common.metrics.stats.Percentiles;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.SimpleRate;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
+import 
org.apache.kafka.common.telemetry.internals.LastValueTracker.InstantAndValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.time.Clock;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * All metrics implement the {@link MetricValueProvider} interface. They are 
divided into
+ * two base types:
+ *
+ * 
+ * {@link Gauge}
+ * {@link Measurable}
+ * 
+ *
+ * {@link Gauge Gauges} can have any value but we only collect metrics with 
number values.
+ * {@link Measurable Measurables} are divided into simple types with single 
values
+ * ({@link Avg}, {@link CumulativeCount}, {@link Min}, {@link Max}, {@link 
Rate},
+ * {@link SimpleRate}, and {@link CumulativeSum}) and compound types ({@link 
Frequencies},
+ * {@link Meter}, and {@link Percentiles}).
+ *
+ * 
+ *
+ * We can safely assume that a {@link CumulativeCount count} always increases 
in steady state. It
+ * should be a bug if a count metric decreases.
+ *
+ * 
+ *
+ * Total and Sum are treated as a monotonically increasing counter. The 
javadocs for Total metric type
+ * say "An un-windowed cumulative total maintained over all time.". The 
standalone Total metrics in
+ * the codebase seem to be cumulative metrics that will always increase. The 
Total metric underlying
+ * Meter type is mostly a Total of a Count metric.
+ * We can assume that a Total metric always increases (but it is not 
guaranteed as the sample values might be both
+ * negative or positive).
+ * For now, Total is converted to CUMULATIVE_DOUBLE unless we find a valid 
counter-example.
+ *
+ * 
+ *
+ * The Sum is a sample sum, not a cumulative metric. It is 
converted to GAUGE_DOUBLE.
+ *
+ * 
+ *
+ * The compound metrics are virtual metrics. They are composed of simple types 
or anonymous measurable types
+ * which are reported. A compound metric is never reported as-is.
+ *
+ * 
+ *
+ * A Meter metric is always created with and reported as 2 KafkaExporter 
metrics: a rate and a
+ * count. For example, org.apache.kafka.common.network.Selector has a Meter metric 
for "connection-close" but it
+ * has to be created with a "connection-close-rate" metric of type rate and a 
"connection-close-total"
+ * metric of type total. So, we will never get a KafkaExporter metric with 
type Meter.
+ *
+ * 
+ *
+ * Frequencies is created with an array of Frequency objects. When a 
Frequencies metric is registered, each
+ * member Frequency object is converted into an anonymous Measurable and 
registered. So, a Frequencies metric
+ * is reported with a set of measurables with name = Frequency.name(). As 
there is no way to figure out the
+ * compound type, each component measurable is converted to a GAUGE_DOUBLE.
+ *
+ * 
+ *
+ * Percentiles work the 

Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]

2023-11-12 Thread via GitHub


apoorvmittal10 commented on code in PR #14620:
URL: https://github.com/apache/kafka/pull/14620#discussion_r1390692648


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/SinglePointMetric.java:
##
@@ -16,22 +16,135 @@
  */
 package org.apache.kafka.common.telemetry.internals;
 
+import io.opentelemetry.proto.common.v1.AnyValue;
+import io.opentelemetry.proto.common.v1.KeyValue;
+import io.opentelemetry.proto.metrics.v1.AggregationTemporality;
+import io.opentelemetry.proto.metrics.v1.Metric;
+import io.opentelemetry.proto.metrics.v1.NumberDataPoint;
+
+import java.time.Instant;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
 /**
- * This class represents a metric that does not yet contain resource tags.
+ * This class represents a telemetry metric that does not yet contain resource 
tags.
  * These additional resource tags will be added before emitting metrics by the 
telemetry reporter.
  */
 public class SinglePointMetric implements MetricKeyable {
 
 private final MetricKey key;
+private final Metric.Builder metricBuilder;
 
-private SinglePointMetric(MetricKey key) {
+private SinglePointMetric(MetricKey key, Metric.Builder metricBuilder) {
 this.key = key;
+this.metricBuilder = metricBuilder;
 }
 
 @Override
 public MetricKey key() {
 return key;
 }
 
-// TODO: Implement methods for serializing/deserializing metrics in 
required format.
+public Metric.Builder builder() {
+return metricBuilder;
+}
+
+public static SinglePointMetric create(MetricKey metricKey, Metric.Builder 
metric) {
+return new SinglePointMetric(metricKey, metric);
+}
+
+/*
+Methods to construct gauge metric type.
+ */
+public static SinglePointMetric gauge(MetricKey metricKey, Number value, 
Instant timestamp) {
+NumberDataPoint.Builder point = point(timestamp, value);
+return gauge(metricKey, point);
+}
+
+public static SinglePointMetric gauge(MetricKey metricKey, double value, 
Instant timestamp) {
+NumberDataPoint.Builder point = point(timestamp, value);
+return gauge(metricKey, point);
+}
+
+/*
+Methods to construct sum metric type.
+ */
+
+public static SinglePointMetric sum(MetricKey metricKey, double value, 
boolean monotonic, Instant timestamp) {
+return sum(metricKey, value, monotonic, timestamp, null);
+}
+
+public static SinglePointMetric sum(MetricKey metricKey, double value, 
boolean monotonic, Instant timestamp,
+Instant startTimestamp) {
+NumberDataPoint.Builder point = point(timestamp, value);
+if (startTimestamp != null) {
+point.setStartTimeUnixNano(toTimeUnixNanos(startTimestamp));
+}
+
+return sum(metricKey, 
AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE, monotonic, point);
+}
+
+public static SinglePointMetric deltaSum(MetricKey metricKey, double 
value, boolean monotonic,
+Instant timestamp, Instant startTimestamp) {
+NumberDataPoint.Builder point = point(timestamp, value)
+.setStartTimeUnixNano(toTimeUnixNanos(startTimestamp));
+
+return sum(metricKey, 
AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA, monotonic, point);
+}
+
+/*
+Helper methods to support metric construction.
+ */
+private static SinglePointMetric sum(MetricKey metricKey, 
AggregationTemporality aggregationTemporality,
+boolean monotonic, NumberDataPoint.Builder point) {
+point.addAllAttributes(asAttributes(metricKey.tags()));
+
+Metric.Builder metric = Metric.newBuilder().setName(metricKey.name());
+metric
+.getSumBuilder()
+.setAggregationTemporality(aggregationTemporality)
+.setIsMonotonic(monotonic)
+.addDataPoints(point);
+return create(metricKey, metric);
+}
+
+private static SinglePointMetric gauge(MetricKey metricKey, 
NumberDataPoint.Builder point) {
+point.addAllAttributes(asAttributes(metricKey.tags()));
+
+Metric.Builder metric = Metric.newBuilder().setName(metricKey.name());
+metric.getGaugeBuilder().addDataPoints(point);
+return create(metricKey, metric);
+}
+
+private static NumberDataPoint.Builder point(Instant timestamp, Number 
value) {
+if (value instanceof Long || value instanceof Integer) {
+return point(timestamp, value.longValue());
+} else {

Review Comment:
   Done.
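
The distinction the snippet above draws between integral and floating-point Number values matters for precision: a long above 2^53 cannot be represented exactly as a double. A minimal standalone sketch of that dispatch (a hypothetical helper mirroring `point(Instant, Number)`, not the PR code itself, which presumably sets the value on the NumberDataPoint builder):

```java
public class PointDispatchSketch {
    // Mirrors the dispatch quoted above: Long/Integer values keep full
    // 64-bit integral precision; every other Number is reported as a double.
    static String dispatch(Number value) {
        if (value instanceof Long || value instanceof Integer) {
            return "int:" + value.longValue();
        }
        return "double:" + value.doubleValue();
    }

    public static void main(String[] args) {
        long big = (1L << 53) + 1; // 9007199254740993, not exactly representable as a double
        if (!dispatch(big).equals("int:9007199254740993")) throw new AssertionError();
        // Forcing the value through double would silently lose the low bit:
        if ((double) big != (double) (big - 1)) throw new AssertionError();
        if (!dispatch(3.5).equals("double:3.5")) throw new AssertionError();
        System.out.println("ok");
    }
}
```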



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]

2023-11-12 Thread via GitHub


apoorvmittal10 commented on code in PR #14620:
URL: https://github.com/apache/kafka/pull/14620#discussion_r1390692468


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/KafkaMetricsCollector.java:
##
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricValueProvider;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.CumulativeCount;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+import org.apache.kafka.common.metrics.stats.Frequencies;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.apache.kafka.common.metrics.stats.Percentiles;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.SimpleRate;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
+import 
org.apache.kafka.common.telemetry.internals.LastValueTracker.InstantAndValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.time.Clock;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * All metrics implement the {@link MetricValueProvider} interface. They are 
divided into
+ * two base types:
+ *
+ * 
+ * {@link Gauge}
+ * {@link Measurable}
+ * 
+ *
+ * {@link Gauge Gauges} can have any value but we only collect metrics with 
number values.
+ * {@link Measurable Measurables} are divided into simple types with single 
values
+ * ({@link Avg}, {@link CumulativeCount}, {@link Min}, {@link Max}, {@link 
Rate},
+ * {@link SimpleRate}, and {@link CumulativeSum}) and compound types ({@link 
Frequencies},
+ * {@link Meter}, and {@link Percentiles}).
+ *
+ * 
+ *
+ * We can safely assume that a {@link CumulativeCount count} always increases 
in steady state. It
+ * should be a bug if a count metric decreases.
+ *
+ * 
+ *
+ * Total and Sum are treated as a monotonically increasing counter. The 
javadocs for Total metric type
+ * say "An un-windowed cumulative total maintained over all time.". The 
standalone Total metrics in
+ * the codebase seem to be cumulative metrics that will always increase. The 
Total metric underlying
+ * Meter type is mostly a Total of a Count metric.
+ * We can assume that a Total metric always increases (but it is not 
guaranteed as the sample values might be both
+ * negative or positive).
+ * For now, Total is converted to CUMULATIVE_DOUBLE unless we find a valid 
counter-example.
+ *
+ * 
+ *
+ * The Sum is a sample sum, not a cumulative metric. It is 
converted to GAUGE_DOUBLE.
+ *
+ * 
+ *
+ * The compound metrics are virtual metrics. They are composed of simple types 
or anonymous measurable types
+ * which are reported. A compound metric is never reported as-is.
+ *
+ * 
+ *
+ * A Meter metric is always created with and reported as 2 KafkaExporter 
metrics: a rate and a
+ * count. For example, org.apache.kafka.common.network.Selector has a Meter metric 
for "connection-close" but it
+ * has to be created with a "connection-close-rate" metric of type rate and a 
"connection-close-total"
+ * metric of type total. So, we will never get a KafkaExporter metric with 
type Meter.
+ *
+ * 
+ *
+ * Frequencies is created with an array of Frequency objects. When a 
Frequencies metric is registered, each
+ * member Frequency object is converted into an anonymous Measurable and 
registered. So, a Frequencies metric
+ * is reported with a set of measurables with name = Frequency.name(). As 
there is no way to figure out the
+ * compound type, each component measurable is converted to a GAUGE_DOUBLE.
+ *
+ * 
+ *
+ * Percentiles work the 

Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]

2023-11-12 Thread via GitHub


apoorvmittal10 commented on code in PR #14620:
URL: https://github.com/apache/kafka/pull/14620#discussion_r1390691654


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/KafkaMetricsCollector.java:
##
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricValueProvider;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.CumulativeCount;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+import org.apache.kafka.common.metrics.stats.Frequencies;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.apache.kafka.common.metrics.stats.Percentiles;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.SimpleRate;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
+import 
org.apache.kafka.common.telemetry.internals.LastValueTracker.InstantAndValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.time.Clock;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * All metrics implement the {@link MetricValueProvider} interface. They are 
divided into
+ * two base types:
+ *
+ * 
+ * {@link Gauge}
+ * {@link Measurable}
+ * 
+ *
+ * {@link Gauge Gauges} can have any value but we only collect metrics with 
number values.
+ * {@link Measurable Measurables} are divided into simple types with single 
values
+ * ({@link Avg}, {@link CumulativeCount}, {@link Min}, {@link Max}, {@link 
Rate},
+ * {@link SimpleRate}, and {@link CumulativeSum}) and compound types ({@link 
Frequencies},
+ * {@link Meter}, and {@link Percentiles}).
+ *
+ * 
+ *
+ * We can safely assume that a {@link CumulativeCount count} always increases 
in steady state. It
+ * should be a bug if a count metric decreases.
+ *
+ * 
+ *
+ * Total and Sum are treated as a monotonically increasing counter. The 
javadocs for Total metric type
+ * say "An un-windowed cumulative total maintained over all time.". The 
standalone Total metrics in
+ * the codebase seem to be cumulative metrics that will always increase. The 
Total metric underlying
+ * Meter type is mostly a Total of a Count metric.
+ * We can assume that a Total metric always increases (but it is not 
guaranteed as the sample values might be both
+ * negative or positive).
+ * For now, Total is converted to CUMULATIVE_DOUBLE unless we find a valid 
counter-example.
+ *
+ * 
+ *
+ * The Sum is a sample sum, not a cumulative metric. It is 
converted to GAUGE_DOUBLE.
+ *
+ * 
+ *
+ * The compound metrics are virtual metrics. They are composed of simple types 
or anonymous measurable types
+ * which are reported. A compound metric is never reported as-is.
+ *
+ * 
+ *
+ * A Meter metric is always created with and reported as 2 KafkaExporter 
metrics: a rate and a

Review Comment:
   My bad, removed.
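
The Meter decomposition described in the quoted javadoc (one compound metric registering a rate and a total component, e.g. "connection-close" yielding "connection-close-rate" and "connection-close-total") can be illustrated with a hypothetical naming helper, not code from the PR:

```java
import java.util.List;

public class MeterNamesSketch {
    // Hypothetical: the two component metric names a Meter registers for a
    // given base name, per the javadoc's "connection-close" example.
    static List<String> componentNames(String base) {
        return List.of(base + "-rate", base + "-total");
    }

    public static void main(String[] args) {
        if (!componentNames("connection-close")
                .equals(List.of("connection-close-rate", "connection-close-total"))) {
            throw new AssertionError();
        }
        System.out.println("ok");
    }
}
```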






Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]

2023-11-12 Thread via GitHub


apoorvmittal10 commented on code in PR #14620:
URL: https://github.com/apache/kafka/pull/14620#discussion_r1390691332


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/KafkaMetricsCollector.java:
##
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricValueProvider;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.CumulativeCount;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+import org.apache.kafka.common.metrics.stats.Frequencies;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.apache.kafka.common.metrics.stats.Percentiles;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.SimpleRate;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
+import 
org.apache.kafka.common.telemetry.internals.LastValueTracker.InstantAndValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.time.Clock;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * All metrics implement the {@link MetricValueProvider} interface. They are 
divided into
+ * two base types:
+ *
+ * 
+ * {@link Gauge}
+ * {@link Measurable}
+ * 
+ *
+ * {@link Gauge Gauges} can have any value but we only collect metrics with 
number values.
+ * {@link Measurable Measurables} are divided into simple types with single 
values
+ * ({@link Avg}, {@link CumulativeCount}, {@link Min}, {@link Max}, {@link 
Rate},
+ * {@link SimpleRate}, and {@link CumulativeSum}) and compound types ({@link 
Frequencies},
+ * {@link Meter}, and {@link Percentiles}).
+ *
+ * 
+ *
+ * We can safely assume that a {@link CumulativeCount count} always increases 
in steady state. It
+ * should be a bug if a count metric decreases.
+ *
+ * 
+ *
+ * Total and Sum are treated as a monotonically increasing counter. The 
javadocs for Total metric type
+ * say "An un-windowed cumulative total maintained over all time.". The 
standalone Total metrics in
+ * the codebase seem to be cumulative metrics that will always increase. The 
Total metric underlying
+ * Meter type is mostly a Total of a Count metric.
+ * We can assume that a Total metric always increases (but it is not 
guaranteed as the sample values might be both
+ * negative or positive).
+ * For now, Total is converted to CUMULATIVE_DOUBLE unless we find a valid 
counter-example.
+ *
+ * 
+ *
+ * The Sum is a sample sum, not a cumulative metric. It is 
converted to GAUGE_DOUBLE.
+ *
+ * 
+ *
+ * The compound metrics are virtual metrics. They are composed of simple types 
or anonymous measurable types
+ * which are reported. A compound metric is never reported as-is.
+ *
+ * 
+ *
+ * A Meter metric is always created with and reported as 2 KafkaExporter 
metrics: a rate and a
+ * count. For example, org.apache.kafka.common.network.Selector has a Meter metric 
for "connection-close" but it
+ * has to be created with a "connection-close-rate" metric of type rate and a 
"connection-close-total"
+ * metric of type total. So, we will never get a KafkaExporter metric with 
type Meter.
+ *
+ * 
+ *
+ * Frequencies is created with a array of Frequency objects. When a 
Frequencies metric is registered, each
+ * member Frequency object is converted into an anonymous Measurable and 
registered. So, a Frequencies metric
+ * is reported with a set of measurables with name = Frequency.name(). As 
there is no way to figure out the
+ * compound type, each component measurables is converted to a GAUGE_DOUBLE.
+ *
+ * 
+ *
+ * Percentiles work the 

Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]

2023-11-12 Thread via GitHub


apoorvmittal10 commented on code in PR #14620:
URL: https://github.com/apache/kafka/pull/14620#discussion_r1390690505


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/KafkaMetricsCollector.java:
##
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricValueProvider;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.CumulativeCount;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+import org.apache.kafka.common.metrics.stats.Frequencies;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.apache.kafka.common.metrics.stats.Percentiles;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.SimpleRate;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
+import 
org.apache.kafka.common.telemetry.internals.LastValueTracker.InstantAndValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.time.Clock;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * All metrics implement the {@link MetricValueProvider} interface. They are 
divided into
+ * two base types:
+ *
+ * 
+ * {@link Gauge}
+ * {@link Measurable}
+ * 
+ *
+ * {@link Gauge Gauges} can have any value but we only collect metrics with 
number values.
+ * {@link Measurable Measurables} are divided into simple types with single 
values
+ * ({@link Avg}, {@link CumulativeCount}, {@link Min}, {@link Max}, {@link 
Rate},
+ * {@link SimpleRate}, and {@link CumulativeSum}) and compound types ({@link 
Frequencies},
+ * {@link Meter}, and {@link Percentiles}).
+ *
+ * 
+ *
+ * We can safely assume that a {@link CumulativeCount count} always increases 
in steady state. It
+ * should be a bug if a count metric decreases.
+ *
+ * 
+ *
+ * Total and Sum are treated as a monotonically increasing counter. The 
javadocs for Total metric type
+ * say "An un-windowed cumulative total maintained over all time.". The 
standalone Total metrics in
+ * the codebase seem to be cumulative metrics that will always increase. The 
Total metric underlying
+ * Meter type is mostly a Total of a Count metric.
+ * We can assume that a Total metric always increases (but it is not 
guaranteed as the sample values might be both
+ * negative or positive).
+ * For now, Total is converted to CUMULATIVE_DOUBLE unless we find a valid 
counter-example.
+ *
+ * 
+ *
+ * The Sum, as it is a sample sum, is not a cumulative metric. It is
+ * converted to GAUGE_DOUBLE.
+ *
+ * 
+ *
+ * The compound metrics are virtual metrics. They are composed of simple types 
or anonymous measurable types
+ * which are reported. A compound metric is never reported as-is.
+ *
+ * 
+ *
+ * A Meter metric is always created with and reported as two KafkaExporter
+ * metrics: a rate and a count. For example, org.apache.kafka.common.network.Selector
+ * has a Meter metric for "connection-close", but it has to be created with a
+ * "connection-close-rate" metric of type rate and a "connection-close-total"
+ * metric of type total. So, we will never get a KafkaExporter metric with type Meter.
+ *
+ * 
+ *
+ * Frequencies is created with an array of Frequency objects. When a
+ * Frequencies metric is registered, each member Frequency object is converted
+ * into an anonymous Measurable and registered. So, a Frequencies metric is
+ * reported as a set of measurables with name = Frequency.name(). As there is
+ * no way to figure out the compound type, each component measurable is
+ * converted to a GAUGE_DOUBLE.
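The conversion rules described in the javadoc above can be summarized in a small standalone sketch. This is illustrative only (the enum and class names here are hypothetical, not the actual Kafka internals): cumulative counts and totals map to a cumulative sum point, while everything else (Avg, Min, Max, Rate, sample Sum, Frequency components) falls back to a gauge.

```java
// Illustrative mapping of Kafka measurable stat kinds to exported point kinds,
// following the conversion rules in the javadoc above. Names are hypothetical.
enum OtlpKind { CUMULATIVE_DOUBLE, GAUGE_DOUBLE }

class MetricTypeMappingSketch {

    // Given the simple-type name of a measurable, return the point kind
    // it would be exported as.
    static OtlpKind kindFor(String statKind) {
        switch (statKind) {
            case "CumulativeCount":
            case "CumulativeSum":   // "Total": un-windowed cumulative total over all time
                return OtlpKind.CUMULATIVE_DOUBLE;
            default:                // Avg, Min, Max, Rate, sample Sum, Frequency components
                return OtlpKind.GAUGE_DOUBLE;
        }
    }
}
```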

Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]

2023-11-12 Thread via GitHub


apoorvmittal10 commented on code in PR #14620:
URL: https://github.com/apache/kafka/pull/14620#discussion_r1390690071


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/LastValueTracker.java:
##
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import java.time.Instant;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * A LastValueTracker uses a ConcurrentMap to maintain historic values for a 
given key, and return
+ * a previous value and an Instant for that value.
+ *
+ * @param <T> The type of the value.
+ */
+public class LastValueTracker<T> {
+private final ConcurrentMap<MetricKey, AtomicReference<InstantAndValue<T>>> counters = new ConcurrentHashMap<>();
+
+/**
+ * Return the last instant/value for the given MetricKey, or 
Optional.empty if there isn't one.
+ *
+ * @param metricKey the key for which to calculate a getAndSet.
+ * @param now the timestamp for the new value.
+ * @param value the current value.
+ * @return the timestamp of the previous entry and its value. If there
+ * isn't a previous entry, then this method returns {@link 
Optional#empty()}
+ */
+public Optional<InstantAndValue<T>> getAndSet(MetricKey metricKey, Instant now, T value) {
+InstantAndValue<T> instantAndValue = new InstantAndValue<>(now, value);
+AtomicReference<InstantAndValue<T>> valueOrNull = counters

Review Comment:
   I have removed AtomicReference, done.
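With the `AtomicReference` removed and `putIfAbsent` replaced by `put` (as discussed above, the field can then be typed as the plain `Map` interface while staying a `ConcurrentHashMap`), the resulting shape can be sketched roughly like this. The class and key type below are illustrative stand-ins, not the actual Kafka code (the real class keys by `MetricKey`):

```java
import java.time.Instant;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch: record the latest (instant, value) pair per key and
// return the previous pair, if any. Map.put already returns the prior value.
class LastValueTrackerSketch<T> {

    // Holds a value together with the instant it was recorded.
    static final class InstantAndValue<T> {
        private final Instant instant;
        private final T value;

        InstantAndValue(Instant instant, T value) {
            this.instant = instant;
            this.value = value;
        }

        Instant instant() { return instant; }
        T value() { return value; }
    }

    // Typed as the plain Map interface; the implementation is still concurrent.
    private final Map<String, InstantAndValue<T>> counters = new ConcurrentHashMap<>();

    // Store the new pair and return the previous one, or Optional.empty()
    // if this is the first value seen for the key.
    Optional<InstantAndValue<T>> getAndSet(String key, Instant now, T value) {
        InstantAndValue<T> previous = counters.put(key, new InstantAndValue<>(now, value));
        return Optional.ofNullable(previous);
    }
}
```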



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]

2023-11-12 Thread via GitHub


apoorvmittal10 commented on code in PR #14620:
URL: https://github.com/apache/kafka/pull/14620#discussion_r1390688746


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/SinglePointMetric.java:
##
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import io.opentelemetry.proto.common.v1.AnyValue;
+import io.opentelemetry.proto.common.v1.KeyValue;
+import io.opentelemetry.proto.metrics.v1.AggregationTemporality;
+import io.opentelemetry.proto.metrics.v1.Metric;
+import io.opentelemetry.proto.metrics.v1.NumberDataPoint;
+
+import java.time.Instant;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class represents a telemetry metric that does not yet contain resource 
tags.
+ * These additional resource tags will be added before emitting metrics by the 
telemetry reporter.
+ */
+public class SinglePointMetric implements MetricKeyable {
+
+private final MetricKey key;
+private final Metric.Builder metricBuilder;
+
+private SinglePointMetric(MetricKey key, Metric.Builder metricBuilder) {
+this.key = key;
+this.metricBuilder = metricBuilder;
+}
+
+@Override
+public MetricKey key() {
+return key;
+}
+
+public Metric.Builder metric() {
+return metricBuilder;
+}
+
+public static SinglePointMetric create(MetricKey metricKey, Metric.Builder 
metric) {
+return new SinglePointMetric(metricKey, metric);
+}
+
+/*
+Methods to construct gauge metric type.
+ */
+public static SinglePointMetric gauge(MetricKey metricKey, Number value, 
Instant timestamp) {
+NumberDataPoint.Builder point = point(timestamp, value);
+return gauge(metricKey, point);
+}
+
+public static SinglePointMetric gauge(MetricKey metricKey, double value, 
Instant timestamp) {
+NumberDataPoint.Builder point = point(timestamp, value);
+return gauge(metricKey, point);
+}
+
+/*
+Methods to construct sum metric type.
+ */
+
+public static SinglePointMetric sum(MetricKey metricKey, double value, 
boolean monotonic, Instant timestamp) {
+return sum(metricKey, value, monotonic, timestamp, null);
+}
+
+public static SinglePointMetric sum(MetricKey metricKey, double value, 
boolean monotonic, Instant timestamp,
+Instant startTimestamp) {
+NumberDataPoint.Builder point = point(timestamp, value);
+if (startTimestamp != null) {
+point.setStartTimeUnixNano(toTimeUnixNanos(startTimestamp));
+}
+
+return sum(metricKey, 
AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE, monotonic, point);
+}
+
+public static SinglePointMetric deltaSum(MetricKey metricKey, double 
value, boolean monotonic,
+Instant timestamp, Instant startTimestamp) {
+NumberDataPoint.Builder point = point(timestamp, value)

Review Comment:
   Resolving the comment.
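The `sum`/`gauge` builders in the diff above stamp each `NumberDataPoint` with Unix-epoch nanoseconds (via `toTimeUnixNanos`). A standalone sketch of that conversion, assuming the usual `Instant`-to-epoch-nanos arithmetic rather than the exact Kafka helper:

```java
import java.time.Instant;
import java.util.concurrent.TimeUnit;

// Illustrative conversion of an Instant to Unix epoch nanoseconds, the unit
// used for time_unix_nano / start_time_unix_nano on OTLP data points.
class TimeUnixNanosSketch {
    static long toTimeUnixNanos(Instant t) {
        return TimeUnit.SECONDS.toNanos(t.getEpochSecond()) + t.getNano();
    }
}
```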



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15812) Add support to send only sparse metrics, only non-zero values

2023-11-12 Thread Apoorv Mittal (Jira)
Apoorv Mittal created KAFKA-15812:
-

 Summary: Add support to send only sparse metrics, only non-zero 
values 
 Key: KAFKA-15812
 URL: https://issues.apache.org/jira/browse/KAFKA-15812
 Project: Kafka
  Issue Type: Sub-task
Reporter: Apoorv Mittal
Assignee: Apoorv Mittal






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15811) implement capturing client port information from Socket Server

2023-11-12 Thread Apoorv Mittal (Jira)
Apoorv Mittal created KAFKA-15811:
-

 Summary: implement capturing client port information from Socket 
Server
 Key: KAFKA-15811
 URL: https://issues.apache.org/jira/browse/KAFKA-15811
 Project: Kafka
  Issue Type: Sub-task
Reporter: Apoorv Mittal
Assignee: Apoorv Mittal






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15810) Is there a solution to build Kafka-2.7 with gradle-8.4 and jdk-21?

2023-11-12 Thread rain.liang (Jira)
rain.liang created KAFKA-15810:
--

 Summary: Is there a solution to build Kafka-2.7 with gradle-8.4 
and jdk-21?
 Key: KAFKA-15810
 URL: https://issues.apache.org/jira/browse/KAFKA-15810
 Project: Kafka
  Issue Type: Wish
Affects Versions: 2.7.2
Reporter: rain.liang


    Hello, I tried to migrate Kafka 2.7 from JDK 11 to JDK 21 to try JDK 21's 
virtual threads, and ran into trouble. Firstly, the Gradle syntax changes a lot 
from Gradle 6.6 to Gradle 8.4. Secondly, the versions of the dependencies also 
change. Thirdly, new dependencies may be needed since the JDK is updated. I 
wonder if there are existing solutions to this problem. Thanks a lot.
 
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15809) Adding TS to broker's features list and updating broker's metadata schema to include TS enable status

2023-11-12 Thread Phuc Hong Tran (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Phuc Hong Tran updated KAFKA-15809:
---
Fix Version/s: 3.7.0

> Adding TS to broker's features list and updating broker's metadata schema to 
> include TS enable status
> -
>
> Key: KAFKA-15809
> URL: https://issues.apache.org/jira/browse/KAFKA-15809
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Phuc Hong Tran
>Assignee: Phuc Hong Tran
>Priority: Critical
>  Labels: KIP-405
> Fix For: 3.7.0
>
>
> Currently controller doesn't have the visibility of all brokers's TS enable 
> status. As mentioned in KAFKA-15341, we need to add metadata about TS enable 
> status of brokers so that controller can check for these status before 
> enabling TS per topic



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-15809) Adding TS to broker's features list and updating broker's metadata schema to include TS enable status

2023-11-12 Thread Phuc Hong Tran (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17785379#comment-17785379
 ] 

Phuc Hong Tran edited comment on KAFKA-15809 at 11/13/23 6:23 AM:
--

[~divijvaidya] [~showuon], since we're updating the metadata.version to the 
version that supports Tiered Storage, do we need to change the feature level of 
version IBP_3_5_IV0 to 1 and change the other versions' feature levels 
accordingly? What is the feature level field indicating?


was (Author: JIRAUSER301295):
[~divijvaidya], since we're updating the metadata.version to the version that 
support Tiered Storage, do we need to change the feature level of version 
IBP_3_5_IV0 to 1 and change others version's feature level accordingly? What 
does the feature level field indicating?

> Adding TS to broker's features list and updating broker's metadata schema to 
> include TS enable status
> -
>
> Key: KAFKA-15809
> URL: https://issues.apache.org/jira/browse/KAFKA-15809
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Phuc Hong Tran
>Assignee: Phuc Hong Tran
>Priority: Critical
>  Labels: KIP-405
>
> Currently controller doesn't have the visibility of all brokers's TS enable 
> status. As mentioned in KAFKA-15341, we need to add metadata about TS enable 
> status of brokers so that controller can check for these status before 
> enabling TS per topic



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15809) Adding TS to broker's features list and updating broker's metadata schema to include TS enable status

2023-11-12 Thread Phuc Hong Tran (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Phuc Hong Tran updated KAFKA-15809:
---
Labels: KIP-405  (was: )

> Adding TS to broker's features list and updating broker's metadata schema to 
> include TS enable status
> -
>
> Key: KAFKA-15809
> URL: https://issues.apache.org/jira/browse/KAFKA-15809
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Phuc Hong Tran
>Assignee: Phuc Hong Tran
>Priority: Critical
>  Labels: KIP-405
>
> Currently controller doesn't have the visibility of all brokers's TS enable 
> status. As mentioned in KAFKA-15341, we need to add metadata about TS enable 
> status of brokers so that controller can check for these status before 
> enabling TS per topic



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15809) Adding TS to broker's features list and updating broker's metadata schema to include TS enable status

2023-11-12 Thread Phuc Hong Tran (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17785379#comment-17785379
 ] 

Phuc Hong Tran commented on KAFKA-15809:


[~divijvaidya], since we're updating the metadata.version to the version that 
supports Tiered Storage, do we need to change the feature level of version 
IBP_3_5_IV0 to 1 and change the other versions' feature levels accordingly? 
What is the feature level field indicating?

> Adding TS to broker's features list and updating broker's metadata schema to 
> include TS enable status
> -
>
> Key: KAFKA-15809
> URL: https://issues.apache.org/jira/browse/KAFKA-15809
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Phuc Hong Tran
>Assignee: Phuc Hong Tran
>Priority: Critical
>
> Currently controller doesn't have the visibility of all brokers's TS enable 
> status. As mentioned in KAFKA-15341, we need to add metadata about TS enable 
> status of brokers so that controller can check for these status before 
> enabling TS per topic



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (KAFKA-15686) Consumer should be able to detect network problem

2023-11-12 Thread Jiahongchao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiahongchao closed KAFKA-15686.
---

> Consumer should be able to detect network problem
> -
>
> Key: KAFKA-15686
> URL: https://issues.apache.org/jira/browse/KAFKA-15686
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, consumer
>Affects Versions: 3.5.0
>Reporter: Jiahongchao
>Assignee: Philip Nee
>Priority: Minor
>
> When we call poll method in consumer, it will return normally even if some 
> partitions do not have a leader.
> What should we do to detect such failures? Currently we have to check log to 
> find out broker connection problem.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15062: Adding ppc64le build stage [kafka]

2023-11-12 Thread via GitHub


Vaibhav-Nazare commented on PR #13817:
URL: https://github.com/apache/kafka/pull/13817#issuecomment-1807487012

   Hi @mimaison Can we rerun the job now, jdk_21_latest has been installed on 
the shared power nodes by ASF infra.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15774: refactor windowed stores to use StoreFactory [kafka]

2023-11-12 Thread via GitHub


ableegoldman commented on PR #14708:
URL: https://github.com/apache/kafka/pull/14708#issuecomment-1807479243

   Ah shoot, sorry everyone, I just clicked immediately on the test failure tab 
and glossed over the build report on the landing page. Wish we still had the 
individual version builds reported directly on the PR :/ but that's my mistake.
   Appreciate the quick action and fix. I think the patch that was merged makes 
sense but I'll double check with the original PR author.
   Thanks for jumping on this!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR; Fix nanosecond elapsed time [kafka]

2023-11-12 Thread via GitHub


github-actions[bot] commented on PR #14168:
URL: https://github.com/apache/kafka/pull/14168#issuecomment-1807425026

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has  merge conflicts, please update it with the latest from 
trunk (or appropriate release branch)  If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [MINOR] Addressing NPE when broker's initialCatchUpFuture fails [kafka]

2023-11-12 Thread via GitHub


github-actions[bot] commented on PR #14109:
URL: https://github.com/apache/kafka/pull/14109#issuecomment-1807425064

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has  merge conflicts, please update it with the latest from 
trunk (or appropriate release branch)  If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-12 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1390590490


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -181,33 +256,465 @@ public void 
updateState(ConsumerGroupHeartbeatResponseData response) {
 public void transitionToFenced() {
 resetEpoch();
 transitionTo(MemberState.FENCED);
+
+// Release assignment
+CompletableFuture<Void> callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+callbackResult.whenComplete((result, error) -> {
+if (error != null) {
+log.debug("OnPartitionsLost callback invocation failed while 
releasing assignment" +
+"after member got fenced. Member will rejoin the group 
anyways.", error);
+}
+subscriptions.assignFromSubscribed(Collections.emptySet());
+transitionToJoining();
+});
 }
 
 /**
  * {@inheritDoc}
  */
 @Override
-public void transitionToFailed() {
-log.error("Member {} transitioned to {} state", memberId, 
MemberState.FAILED);
-transitionTo(MemberState.FAILED);
+public void transitionToFatal() {
+log.error("Member {} transitioned to {} state", memberId, 
MemberState.FATAL);
+
+// Update epoch to indicate that the member is not in the group 
anymore, so that the
+// onPartitionsLost is called to release assignment.
+memberEpoch = LEAVE_GROUP_EPOCH;
+invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+
+transitionTo(MemberState.FATAL);
 }
 
+/**
+ * {@inheritDoc}
+ */
 @Override
-public boolean shouldSendHeartbeat() {
-return state() != MemberState.FAILED;
+public void transitionToJoining() {
+resetEpoch();
+transitionTo(MemberState.JOINING);
 }
 
 /**
- * Transition to {@link MemberState#STABLE} only if there are no target 
assignments left to
- * reconcile. Transition to {@link MemberState#RECONCILING} otherwise.
+ * {@inheritDoc}
+ */
+@Override
+public CompletableFuture<Void> leaveGroup() {
+transitionTo(MemberState.LEAVING);
+
+CompletableFuture<Void> callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+callbackResult.whenComplete((result, error) -> {
+
+// Clear the subscription, no matter if the callback execution 
failed or succeeded.
+subscriptions.assignFromSubscribed(Collections.emptySet());
+
+// Transition to ensure that a heartbeat request is sent out to 
effectively leave the
+// group (even in the case where the member had no assignment to 
release or when the
+// callback execution failed.)
+transitionToSendingLeaveGroup();
+
+});
+
+// Return callback future to indicate that the leave group is done 
when the callbacks
+// complete, without waiting for the heartbeat to be sent out. (Best 
effort to send it
+// but do not hold the leave group operation for it)
+return callbackResult;
+}
+
+/**
+ * Release member assignment by calling the user defined callbacks for 
onPartitionsRevoked or
+ * onPartitionsLost.
+ * 
+ * If the member is part of the group (epoch > 0), this will 
invoke onPartitionsRevoked.
+ * This will be the case when releasing assignment because the member 
is intentionally
+ * leaving the group (after a call to unsubscribe)
+ *
+ * If the member is not part of the group (epoch <=0), this will 
invoke onPartitionsLost.
+ * This will be the case when releasing assignment after being fenced 
.
+ * 
+ *
+ * @return Future that will complete when the callback execution completes.
+ */
+private CompletableFuture<Void> invokeOnPartitionsRevokedOrLostToReleaseAssignment() {
+SortedSet<TopicPartition> droppedPartitions = new TreeSet<>(COMPARATOR);
+droppedPartitions.addAll(subscriptions.assignedPartitions());
+
+CompletableFuture<Void> callbackResult;
+if (droppedPartitions.isEmpty()) {
+// No assignment to release
+callbackResult = CompletableFuture.completedFuture(null);
+} else {
+// Release assignment
+if (memberEpoch > 0) {
+// Member is part of the group. Invoke onPartitionsRevoked.
+callbackResult = revokePartitions(droppedPartitions);
+} else {
+// Member is not part of the group anymore. Invoke 
onPartitionsLost.
+callbackResult = 
invokeOnPartitionsLostCallback(droppedPartitions);
+}
+}
+return callbackResult;
+}
+
+/**
+ * Reset member epoch to the value required for the leave the group 
heartbeat request, and
+ * transition to the {@link MemberState#SENDING_LEAVE_REQUEST} state so 

Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-12 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1390583612


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java:
##
@@ -17,57 +17,111 @@
 
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.common.protocol.Errors;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
 public enum MemberState {
 
 /**
- * Member has not joined a consumer group yet, or has been fenced and 
needs to re-join.
+ * Member has a group id, but it is not subscribed to any topic to receive 
automatic
+ * assignments. This will be the state when the member has never 
subscribed, or when it has
+ * unsubscribed from all topics. While in this state the member can commit 
offsets but won't
+ * be an active member of the consumer group (no heartbeats sent).
  */
-UNJOINED,
+UNSUBSCRIBED,
+
+/**
+ * Member is attempting to join a consumer group. While in this state, the 
member will send
+ * heartbeat requests on the interval, with epoch 0, until it gets a 
response with an epoch > 0
+ * or a fatal failure. A member transitions to this state when it tries to 
join the group for
+ * the first time with a call to subscribe, or when it has been fenced and 
tries to re-join.
+ */
+JOINING,
 
 /**
  * Member has received a new target assignment (partitions could have been 
assigned or
- * revoked), and it is processing it. While in this state, the member will
- * invoke the user callbacks for onPartitionsAssigned or 
onPartitionsRevoked, and then make
- * the new assignment effective.
+ * revoked), and it is processing it. While in this state, the member will 
continue to send
+ * heartbeat on the interval, and reconcile the assignment (it will commit 
offsets if
+ * needed, invoke the user callbacks for onPartitionsAssigned or 
onPartitionsRevoked, and make
+ * the new assignment effective). Note that while in this state the member 
may be trying to
+ * resolve metadata for the target assignment, or triggering 
commits/callbacks if topic names
+ * already resolved.
  */
-// TODO: determine if separate state will be needed for assign/revoke (not 
for now)
 RECONCILING,
 
 /**
- * Member is active in a group (heartbeating) and has processed all 
assignments received.
+ * Member has completed reconciling an assignment received, and stays in 
this state only until
+ * the next heartbeat request is sent out to acknowledge the assignment to 
the server. This
+ * state indicates that the next heartbeat request must be sent without 
waiting for the
+ * heartbeat interval to expire. Note that once the ack is sent, the 
member could go back to
+ * {@link #RECONCILING} if it still has assignment waiting to be 
reconciled (assignments
+ * waiting for metadata, assignments for which metadata was resolved, or 
new assignments
+ * received from the broker)
+ */
+ACKNOWLEDGING,
+
+/**
+ * Member is active in a group and has processed all assignments received. 
While in this
+ * state, the member will send heartbeats on the interval.
  */
 STABLE,
 
 /**
- * Member transitions to this state when it receives a
- * {@link org.apache.kafka.common.protocol.Errors#UNKNOWN_MEMBER_ID} or
- * {@link org.apache.kafka.common.protocol.Errors#FENCED_MEMBER_EPOCH} 
error from the
- * broker. This is a recoverable state, where the member
- * gives up its partitions by invoking the user callbacks for 
onPartitionsLost, and then
- * transitions to {@link #UNJOINED} to rejoin the group as a new member.
+ * Member transitions to this state when it receives a {@link 
Errors#UNKNOWN_MEMBER_ID} or
+ * {@link Errors#FENCED_MEMBER_EPOCH} error from the broker, indicating 
that it has been
+ * left out of the group. While in this state, the member will stop 
sending heartbeats, it
+ * will give up its partitions by invoking the user callbacks for 
onPartitionsLost, and then
+ * transition to {@link #JOINING} to re-join the group as a new member.
  */
 FENCED,
 
 /**
- * The member failed with an unrecoverable error
+ * The member transitions to this state after a call to unsubscribe. While in this state, the
+ * member will stop sending heartbeats, will commit offsets if needed and release its
+ * assignment (calling user's callback for partitions revoked or lost). When all these
+ * actions complete, the member will transition out of this state into {@link #LEAVING} to
+ * effectively leave the group.
+ */
+PREPARE_LEAVING,
+
+/**
+ * Member has committed offsets and released its assignment, so it stays in this state until

Review Comment:
   Yes, similar to the ACKNOWLEDGING in the sense that they are just a way to 
indicate that a 
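For illustration, the member states described in this diff can be sketched as an enum with an explicit transition table. This is not the PR's actual MemberState implementation; the state names follow the Javadoc above, but the transition set below is inferred from that description and may be incomplete (states the diff does not describe default to no transitions here).

```java
import java.util.EnumSet;
import java.util.Set;

// Illustrative sketch of the member states in the diff above.
// Not the real MemberState enum; transitions are inferred from
// the Javadoc descriptions and may be incomplete.
enum MemberStateSketch {
    JOINING, RECONCILING, ACKNOWLEDGING, STABLE, FENCED, PREPARE_LEAVING, LEAVING;

    // Allowed target states, per the Javadoc descriptions.
    Set<MemberStateSketch> allowedTransitions() {
        switch (this) {
            case RECONCILING:     return EnumSet.of(ACKNOWLEDGING, FENCED, PREPARE_LEAVING);
            case ACKNOWLEDGING:   return EnumSet.of(STABLE, RECONCILING, FENCED, PREPARE_LEAVING);
            case STABLE:          return EnumSet.of(RECONCILING, FENCED, PREPARE_LEAVING);
            case FENCED:          return EnumSet.of(JOINING);
            case PREPARE_LEAVING: return EnumSet.of(LEAVING);
            // States not covered by the diff above get no transitions in this sketch.
            default:              return EnumSet.noneOf(MemberStateSketch.class);
        }
    }

    boolean canTransitionTo(MemberStateSketch target) {
        return allowedTransitions().contains(target);
    }
}
```

For example, ACKNOWLEDGING may fall back to RECONCILING (new assignment waiting), while FENCED may only re-join via JOINING.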

Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-12 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1390582429


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java:
##
@@ -96,7 +96,7 @@ public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
  * @param topic to be requested. If empty, return the metadata for all topics.
  * @return the future of the metadata request.
  */
-public CompletableFuture>> requestTopicMetadata(final Optional topic) {
+public CompletableFuture>> requestTopicMetadata(final Optional topic) {

Review Comment:
   You're right, not needed anymore (all metadata interaction is now based on 
the centralized metadata cache). All removed. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15708: KRaft support in FetchRequestDownConversionConfigTest [kafka]

2023-11-12 Thread via GitHub


linzihao1999 commented on PR #14715:
URL: https://github.com/apache/kafka/pull/14715#issuecomment-1807392262

   Passed the CI build.
   
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14715/2/pipeline





Re: [PR] KAFKA-15741: KRaft support in DescribeConsumerGroupTest [kafka]

2023-11-12 Thread via GitHub


linzihao1999 commented on PR #14668:
URL: https://github.com/apache/kafka/pull/14668#issuecomment-1807388570

   @dengziming Hi, I re-triggered the CI build and it passed.





[jira] [Comment Edited] (KAFKA-15341) Enabling TS for a topic during rolling restart causes problems

2023-11-12 Thread Phuc Hong Tran (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17785337#comment-17785337
 ] 

Phuc Hong Tran edited comment on KAFKA-15341 at 11/12/23 11:27 PM:
---

[~divijvaidya] I have created subtask KAFKA-15809 for adding the broker's TS enablement status to metadata and adding TS as a feature to the broker


was (Author: JIRAUSER301295):
[~divijvaidya] I have created subtask 
[KAFKA-15809|https://issues.apache.org/jira/browse/KAFKA-15809] to add broker's 
TS enable status to metadata

> Enabling TS for a topic during rolling restart causes problems
> --
>
> Key: KAFKA-15341
> URL: https://issues.apache.org/jira/browse/KAFKA-15341
> Project: Kafka
>  Issue Type: Bug
>Reporter: Divij Vaidya
>Assignee: Phuc Hong Tran
>Priority: Major
>  Labels: KIP-405
> Fix For: 3.7.0
>
>
> When we are in a rolling restart to enable TS at system level, some brokers 
> have TS enabled on them and some don't. We send an alter config call to 
> enable TS for a topic, it hits a broker which has TS enabled, this broker 
> forwards it to the controller and controller will send the config update to 
> all brokers. When another broker which doesn't have TS enabled (because it 
> hasn't undergone the restart yet) gets this config change, it "should" fail 
> to apply it. But failing now is too late since alterConfig has already 
> succeeded since controller->broker config propagation is done async.
> With this JIRA, we want to have controller check if TS is enabled on all 
> brokers before applying alter config to turn on TS for a topic.
> Context: https://github.com/apache/kafka/pull/14176#discussion_r1291265129
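The fix this JIRA proposes — have the controller verify that every broker has TS enabled before accepting the per-topic alter-config — can be sketched as a simple pre-flight validation. The `TieredStorageValidator` and `BrokerInfo` names below are hypothetical, not actual controller code:

```java
import java.util.List;

// Hypothetical sketch of the validation this JIRA proposes: reject the
// per-topic TS enablement unless every registered broker has tiered
// storage enabled. Names are illustrative, not real Kafka APIs.
final class TieredStorageValidator {
    static final class BrokerInfo {
        final int id;
        final boolean tieredStorageEnabled;
        BrokerInfo(int id, boolean tieredStorageEnabled) {
            this.id = id;
            this.tieredStorageEnabled = tieredStorageEnabled;
        }
    }

    // Returns true only if the alter-config request may proceed.
    static boolean canEnableTieredStorage(List<BrokerInfo> brokers) {
        return !brokers.isEmpty()
            && brokers.stream().allMatch(b -> b.tieredStorageEnabled);
    }
}
```

Because controller-to-broker config propagation is async, this check has to run before the alterConfig succeeds, which is exactly the point of the JIRA.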



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15341) Enabling TS for a topic during rolling restart causes problems

2023-11-12 Thread Phuc Hong Tran (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17785337#comment-17785337
 ] 

Phuc Hong Tran commented on KAFKA-15341:


[~divijvaidya] I have created subtask 
[KAFKA-15809|https://issues.apache.org/jira/browse/KAFKA-15809] to add broker's 
TS enable status to metadata






[jira] [Created] (KAFKA-15809) Adding TS to broker's features list and updating broker's metadata schema to include TS enable status

2023-11-12 Thread Phuc Hong Tran (Jira)
Phuc Hong Tran created KAFKA-15809:
--

 Summary: Adding TS to broker's features list and updating broker's 
metadata schema to include TS enable status
 Key: KAFKA-15809
 URL: https://issues.apache.org/jira/browse/KAFKA-15809
 Project: Kafka
  Issue Type: Sub-task
Reporter: Phuc Hong Tran
Assignee: Phuc Hong Tran


Currently the controller doesn't have visibility into every broker's TS enablement status. As mentioned in KAFKA-15341, we need to add metadata about the TS enablement status of brokers so that the controller can check these statuses before enabling TS per topic





[PR] KAFKA-15795: Support fetch(fromKey, toKey, from, to) to WindowRangeQuery and unify WindowKeyQuery and WindowRangeQuery [kafka]

2023-11-12 Thread via GitHub


hanyuzheng7 opened a new pull request, #14745:
URL: https://github.com/apache/kafka/pull/14745

   KIP-997: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-997%3A++Support+fetch%28fromKey%2C+toKey%2C+from%2C+to%29+to+WindowRangeQuery+and+unify+WindowKeyQuery+and+WindowRangeQuery
   
   We aim to enhance WindowRangeQuery by supporting a new method: fetch(keyFrom, keyTo, from, to). Currently, WindowRangeQuery utilizes KeyValueIterator<Windowed<K>, V> fetchAll(Instant timeFrom, Instant timeTo) for retrieving all key-value pairs within a specified time range. However, we propose to use KeyValueIterator<Windowed<K>, V> fetch(K keyFrom, K keyTo, Instant timeFrom, Instant timeTo) instead. This new method will provide a KeyValueIterator<Windowed<K>, V> that allows users to iterate over windowed key-value pairs <Windowed<K>, value>, spanning the entire time range.
   
   With this new method, users can retrieve window sessions for specific keys, rather than all keys, which enables a more targeted query. This is an improvement over the fetchAll method, which only allows retrieval of all keys' window sessions without the ability to specify a range of keys.
   
   Additionally, this enhancement also allows WindowRangeQuery to support WindowKeyQuery functionality. Users seeking to query window sessions for a specific key can do so by setting keyFrom and keyTo to be equal. This dual functionality provides more flexibility and efficiency in querying windowed keys.
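The filtering semantics the KIP proposes can be sketched with a small in-memory stand-in. This is NOT the actual Streams API; `WindowFetchSketch` and its `Entry` type are made up for illustration of the key-range plus window-start-range filter:

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for the proposed fetch(keyFrom, keyTo, from, to)
// semantics: filter windowed entries by a key range AND a window-start
// time range (bounds inclusive). Not the Streams API; names are
// illustrative only.
final class WindowFetchSketch {
    static final class Entry {
        final String key;
        final Instant windowStart;
        final String value;
        Entry(String key, Instant windowStart, String value) {
            this.key = key;
            this.windowStart = windowStart;
            this.value = value;
        }
    }

    static List<Entry> fetch(List<Entry> store, String keyFrom, String keyTo,
                             Instant from, Instant to) {
        List<Entry> out = new ArrayList<>();
        for (Entry e : store) {
            boolean keyInRange =
                e.key.compareTo(keyFrom) >= 0 && e.key.compareTo(keyTo) <= 0;
            boolean timeInRange =
                !e.windowStart.isBefore(from) && !e.windowStart.isAfter(to);
            if (keyInRange && timeInRange) {
                out.add(e);
            }
        }
        return out;
    }
}
```

Setting keyFrom equal to keyTo reduces this to the single-key behaviour of WindowKeyQuery, which is the unification the KIP describes.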
   

   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-15552 Fix Producer ID ZK migration [kafka]

2023-11-12 Thread via GitHub


jolshan commented on PR #14506:
URL: https://github.com/apache/kafka/pull/14506#issuecomment-1807174506

   Thanks @showuon for taking a look. Makes sense to me.
   I also saw the conversation here. https://github.com/apache/kafka/pull/14741 





[PR] MINOR: Fix MetricsTest.testBrokerTopicMetricsBytesInOut assertion [kafka]

2023-11-12 Thread via GitHub


runom opened a new pull request, #14744:
URL: https://github.com/apache/kafka/pull/14744

   This PR fixes the position of the assertion that checks that `BytesOut` doesn't include replication.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] MINOR: Improve printing topic name when created topic in TopicCommand [kafka]

2023-11-12 Thread via GitHub


runom commented on code in PR #14661:
URL: https://github.com/apache/kafka/pull/14661#discussion_r1390407748


##
tools/src/main/java/org/apache/kafka/tools/TopicCommand.java:
##
@@ -469,7 +469,7 @@ public void createTopic(CommandTopicPartition topic) throws 
Exception {
 CreateTopicsResult createResult = 
adminClient.createTopics(Collections.singleton(newTopic),
 new CreateTopicsOptions().retryOnQuotaViolation(false));
 createResult.all().get();
-System.out.println("Created topic " + topic.name + ".");
+System.out.println("Created topic " + topic.name.get() + ".");

Review Comment:
   I changed the type of it. And it seems to be working.
   






Re: [PR] MINOR: Improve printing topic name when created topic in TopicCommand [kafka]

2023-11-12 Thread via GitHub


runom commented on PR #14661:
URL: https://github.com/apache/kafka/pull/14661#issuecomment-1807117335

   I've updated the pull request. `TopicCommandTest` and 
`TopicCommandIntegrationTest` are passing.





[jira] [Comment Edited] (KAFKA-15341) Enabling TS for a topic during rolling restart causes problems

2023-11-12 Thread Phuc Hong Tran (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17785136#comment-17785136
 ] 

Phuc Hong Tran edited comment on KAFKA-15341 at 11/12/23 9:49 AM:
--

[~divijvaidya] The controller doesn't know whether an individual broker has TS enabled or not. It's better for the controller to know the brokers' TS enablement status than to have brokers reject the per-topic TS enablement call, as that would be inefficient: we would have to reach out to all those brokers (which don't have tiered storage enabled) only to get rejected.


was (Author: JIRAUSER301295):
[~divijvaidya] controller doesn't know if individual broker has TS enable or 
not. It's best to have controller know the data about TS enable of brokers 
other than brokers rejecting TS per topic enablement call as it would be 
inefficient since we has to reach out to all those brokers only to get rejected.






[PR] ignore [kafka]

2023-11-12 Thread via GitHub


dajac opened a new pull request, #14743:
URL: https://github.com/apache/kafka/pull/14743

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] POC: run group coordinator state machine on request handler pool [kafka]

2023-11-12 Thread via GitHub


dajac commented on PR #14728:
URL: https://github.com/apache/kafka/pull/14728#issuecomment-1807054468

   Thanks for the PR. I believe that isolating the workload from the 
produce/fetch workload (running in the api handlers) is actually a good thing 
but we would need to measure to compare both approaches.
   
   Would it be possible to implement a new CoordinatorEventProcessor for this 
instead of hijacking the runtime? We made it pluggable for this reason. We 
wanted to have some flexibility with the threading model.
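The flexibility described here — swapping the threading model without touching the runtime — can be illustrated with a stand-in processor interface. `EventProcessorSketch` below is a made-up shape, not Kafka's actual CoordinatorEventProcessor; it only shows how one implementation can own dedicated threads while another delegates to a shared (e.g. request-handler) pool:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical illustration of a pluggable event processor: the same
// coordinator runtime could run events either on dedicated threads or
// on a shared pool. This interface is a stand-in, not Kafka's
// CoordinatorEventProcessor.
interface EventProcessorSketch extends AutoCloseable {
    void enqueue(Runnable event);
}

// Isolated workload: the processor owns its own threads.
final class DedicatedThreadProcessor implements EventProcessorSketch {
    private final ExecutorService pool;
    DedicatedThreadProcessor(int threads) {
        this.pool = Executors.newFixedThreadPool(threads);
    }
    public void enqueue(Runnable event) { pool.submit(event); }
    public void close() { pool.shutdown(); }
}

// Shared workload: events run on a pool owned elsewhere
// (analogous to running on the request handler pool).
final class SharedPoolProcessor implements EventProcessorSketch {
    private final ExecutorService shared;
    SharedPoolProcessor(ExecutorService sharedPool) { this.shared = sharedPool; }
    public void enqueue(Runnable event) { shared.submit(event); }
    public void close() { /* the shared pool is owned and closed elsewhere */ }
}
```

Measuring both variants under produce/fetch load, as suggested above, would show whether isolation or sharing wins.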





Re: [PR] Adding this-escape for MaterializedStoreFactory implementors [kafka]

2023-11-12 Thread via GitHub


vamossagar12 commented on PR #14742:
URL: https://github.com/apache/kafka/pull/14742#issuecomment-1807053171

   Yes, I realised it later on :D 





Re: [PR] MINOR: Fix ClusterConnectionStatesTest.testSingleIP [kafka]

2023-11-12 Thread via GitHub


vamossagar12 commented on PR #14741:
URL: https://github.com/apache/kafka/pull/14741#issuecomment-1807052792

   > @showuon , I noticed the JDK 21 build failed with `[this-escape] possible 'this' escape before subclass is fully initialized`. I created [this PR](https://github.com/apache/kafka/pull/14742) to get around that issue. The classes in question were merged into Streams yesterday. Let me know what you think.
   
   Actually ignore, found that 
[this](https://github.com/apache/kafka/pull/14708) already fixes it.





Re: [PR] Adding this-escape for MaterializedStoreFactory implementors [kafka]

2023-11-12 Thread via GitHub


vamossagar12 closed pull request #14742: Adding this-escape for 
MaterializedStoreFactory implementors
URL: https://github.com/apache/kafka/pull/14742





Re: [PR] Adding this-escape for MaterializedStoreFactory implementors [kafka]

2023-11-12 Thread via GitHub


dajac commented on PR #14742:
URL: https://github.com/apache/kafka/pull/14742#issuecomment-1807052588

   Thanks for looking into this but I just merged 
https://github.com/apache/kafka/pull/14740 to fix it.





Re: [PR] MINOR: Fix ClusterConnectionStatesTest.testSingleIP [kafka]

2023-11-12 Thread via GitHub


vamossagar12 commented on PR #14741:
URL: https://github.com/apache/kafka/pull/14741#issuecomment-1807052287

   @showuon , I noticed the JDK 21 build failed with `[this-escape] possible 'this' escape before subclass is fully initialized`. I created [this PR](https://github.com/apache/kafka/pull/14742) to get around that issue. The classes in question were merged into Streams yesterday. Let me know what you think.





[PR] Adding this-escape for MaterializedStoreFactory implementors [kafka]

2023-11-12 Thread via GitHub


vamossagar12 opened a new pull request, #14742:
URL: https://github.com/apache/kafka/pull/14742

   Noticed that the latest https://github.com/apache/kafka/pull/14741 [failed compileJava on JDK 21](https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14741/1/pipeline/) due to:
   
   ```
   > Task :streams:compileJava
   /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-14741/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowStoreMaterializer.java:46: warning: [this-escape] possible 'this' escape before subclass is fully initialized
   retentionPeriod = retentionPeriod();
   ^
   error: warnings found and -Werror specified
   ```
   
   This seems to be happening because this-escape is [a new lint added in JDK 21](https://bugs.openjdk.org/browse/JDK-8015831), which warns when a constructor calls an overridable method. This PR is an attempt to check whether this suppression helps in getting around the issue. 
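The warning pattern is easy to reproduce: a non-final class whose constructor calls an overridable method leaks `this` before subclasses finish initializing. Below is a minimal sketch of both the hazard and the suppression approach this PR attempted (the `Base`/`Sub` class names are made up; the warning itself appears when compiling with `-Xlint:this-escape` and no annotation):

```java
// Minimal reproduction of JDK 21's "this-escape" lint: the base
// constructor calls an overridable method, so a subclass override
// can observe fields that are not yet initialized.
class Base {
    final long retentionPeriod;

    @SuppressWarnings("this-escape") // the workaround this PR tried
    Base() {
        // An overridable call from a constructor triggers the warning
        // (without the suppression, when compiled with -Xlint:this-escape).
        retentionPeriod = retentionPeriod();
    }

    long retentionPeriod() { return 1000L; }
}

class Sub extends Base {
    // This field is still 0 when Base's constructor runs, which is
    // exactly the hazard the lint warns about.
    long configured = 5000L;

    @Override
    long retentionPeriod() { return configured; }
}
```

Here `new Sub().retentionPeriod` ends up as 0 rather than 5000, which is why the lint exists; alternatives to suppression are making the called method private, static, or final.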

