Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]
apoorvmittal10 closed pull request #14724: KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) URL: https://github.com/apache/kafka/pull/14724 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]
apoorvmittal10 commented on PR #14724: URL: https://github.com/apache/kafka/pull/14724#issuecomment-1837060946 > Hi @apoorvmittal10 Thanks for the time to address my comments - I made another pass and added some more comments, mostly nit-picking. After this I think we are good to go. Hi @philipnee thanks for the re-review. I have addressed the feedback.
Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14724: URL: https://github.com/apache/kafka/pull/14724#discussion_r1412734130 ## clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java: ## @@ -0,0 +1,976 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.telemetry.internals; + +import io.opentelemetry.proto.metrics.v1.Metric; +import io.opentelemetry.proto.metrics.v1.MetricsData; +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import io.opentelemetry.proto.metrics.v1.ScopeMetrics; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.InterruptException; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData; +import org.apache.kafka.common.message.PushTelemetryRequestData; +import org.apache.kafka.common.message.PushTelemetryResponseData; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricsContext; +import org.apache.kafka.common.metrics.MetricsReporter; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.requests.AbstractRequest.Builder; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse; +import org.apache.kafka.common.requests.PushTelemetryRequest; +import org.apache.kafka.common.requests.PushTelemetryResponse; +import org.apache.kafka.common.telemetry.ClientTelemetryState; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.StringJoiner; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import 
java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Predicate; + +/** + * The implementation of the {@link MetricsReporter} for client telemetry which manages the life-cycle + * of the client telemetry collection process. The client telemetry reporter is responsible for + * collecting the client telemetry data and sending it to the broker. + * + * + * The client telemetry reporter is configured with a {@link ClientTelemetrySender} which is + * responsible for sending the client telemetry data to the broker. The client telemetry reporter + * will attempt to fetch the telemetry subscription information from the broker and send the + * telemetry data to the broker based on the subscription information i.e. push interval, temporality, + * compression type, etc. + * + * + * The full life-cycle of the metric collection process is defined by a state machine in + * {@link ClientTelemetryState}. Each state is associated with a different set of operations. + * For example, the client telemetry reporter will attempt to fetch the telemetry subscription + * from the broker when in the {@link ClientTelemetryState#SUBSCRIPTION_NEEDED} state. + * If the push operation fails, the client telemetry reporter will attempt to re-fetch the + * subscription information by setting the state back to {@link ClientTelemetryState#SUBSCRIPTION_NEEDED}. + * + * + * In an unlikely scenario, if a bad state transition is detected, an + * {@link IllegalStateException} will be thrown. + * + * + * The state transition follows the following steps in order: + * + * {@link ClientTelemetryState#SUBSCRIPTION_NEEDED} + * {@link ClientTelemetryState#SUBSCRIPTION_IN_PROGRESS} + * {@link ClientTelemetryState#PUSH_NEEDED} + * {@link ClientTelemetryState#PUSH_IN_PROGRESS} + * {@link
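The state-machine life cycle described in the quoted javadoc (SUBSCRIPTION_NEEDED, SUBSCRIPTION_IN_PROGRESS, PUSH_NEEDED, PUSH_IN_PROGRESS, with a failed push falling back to SUBSCRIPTION_NEEDED and an IllegalStateException on a bad transition) can be illustrated with a minimal, self-contained sketch. This is not Kafka's actual ClientTelemetryReporter code: the enum names mirror the javadoc, but the exact transition table below is an assumption for illustration.

```java
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

public class TelemetryStateMachineSketch {

    // Hypothetical mirror of ClientTelemetryState; names follow the javadoc above.
    enum State { SUBSCRIPTION_NEEDED, SUBSCRIPTION_IN_PROGRESS, PUSH_NEEDED, PUSH_IN_PROGRESS, TERMINATED }

    // Assumed transition table: each state permits only certain successors.
    private static final Map<State, Set<State>> ALLOWED = Map.of(
        State.SUBSCRIPTION_NEEDED, EnumSet.of(State.SUBSCRIPTION_IN_PROGRESS, State.TERMINATED),
        State.SUBSCRIPTION_IN_PROGRESS, EnumSet.of(State.PUSH_NEEDED, State.SUBSCRIPTION_NEEDED, State.TERMINATED),
        State.PUSH_NEEDED, EnumSet.of(State.PUSH_IN_PROGRESS, State.SUBSCRIPTION_NEEDED, State.TERMINATED),
        State.PUSH_IN_PROGRESS, EnumSet.of(State.PUSH_NEEDED, State.SUBSCRIPTION_NEEDED, State.TERMINATED),
        State.TERMINATED, EnumSet.noneOf(State.class));

    private State current = State.SUBSCRIPTION_NEEDED;

    State state() { return current; }

    // Validate and apply a transition; a bad transition throws IllegalStateException,
    // as the javadoc says the reporter does.
    void transitionTo(State next) {
        if (!ALLOWED.get(current).contains(next))
            throw new IllegalStateException("Invalid transition " + current + " -> " + next);
        current = next;
    }

    public static void main(String[] args) {
        TelemetryStateMachineSketch m = new TelemetryStateMachineSketch();
        m.transitionTo(State.SUBSCRIPTION_IN_PROGRESS);
        m.transitionTo(State.PUSH_NEEDED);
        m.transitionTo(State.PUSH_IN_PROGRESS);
        // A failed push falls back to re-fetching the subscription.
        m.transitionTo(State.SUBSCRIPTION_NEEDED);
        System.out.println(m.state());
        boolean threw = false;
        try {
            m.transitionTo(State.PUSH_IN_PROGRESS); // skips SUBSCRIPTION/PUSH states: invalid
        } catch (IllegalStateException e) {
            threw = true;
        }
        System.out.println(threw);
    }
}
```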
Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14724: URL: https://github.com/apache/kafka/pull/14724#discussion_r1412733949 ## clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java: ## @@ -0,0 +1,573 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.telemetry.internals; + +import io.opentelemetry.proto.common.v1.KeyValue; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData; +import org.apache.kafka.common.message.PushTelemetryRequestData; +import org.apache.kafka.common.message.PushTelemetryResponseData; +import org.apache.kafka.common.metrics.KafkaMetricsContext; +import org.apache.kafka.common.metrics.MetricsContext; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse; +import org.apache.kafka.common.requests.PushTelemetryRequest; +import org.apache.kafka.common.requests.PushTelemetryResponse; +import org.apache.kafka.common.telemetry.ClientTelemetryState; +import org.apache.kafka.common.utils.MockTime; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static 
org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ClientTelemetryReporterTest { Review Comment: I am not sure if I got the comment correctly. If the comment is about adding test cases for when push telemetry receives a subscription error, then that is covered under `testHandleResponsePushTelemetryErrorResponse`. However, I have now added another test case which verifies the change in subscription (and the change in temporality) when handling get subscriptions as well.
Re: [PR] improve TopicCommandIntegrationTest to be less flaky [kafka]
splett2 commented on PR #14891: URL: https://github.com/apache/kafka/pull/14891#issuecomment-1837054475 I don't think we want to directly call `TestUtils.createTopicWithAdmin` in this test for most cases. I didn't read through all of the tests, but I would imagine that most of these integration tests are trying to exercise the `topicService` logic. It's probably sufficient to add a call to `ensureConsistentKRaftMetadata()` in `waitForTopicCreated` to deflake most failures.
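The deflaking idea above (have `waitForTopicCreated` block until every broker's metadata view agrees, rather than asserting immediately after topic creation) reduces to a poll-until-true helper with a timeout. The sketch below is illustrative only: `waitForCondition` is modeled loosely on Kafka's `TestUtils.waitForCondition`, and metadata propagation is simulated with a timer, since the real `ensureConsistentKRaftMetadata()` needs a running cluster.

```java
import java.util.function.BooleanSupplier;

public class WaitForTopicSketch {

    // Generic poll-until-true helper with a deadline, analogous in spirit
    // to TestUtils.waitForCondition in Kafka's test utilities.
    static void waitForCondition(BooleanSupplier cond, long timeoutMs, String msg)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!cond.getAsBoolean()) {
            if (System.currentTimeMillis() > deadline)
                throw new AssertionError(msg);
            Thread.sleep(10); // back off briefly between polls
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Simulated "metadata propagation": the condition becomes true
        // only after a short delay, as broker metadata would.
        final long start = System.currentTimeMillis();
        waitForCondition(() -> System.currentTimeMillis() - start > 50, 5_000,
            "topic never became visible on all brokers");
        System.out.println("consistent");
    }
}
```

The point of the design is that the test asserts only after the condition holds everywhere, instead of racing the propagation and failing intermittently.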
Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14724: URL: https://github.com/apache/kafka/pull/14724#discussion_r1412729920 ## clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java ## [Quoted diff context only: the same license header, imports, and test setup excerpted earlier in this digest; the review comment itself was truncated in the archive.]
Re: [PR] MINOR: Fix flaky test RefreshingHttpsJwksTest.testBasicScheduleRefresh [kafka]
splett2 commented on code in PR #14888: URL: https://github.com/apache/kafka/pull/14888#discussion_r1412729044 ## clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/RefreshingHttpsJwksTest.java: ## @@ -134,6 +136,20 @@ public void testSecondaryRefreshAfterElapsedDelay() throws Exception { String keyId = "abc123"; MockTime time = new MockTime(); HttpsJwks httpsJwks = spyHttpsJwks(); + +try (RefreshingHttpsJwks refreshingHttpsJwks = getRefreshingHttpsJwks(time, httpsJwks, mockExecutorService(time))) { +refreshingHttpsJwks.init(); +// We refresh once at the initialization time from getJsonWebKeys. +verify(httpsJwks, times(1)).refresh(); +assertTrue(refreshingHttpsJwks.maybeExpediteRefresh(keyId)); +verify(httpsJwks, times(2)).refresh(); +time.sleep(REFRESH_MS + 1); Review Comment: this test was not changed. I just inserted the new `mockExecutorService` method in a way that generated a weird diff. Note that the assertions here are the same as the ones that were "removed" from lines 159-168.
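The `mockExecutorService(time)` approach referenced above makes refresh scheduling deterministic: tasks are queued with a due time and run only when the test advances a fake clock, so `time.sleep(REFRESH_MS + 1)` reliably triggers the scheduled refresh. A minimal sketch of that idea follows, with illustrative names rather than Kafka's actual MockTime or the test's real mock executor.

```java
import java.util.ArrayList;
import java.util.List;

public class MockSchedulerSketch {

    static class Task {
        final long dueMs;
        final Runnable runnable;
        Task(long dueMs, Runnable runnable) { this.dueMs = dueMs; this.runnable = runnable; }
    }

    private long nowMs = 0;
    private final List<Task> tasks = new ArrayList<>();

    // Record the task with its due time instead of using a real executor thread.
    void schedule(Runnable r, long delayMs) {
        tasks.add(new Task(nowMs + delayMs, r));
    }

    // Advancing the fake clock runs every task whose due time has passed.
    void sleep(long ms) {
        nowMs += ms;
        List<Task> ready = new ArrayList<>();
        for (Task t : tasks)
            if (t.dueMs <= nowMs) ready.add(t);
        tasks.removeAll(ready);
        ready.forEach(t -> t.runnable.run());
    }

    public static void main(String[] args) {
        MockSchedulerSketch time = new MockSchedulerSketch();
        int[] refreshes = {0};
        long refreshMs = 100; // stand-in for the test's REFRESH_MS constant
        time.schedule(() -> refreshes[0]++, refreshMs);
        time.sleep(refreshMs - 1);
        System.out.println(refreshes[0]); // task not yet due
        time.sleep(2);                    // crosses refreshMs: task fires exactly once
        System.out.println(refreshes[0]);
    }
}
```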
[jira] [Created] (KAFKA-15958) Add tests to validate telemetry requests with different version
Apoorv Mittal created KAFKA-15958:
Summary: Add tests to validate telemetry requests with different version
Key: KAFKA-15958
URL: https://issues.apache.org/jira/browse/KAFKA-15958
Project: Kafka
Issue Type: Sub-task
Reporter: Apoorv Mittal
Assignee: Apoorv Mittal
Details: https://github.com/apache/kafka/pull/14724#discussion_r1412530561
-- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14724: URL: https://github.com/apache/kafka/pull/14724#discussion_r1412729783 ## clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java ## [Quoted diff context only: the same license header, imports, and test setup excerpted earlier in this digest; the review comment itself was truncated in the archive.]
Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14724: URL: https://github.com/apache/kafka/pull/14724#discussion_r1412729589 ## clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java ## [Quoted diff context only: the same license header, imports, and test setup excerpted earlier in this digest; the review comment itself was truncated in the archive.]
Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14724: URL: https://github.com/apache/kafka/pull/14724#discussion_r1412729556 ## clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java: ## @@ -0,0 +1,976 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.telemetry.internals; + +import io.opentelemetry.proto.metrics.v1.Metric; +import io.opentelemetry.proto.metrics.v1.MetricsData; +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import io.opentelemetry.proto.metrics.v1.ScopeMetrics; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.InterruptException; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData; +import org.apache.kafka.common.message.PushTelemetryRequestData; +import org.apache.kafka.common.message.PushTelemetryResponseData; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricsContext; +import org.apache.kafka.common.metrics.MetricsReporter; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.requests.AbstractRequest.Builder; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse; +import org.apache.kafka.common.requests.PushTelemetryRequest; +import org.apache.kafka.common.requests.PushTelemetryResponse; +import org.apache.kafka.common.telemetry.ClientTelemetryState; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.StringJoiner; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import 
java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Predicate; + +/** + * The implementation of the {@link MetricsReporter} for client telemetry which manages the life-cycle + * of the client telemetry collection process. The client telemetry reporter is responsible for + * collecting the client telemetry data and sending it to the broker. + * + * + * The client telemetry reporter is configured with a {@link ClientTelemetrySender} which is + * responsible for sending the client telemetry data to the broker. The client telemetry reporter + * will attempt to fetch the telemetry subscription information from the broker and send the + * telemetry data to the broker based on the subscription information i.e. push interval, temporality, + * compression type, etc. + * + * + * The full life-cycle of the metric collection process is defined by a state machine in + * {@link ClientTelemetryState}. Each state is associated with a different set of operations. + * For example, the client telemetry reporter will attempt to fetch the telemetry subscription + * from the broker when in the {@link ClientTelemetryState#SUBSCRIPTION_NEEDED} state. + * If the push operation fails, the client telemetry reporter will attempt to re-fetch the + * subscription information by setting the state back to {@link ClientTelemetryState#SUBSCRIPTION_NEEDED}. + * + * + * In an unlikely scenario, if a bad state transition is detected, an + * {@link IllegalStateException} will be thrown. + * + * + * The state transition follows the following steps in order: + * + * {@link ClientTelemetryState#SUBSCRIPTION_NEEDED} + * {@link ClientTelemetryState#SUBSCRIPTION_IN_PROGRESS} + * {@link ClientTelemetryState#PUSH_NEEDED} + * {@link ClientTelemetryState#PUSH_IN_PROGRESS} + * {@link
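The state machine described in the Javadoc above can be sketched as a small enum with a validated transition table. This is a hypothetical simplification (class and method names are illustrative, not Kafka's); the authoritative transitions live in `ClientTelemetryState`, but the sketch shows the SUBSCRIPTION_NEEDED → SUBSCRIPTION_IN_PROGRESS → PUSH_NEEDED → PUSH_IN_PROGRESS cycle, the fall-back to SUBSCRIPTION_NEEDED on a failed push, and the IllegalStateException on a bad transition:

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

public class TelemetryStateMachineDemo {

    // Simplified mirror of ClientTelemetryState: each state lists the states
    // it may legally move to; anything else is an invalid transition.
    enum State {
        SUBSCRIPTION_NEEDED, SUBSCRIPTION_IN_PROGRESS, PUSH_NEEDED, PUSH_IN_PROGRESS, TERMINATED;

        private static final Map<State, Set<State>> VALID_NEXT = new EnumMap<>(State.class);

        static {
            VALID_NEXT.put(SUBSCRIPTION_NEEDED, EnumSet.of(SUBSCRIPTION_IN_PROGRESS, TERMINATED));
            VALID_NEXT.put(SUBSCRIPTION_IN_PROGRESS, EnumSet.of(PUSH_NEEDED, SUBSCRIPTION_NEEDED, TERMINATED));
            VALID_NEXT.put(PUSH_NEEDED, EnumSet.of(PUSH_IN_PROGRESS, SUBSCRIPTION_NEEDED, TERMINATED));
            // A failed push sends the reporter back to re-fetch the subscription.
            VALID_NEXT.put(PUSH_IN_PROGRESS, EnumSet.of(PUSH_NEEDED, SUBSCRIPTION_NEEDED, TERMINATED));
            VALID_NEXT.put(TERMINATED, EnumSet.noneOf(State.class));
        }

        // Throws IllegalStateException on a bad transition, as the Javadoc describes.
        State transitionTo(State next) {
            if (!VALID_NEXT.get(this).contains(next)) {
                throw new IllegalStateException("Invalid transition from " + this + " to " + next);
            }
            return next;
        }
    }

    public static void main(String[] args) {
        State s = State.SUBSCRIPTION_NEEDED
            .transitionTo(State.SUBSCRIPTION_IN_PROGRESS)
            .transitionTo(State.PUSH_NEEDED)
            .transitionTo(State.PUSH_IN_PROGRESS);
        System.out.println(s);
        try {
            s.transitionTo(State.SUBSCRIPTION_IN_PROGRESS); // invalid: must go via SUBSCRIPTION_NEEDED
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Encoding the allowed successors in a table keeps the transition check to a single lookup, which is the usual way to make an "unlikely bad transition" fail loudly instead of silently corrupting the life-cycle.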
Re: [PR] MINOR: Fix flaky test RefreshingHttpsJwksTest.testBasicScheduleRefresh [kafka]
splett2 commented on code in PR #14888: URL: https://github.com/apache/kafka/pull/14888#discussion_r1412729044 ## clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/RefreshingHttpsJwksTest.java: ## @@ -134,6 +136,20 @@ public void testSecondaryRefreshAfterElapsedDelay() throws Exception { String keyId = "abc123"; MockTime time = new MockTime(); HttpsJwks httpsJwks = spyHttpsJwks(); + +try (RefreshingHttpsJwks refreshingHttpsJwks = getRefreshingHttpsJwks(time, httpsJwks, mockExecutorService(time))) { +refreshingHttpsJwks.init(); +// We refresh once at the initialization time from getJsonWebKeys. +verify(httpsJwks, times(1)).refresh(); +assertTrue(refreshingHttpsJwks.maybeExpediteRefresh(keyId)); +verify(httpsJwks, times(2)).refresh(); +time.sleep(REFRESH_MS + 1); Review Comment: this code was not changed. I just inserted the new `mockExecutorService` method in a way that generated a weird diff.
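The deterministic-clock pattern in the test above (a `MockTime` that the test advances with `time.sleep(...)` instead of really waiting) is what removes the flakiness. The following is a generic, self-contained sketch of that pattern; `FakeClock` and `Refresher` are hypothetical names written in the spirit of Kafka's `MockTime`, not Kafka classes:

```java
public class FakeClockDemo {

    // A fake clock: sleep() advances virtual time instantly, so the test
    // never actually blocks and the refresh schedule is fully deterministic.
    static class FakeClock {
        private long nowMs = 0;
        long milliseconds() { return nowMs; }
        void sleep(long ms) { nowMs += ms; }
    }

    // Time-driven component under test: refreshes at most once per interval.
    static class Refresher {
        static final long REFRESH_MS = 1000;
        private final FakeClock clock;
        private long lastRefreshMs = -REFRESH_MS - 1; // so the first call always refreshes
        private int refreshCount = 0;

        Refresher(FakeClock clock) { this.clock = clock; }

        // Refreshes only if the interval has elapsed since the last refresh.
        boolean maybeRefresh() {
            if (clock.milliseconds() - lastRefreshMs <= REFRESH_MS) {
                return false;
            }
            lastRefreshMs = clock.milliseconds();
            refreshCount++;
            return true;
        }

        int refreshCount() { return refreshCount; }
    }

    public static void main(String[] args) {
        FakeClock clock = new FakeClock();
        Refresher refresher = new Refresher(clock);
        refresher.maybeRefresh();              // initial refresh
        refresher.maybeRefresh();              // too soon: no-op
        clock.sleep(Refresher.REFRESH_MS + 1); // deterministic "wait", mirrors the test above
        refresher.maybeRefresh();              // interval elapsed: refreshes again
        System.out.println(refresher.refreshCount());
    }
}
```

Because the clock only moves when the test says so, assertions like `verify(httpsJwks, times(2)).refresh()` in the quoted test cannot race against a real scheduler thread.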
Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14724: URL: https://github.com/apache/kafka/pull/14724#discussion_r1412723727 ## clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java: ##
Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14724: URL: https://github.com/apache/kafka/pull/14724#discussion_r1412723432 ## clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java: ##
Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14724: URL: https://github.com/apache/kafka/pull/14724#discussion_r1412722252 ## clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryProvider.java: ## @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.telemetry.internals; + +import io.opentelemetry.proto.common.v1.AnyValue; +import io.opentelemetry.proto.common.v1.KeyValue; +import io.opentelemetry.proto.resource.v1.Resource; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.Configurable; +import org.apache.kafka.common.metrics.MetricsContext; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class ClientTelemetryProvider implements Configurable { + +public static final String DOMAIN = "org.apache.kafka"; +// Client metrics tags +public static final String CLIENT_RACK = "client_rack"; +public static final String GROUP_ID = "group_id"; +public static final String GROUP_INSTANCE_ID = "group_instance_id"; +public static final String GROUP_MEMBER_ID = "group_member_id"; +public static final String TRANSACTIONAL_ID = "transactional_id"; + +private static final String PRODUCER_NAMESPACE = "kafka.producer"; +private static final String CONSUMER_NAMESPACE = "kafka.consumer"; + +private static final Map PRODUCER_CONFIG_MAPPING = new HashMap<>(); +private static final Map CONSUMER_CONFIG_MAPPING = new HashMap<>(); + +private volatile Resource resource = null; +private Map config = null; + +// Mapping of config keys to telemetry keys. Contains only keys which can be fetched from config. +// Config like group_member_id is not present here as it is not fetched from config. 
+static { +PRODUCER_CONFIG_MAPPING.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, ClientTelemetryProvider.TRANSACTIONAL_ID); + +CONSUMER_CONFIG_MAPPING.put(ConsumerConfig.GROUP_ID_CONFIG, ClientTelemetryProvider.GROUP_ID); +CONSUMER_CONFIG_MAPPING.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, ClientTelemetryProvider.GROUP_INSTANCE_ID); +} + +@Override +public synchronized void configure(Map configs) { +this.config = configs; +} + +/** + * Validate that all the data required for generating correct metrics is present. + * + * @param metricsContext {@link MetricsContext} + * @return false if all the data required for generating correct metrics is missing, true + * otherwise. + */ +boolean validate(MetricsContext metricsContext) { +return ClientTelemetryUtils.validateRequiredResourceLabels(metricsContext.contextLabels()); +} + +/** + * Sets the metrics tags for the service or library exposing metrics. This will be called before + * {@link org.apache.kafka.common.metrics.MetricsReporter#init(List)} and may be called anytime + * after that. + * + * @param metricsContext {@link MetricsContext} + */ +synchronized void contextChange(MetricsContext metricsContext) { +final Resource.Builder resourceBuilder = Resource.newBuilder(); + +final String namespace = metricsContext.contextLabels().get(MetricsContext.NAMESPACE); +if (PRODUCER_NAMESPACE.equals(namespace)) { +// Add producer resource labels. +PRODUCER_CONFIG_MAPPING.forEach((configKey, telemetryKey) -> { +if (config.containsKey(configKey)) { Review Comment: I think not required as configs are optional, I never needed this log while testing and debugging the complete feature. I ll skip this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
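The `contextChange` logic in the `ClientTelemetryProvider` diff above boils down to: pick the namespace-specific config-to-label map, then copy only the configs that are actually present into resource attributes. A minimal, dependency-free sketch of that mapping, using a plain `Map` in place of the OpenTelemetry `Resource` builder (the config key strings are the standard Kafka config names; the class and method names here are hypothetical):

```java
import java.util.HashMap;
import java.util.Map;

public class TelemetryLabelsDemo {

    // Mirrors the mapping in ClientTelemetryProvider: config key -> telemetry label.
    static final Map<String, String> PRODUCER_MAPPING = new HashMap<>();
    static final Map<String, String> CONSUMER_MAPPING = new HashMap<>();

    static {
        PRODUCER_MAPPING.put("transactional.id", "transactional_id");
        CONSUMER_MAPPING.put("group.id", "group_id");
        CONSUMER_MAPPING.put("group.instance.id", "group_instance_id");
    }

    // Copies only the configs that are actually set, as the snippet above does
    // with its config.containsKey(configKey) check.
    static Map<String, String> resourceLabels(String namespace, Map<String, Object> configs) {
        Map<String, String> mapping = "kafka.producer".equals(namespace)
            ? PRODUCER_MAPPING : CONSUMER_MAPPING;
        Map<String, String> labels = new HashMap<>();
        mapping.forEach((configKey, telemetryKey) -> {
            Object value = configs.get(configKey);
            if (value != null) {
                labels.put(telemetryKey, value.toString());
            }
        });
        return labels;
    }

    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put("group.id", "my-group");
        System.out.println(resourceLabels("kafka.consumer", configs));
    }
}
```

This also illustrates why the reviewer's suggested log line is unnecessary: absent optional configs are simply skipped, which is expected rather than an error condition.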
Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14724: URL: https://github.com/apache/kafka/pull/14724#discussion_r1412721690 ## clients/src/main/java/org/apache/kafka/clients/ClientTelemetryReporter.java: ## @@ -0,0 +1,952 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients; + +import io.opentelemetry.proto.metrics.v1.Metric; +import io.opentelemetry.proto.metrics.v1.MetricsData; +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import io.opentelemetry.proto.metrics.v1.ScopeMetrics; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.InterruptException; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData; +import org.apache.kafka.common.message.PushTelemetryRequestData; +import org.apache.kafka.common.message.PushTelemetryResponseData; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricsContext; +import org.apache.kafka.common.metrics.MetricsReporter; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.requests.AbstractRequest.Builder; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse; +import org.apache.kafka.common.requests.PushTelemetryRequest; +import org.apache.kafka.common.requests.PushTelemetryResponse; +import org.apache.kafka.common.telemetry.ClientTelemetryState; +import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender; +import org.apache.kafka.common.telemetry.internals.ClientTelemetryUtils; +import org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector; +import org.apache.kafka.common.telemetry.internals.MetricKeyable; +import org.apache.kafka.common.telemetry.internals.MetricsCollector; +import org.apache.kafka.common.telemetry.internals.MetricsEmitter; +import org.apache.kafka.common.telemetry.internals.SinglePointMetric; +import 
org.apache.kafka.common.telemetry.internals.TelemetryMetricNamingConvention; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.StringJoiner; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Predicate; + +/** + * The implementation of the {@link MetricsReporter} for client telemetry which manages the life-cycle + * of the client telemetry collection process. The client telemetry reporter is responsible for + * collecting the client telemetry data and sending it to the broker. + * + * + * The client telemetry reporter is configured with a {@link ClientTelemetrySender} which is + * responsible for sending the client telemetry data to the broker. The client telemetry reporter + * will attempt to fetch the telemetry subscription information from the broker and send the + * telemetry data to the broker based on the subscription information. + * + * + * The full life-cycle of the metric collection process is defined by a state machine in + * {@link ClientTelemetryState}. Each state is associated with a different set of operations. Review Comment: Yeah, the code has a write up in ClientTelemetryState.java where next states are mentioned. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at:
Re: [PR] KAFKA-13152: Kip 770 buffer size fix [kafka]
vamossagar12 commented on PR #13283: URL: https://github.com/apache/kafka/pull/13283#issuecomment-1837036580 @ableegoldman, just checking whether you got a chance to look at my comment above. I don't mean to be pushy, but since the 3.7 release is approaching I'd like to have the PR ready so that you have sufficient time to review as well.
[jira] [Assigned] (KAFKA-15932) Flaky test - PlaintextConsumerTest.testSeek("kraft+kip-848","consumer")
[ https://issues.apache.org/jira/browse/KAFKA-15932?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-15932: - Assignee: Andrew Schofield > Flaky test - PlaintextConsumerTest.testSeek("kraft+kip-848","consumer") > --- > > Key: KAFKA-15932 > URL: https://issues.apache.org/jira/browse/KAFKA-15932 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.7.0 >Reporter: Andrew Schofield >Assignee: Andrew Schofield >Priority: Major > Labels: flaky-test > > Intermittently failing test for the new consumer. > https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14859/1/tests/ > ```Error > org.opentest4j.AssertionFailedError: Timed out before consuming expected 1 > records. The number consumed was 0. > Stacktrace > org.opentest4j.AssertionFailedError: Timed out before consuming expected 1 > records. The number consumed was 0. > at > app//org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38) > at app//org.junit.jupiter.api.Assertions.fail(Assertions.java:134) > at > app//kafka.api.AbstractConsumerTest.consumeRecords(AbstractConsumerTest.scala:161) > at > app//kafka.api.AbstractConsumerTest.consumeAndVerifyRecords(AbstractConsumerTest.scala:128) > at > app//kafka.api.PlaintextConsumerTest.testSeek(PlaintextConsumerTest.scala:616) > at > java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > at > java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base@11.0.16.1/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base@11.0.16.1/java.lang.reflect.Method.invoke(Method.java:566) > at > app//org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728) > at > app//org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) > at > 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) > at > app//org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156) > at > app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147) > at > app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestTemplateMethod(TimeoutExtension.java:94) > at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103) > at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) > at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92) > at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86) > at > app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:218) > at > app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:214) > at > app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:139) 
> at > app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:69) > at > app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151) > at > app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141) > at > app//org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) > at >
Re: [PR] KAFKA-14438: Throw error when consumer configured with empty/whitespace-only group.id for AsyncKafkaConsumer [kafka]
kirktrue closed pull request #14768: KAFKA-14438: Throw error when consumer configured with empty/whitespace-only group.id for AsyncKafkaConsumer URL: https://github.com/apache/kafka/pull/14768
Re: [PR] Kafka-14748: Relax non-null FK left-join requirement [kafka]
wcarlson5 commented on code in PR #14107: URL: https://github.com/apache/kafka/pull/14107#discussion_r1412698954 ## docs/streams/upgrade-guide.html: ## @@ -198,6 +184,21 @@ Streams API + Review Comment: Thanks for remembering the docs update! ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java: ## @@ -115,102 +116,93 @@ public void process(final Record> record) { droppedRecordsSensor.record(); return; } +if (leftJoin) { +leftJoinInstructions(record); +} else { +defaultJoinInstructions(record); +} +} -final long[] currentHash = record.value().newValue == null ? -null : -Murmur3.hash128(valueSerializer.serialize(valueSerdeTopic, record.value().newValue)); - -final int partition = context().recordMetadata().get().partition(); +private void leftJoinInstructions(final Record> record) { if (record.value().oldValue != null) { final KO oldForeignKey = foreignKeyExtractor.apply(record.value().oldValue); +final KO newForeignKey = record.value().newValue == null ? null : foreignKeyExtractor.apply(record.value().newValue); +if (oldForeignKey != null && !Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) { +forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE); +} +forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE); +} else if (record.value().newValue != null) { +final KO newForeignKey = foreignKeyExtractor.apply(record.value().newValue); +forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE); +} +} + +private void defaultJoinInstructions(final Record> record) { +if (record.value().oldValue != null) { +final KO oldForeignKey = record.value().oldValue == null ? 
null : foreignKeyExtractor.apply(record.value().oldValue); if (oldForeignKey == null) { logSkippedRecordDueToNullForeignKey(); return; } if (record.value().newValue != null) { -final KO newForeignKey = foreignKeyExtractor.apply(record.value().newValue); +final KO newForeignKey = record.value().newValue == null ? null : foreignKeyExtractor.apply(record.value().newValue); if (newForeignKey == null) { logSkippedRecordDueToNullForeignKey(); return; } - -final byte[] serialOldForeignKey = -foreignKeySerializer.serialize(foreignKeySerdeTopic, oldForeignKey); -final byte[] serialNewForeignKey = -foreignKeySerializer.serialize(foreignKeySerdeTopic, newForeignKey); -if (!Arrays.equals(serialNewForeignKey, serialOldForeignKey)) { +if (!Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) { //Different Foreign Key - delete the old key value and propagate the new one. //Delete it from the oldKey's state store -context().forward( -record.withKey(oldForeignKey) -.withValue(new SubscriptionWrapper<>( -currentHash, -DELETE_KEY_NO_PROPAGATE, -record.key(), -partition -))); -//Add to the newKey's state store. Additionally, propagate null if no FK is found there, -//since we must "unset" any output set by the previous FK-join. This is true for both INNER -//and LEFT join. +forward(record, oldForeignKey, DELETE_KEY_NO_PROPAGATE); } -context().forward( -record.withKey(newForeignKey) -.withValue(new SubscriptionWrapper<>( -currentHash, -PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, -record.key(), -partition -))); +//Add to the newKey's state store. Additionally, propagate null if no FK is found there, +//since we must "unset" any output set by the previous FK-join. This is true for both INNER +//and LEFT join. +
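The forwarding decision in the `SubscriptionSendProcessorSupplier` diff above can be summarized as a small pure function over the old and new foreign keys. The sketch below is a hypothetical simplification of the left-join branch only (it ignores serialization, hashing, and partition metadata); the instruction names mirror the constants in the diff:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public class FkJoinInstructionsDemo {

    // Hypothetical mirror of the SubscriptionWrapper instructions in the diff above.
    enum Instruction { DELETE_KEY_AND_PROPAGATE, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE }

    // Left-join branch: when the old foreign key exists and differs from the
    // new one, delete the subscription under the old FK (propagating the
    // delete); then always subscribe under the (possibly null) new FK, since
    // a left join must still emit a null-padded result.
    static List<String> leftJoinInstructions(String oldFk, String newFk) {
        List<String> forwarded = new ArrayList<>();
        if (oldFk != null && !Objects.equals(oldFk, newFk)) {
            forwarded.add(oldFk + " -> " + Instruction.DELETE_KEY_AND_PROPAGATE);
        }
        forwarded.add(newFk + " -> " + Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
        return forwarded;
    }

    public static void main(String[] args) {
        // FK changed from "fk1" to "fk2": delete under fk1, subscribe under fk2.
        System.out.println(leftJoinInstructions("fk1", "fk2"));
        // FK unchanged: only the (re)subscription is forwarded.
        System.out.println(leftJoinInstructions("fk1", "fk1"));
    }
}
```

The contrast with the inner-join branch in the diff is that the inner join drops records with a null foreign key entirely, whereas the left join still forwards a subscription so the previous join output can be "unset".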
Re: [PR] improve logging around broker/NetworkClient connectivity [kafka]
rayalatrinadh commented on PR #14901: URL: https://github.com/apache/kafka/pull/14901#issuecomment-1836995567 improve logging around broker/NetworkClient connectivity Committed Checked.

[PR] improve logging around broker/NetworkClient connectivity [kafka]
rayalatrinadh opened a new pull request, #14901: URL: https://github.com/apache/kafka/pull/14901 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]
jeffkbkim commented on code in PR #14849: URL: https://github.com/apache/kafka/pull/14849#discussion_r1412701656 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java: ## @@ -1659,4 +1680,117 @@ public void testPartitionLoadSensor() { verify(runtimeMetrics, times(1)).recordPartitionLoadSensor(startTimeMs, startTimeMs + 1000); } + +@Test +public void testPartitionLoadGeneratesSnapshotAtHighWatermark() { +MockTimer timer = new MockTimer(); +MockPartitionWriter writer = mock(MockPartitionWriter.class); +MockCoordinatorShardBuilderSupplier supplier = mock(MockCoordinatorShardBuilderSupplier.class); +MockCoordinatorShardBuilder builder = mock(MockCoordinatorShardBuilder.class); +MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class); +GroupCoordinatorRuntimeMetrics runtimeMetrics = mock(GroupCoordinatorRuntimeMetrics.class); + +CoordinatorRuntime runtime = +new CoordinatorRuntime.Builder() +.withTime(Time.SYSTEM) +.withTimer(timer) +.withLoader(new MockCoordinatorLoader( +new CoordinatorLoader.LoadSummary( +1000, +2000, +30, +3000), +Arrays.asList(5L, 15L, 27L), +Arrays.asList(5L, 15L))) +.withEventProcessor(new DirectEventProcessor()) +.withPartitionWriter(writer) +.withCoordinatorShardBuilderSupplier(supplier) +.withCoordinatorRuntimeMetrics(runtimeMetrics) +.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class)) +.build(); + +when(builder.withSnapshotRegistry(any())).thenReturn(builder); +when(builder.withLogContext(any())).thenReturn(builder); +when(builder.withTime(any())).thenReturn(builder); +when(builder.withTimer(any())).thenReturn(builder); +when(builder.withCoordinatorMetrics(any())).thenReturn(builder); +when(builder.withTopicPartition(any())).thenReturn(builder); +when(builder.build()).thenReturn(coordinator); +when(supplier.get()).thenReturn(builder); + +// Getting the coordinator context fails because the coordinator +// does not exist until scheduleLoadOperation is called. 
+assertThrows(NotCoordinatorException.class, () -> runtime.contextOrThrow(TP)); + +// Schedule the loading. +runtime.scheduleLoadOperation(TP, 0); + +// Getting the coordinator context succeeds now. +CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); + +// When the loading completes, the coordinator transitions to active. +assertEquals(ACTIVE, ctx.state); + +assertEquals(27L, ctx.stateMachine.lastWrittenOffset()); +assertEquals(15L, ctx.stateMachine.lastCommittedOffset()); +assertFalse(ctx.stateMachine.snapshotRegistry().hasSnapshot(0L)); Review Comment: i'm not sure what you mean. this is to confirm that previous snapshots were deleted
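The assertions above verify that, once loading completes, snapshots below the last committed offset have been pruned. A simplified, hypothetical sketch of that pruning behavior (this is an illustration, not Kafka's `timeline.SnapshotRegistry`):

```java
import java.util.TreeMap;

public class SnapshotPruningSketch {

    // Snapshots keyed by the offset at which they were taken.
    private final TreeMap<Long, String> snapshots = new TreeMap<>();

    void snapshot(long offset, String state) {
        snapshots.put(offset, state);
    }

    // Delete every snapshot taken at an offset strictly below the given one.
    void deleteSnapshotsUpTo(long offset) {
        snapshots.headMap(offset, false).clear();
    }

    boolean hasSnapshot(long offset) {
        return snapshots.containsKey(offset);
    }

    public static void main(String[] args) {
        SnapshotPruningSketch registry = new SnapshotPruningSketch();
        // Snapshots generated while replaying the partition.
        registry.snapshot(0L, "s0");
        registry.snapshot(5L, "s5");
        registry.snapshot(15L, "s15");
        // Loading finished with last committed offset 15: older snapshots
        // can never be reverted to, so they are discarded.
        registry.deleteSnapshotsUpTo(15L);
        System.out.println(registry.hasSnapshot(0L) + " " + registry.hasSnapshot(15L));
    }
}
```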
Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]
jeffkbkim commented on code in PR #14849: URL: https://github.com/apache/kafka/pull/14849#discussion_r1412700208 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java: ## @@ -322,7 +335,7 @@ public void testScheduleLoading() { when(builder.build()).thenReturn(coordinator); when(supplier.get()).thenReturn(builder); CompletableFuture future = new CompletableFuture<>(); -when(loader.load(TP, coordinator)).thenReturn(future); +when(loader.load(eq(TP), any())).thenReturn(future); Review Comment: i'm not sure how to go about this. The reason I had it this way was because until we call `runtime.scheduleLoadOperation()`, we don't have a context for the topic partition. but when we schedule a load operation we run `loader.load()`. If there's a way, I'll make the changes.
Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
wcarlson5 commented on code in PR #14864: URL: https://github.com/apache/kafka/pull/14864#discussion_r1412696799 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ## @@ -713,6 +735,97 @@ boolean runLoop() { return true; } +// visible for testing +void maybeGetClientInstanceIds() { +// we pass in a timeout of zero into each `clientInstanceId()` call +// to just trigger the "get instance id" background RPC; +// we don't want to block the stream thread that can do useful work in the meantime Review Comment: good comments :) ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ## @@ -1477,6 +1590,67 @@ public Object getStateLock() { return stateLock; } +public Map> consumerClientInstanceIds(final Duration timeout) { +boolean setDeadline = false; + +final Map> result = new HashMap<>(); + +KafkaFutureImpl future = new KafkaFutureImpl<>(); +if (mainConsumerClientInstanceId != null) { +future.complete(mainConsumerClientInstanceId); +} else { +mainConsumerInstanceIdFuture = future; +setDeadline = true; +} +result.put(getName() + "-consumer", future); + +future = new KafkaFutureImpl<>(); +if (restoreConsumerClientInstanceId != null) { +future.complete(restoreConsumerClientInstanceId); +} else { +restoreConsumerInstanceIdFuture = future; +setDeadline = true; +} +result.put(getName() + "-restore-consumer", future); + +if (setDeadline) { +fetchDeadline = time.milliseconds() + timeout.toMillis(); +} + +return result; +} + +public KafkaFuture>> producersClientInstanceIds(final Duration timeout) { +final KafkaFutureImpl>> result = new KafkaFutureImpl<>(); + +if (processingMode.equals(StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_ALPHA)) { +//for (final TaskId taskId : taskManager.activeTaskIds()) { +//future = new KafkaFutureImpl<>(); +//if (taskProducersClientInstanceIds.get(taskId) != null) { +// future.complete(taskProducersClientInstanceIds.get(taskId)); +//} else { 
+//taskProducersInstanceIdsFuture.put(taskId, future); +//setDeadline = true; +//} +//result.put(getName() + "-" + taskId + "-producer", future); +//}; +} else { +final KafkaFutureImpl producerFuture = new KafkaFutureImpl<>(); +if (threadProducerClientInstanceId != null) { +producerFuture.complete(threadProducerClientInstanceId); +} else { +threadProducerInstanceIdFuture = producerFuture; +if (fetchDeadline == -1) { +fetchDeadline = time.milliseconds() + timeout.toMillis(); +} +} + +result.complete(Collections.singletonMap(getName() + "-producer", producerFuture)); Review Comment: Could we be lazy and just treat any unavailable tasks as if they were set up with telemetry is disabled on the client itself? Not ideal but should keep things simple. Otherwise we might timeout all the time with restoring tasks ## streams/src/main/java/org/apache/kafka/streams/internals/ClientInstanceIdsImpl.java: ## @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.internals; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.streams.ClientInstanceIds; + +import java.util.HashMap; +import java.util.Map; + +public class ClientInstanceIdsImpl implements ClientInstanceIds { +private final Map consumerInstanceIds = new HashMap<>(); +private final Map producerInstanceIds = new HashMap<>(); +private Uuid adminInstanceId; + +public void addConsumerInstanceId(final String key, final Uuid instanceId) { +consumerInstanceIds.put(key, instanceId); +} + +public void
Re: [PR] errorHandling [kafka]
rayalatrinadh commented on PR #14900: URL: https://github.com/apache/kafka/pull/14900#issuecomment-1836981169 no conflicts
[PR] errorHandling [kafka]
rayalatrinadh opened a new pull request, #14900: URL: https://github.com/apache/kafka/pull/14900 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] errorHandling [kafka]
rayalatrinadh closed pull request #14900: errorHandling URL: https://github.com/apache/kafka/pull/14900
Re: [PR] KAFKA-15956: MetadataShell must take the log directory lock when reading [kafka]
cmccabe commented on PR #14899: URL: https://github.com/apache/kafka/pull/14899#issuecomment-1836970040 In addition to the junit tests, I also tested this manually and it works.
Re: [PR] KAFKA-15956: MetadataShell must take the log directory lock when reading [kafka]
cmccabe commented on code in PR #14899: URL: https://github.com/apache/kafka/pull/14899#discussion_r1412689568 ## shell/src/main/java/org/apache/kafka/shell/MetadataShell.java: ## @@ -119,6 +141,7 @@ private void initializeWithRaftManager() { } private void initializeWithSnapshotFileReader() throws Exception { +this.fileLock = takeFileLock(new File(snapshotPath).getParentFile()); Review Comment: Fixed
Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]
kirktrue commented on code in PR #14879: URL: https://github.com/apache/kafka/pull/14879#discussion_r1412683183 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -849,9 +953,22 @@ public OptionalLong currentLag(TopicPartition topicPartition) { } } +public void setGroupMetadata(final ConsumerGroupMetadata groupMetadata) { +this.groupMetadata = Optional.of(groupMetadata); +} + @Override public ConsumerGroupMetadata groupMetadata() { -throw new KafkaException("method not implemented"); +acquireAndEnsureOpen(); +try { +maybeThrowInvalidGroupIdException(); +backgroundEventProcessor.process(); Review Comment: I think we should leave it only in `poll()` for the time being, yes
Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]
kirktrue commented on code in PR #14879: URL: https://github.com/apache/kafka/pull/14879#discussion_r1412683354 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -398,6 +466,38 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { requestManagersSupplier); } +private Optional initializeGroupMetadata(final ConsumerConfig config, +final GroupRebalanceConfig groupRebalanceConfig) { +final Optional groupMetadata = initializeGroupMetadata( +groupRebalanceConfig.groupId, +groupRebalanceConfig.groupInstanceId +); +if (!groupMetadata.isPresent()) { +config.ignore(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG); +config.ignore(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED); +} +return groupMetadata; +} + +private Optional initializeGroupMetadata(final String groupId, +final Optional groupInstanceId) { +if (groupId != null) { +if (groupId.isEmpty()) { +throwInInvalidGroupIdException(); Review Comment: Oh, the line after `throwInInvalidGroupIdException` is just to appease the compiler?
[jira] [Assigned] (KAFKA-15957) ConsistencyVectorIntegrationTest.shouldHaveSamePositionBoundActiveAndStandBy broken
[ https://issues.apache.org/jira/browse/KAFKA-15957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reassigned KAFKA-15957: --- Assignee: Lucas Brutschy > ConsistencyVectorIntegrationTest.shouldHaveSamePositionBoundActiveAndStandBy > broken > --- > > Key: KAFKA-15957 > URL: https://issues.apache.org/jira/browse/KAFKA-15957 > Project: Kafka > Issue Type: Bug >Reporter: Justine Olshan >Assignee: Lucas Brutschy >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR: Fix flaky test RefreshingHttpsJwksTest.testBasicScheduleRefresh [kafka]
kirktrue commented on code in PR #14888: URL: https://github.com/apache/kafka/pull/14888#discussion_r1412678216 ## clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/RefreshingHttpsJwksTest.java: ## @@ -134,6 +136,20 @@ public void testSecondaryRefreshAfterElapsedDelay() throws Exception { String keyId = "abc123"; MockTime time = new MockTime(); HttpsJwks httpsJwks = spyHttpsJwks(); + +try (RefreshingHttpsJwks refreshingHttpsJwks = getRefreshingHttpsJwks(time, httpsJwks, mockExecutorService(time))) { +refreshingHttpsJwks.init(); +// We refresh once at the initialization time from getJsonWebKeys. +verify(httpsJwks, times(1)).refresh(); +assertTrue(refreshingHttpsJwks.maybeExpediteRefresh(keyId)); +verify(httpsJwks, times(2)).refresh(); +time.sleep(REFRESH_MS + 1); Review Comment: > Why were we using real time here at all This test has been using `MockTime` since the start, though
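For reference, the appeal of a `MockTime`-style clock in tests like the one above is that virtual time only advances when the test says so, which makes deadline logic deterministic. A minimal standalone sketch of the idea (the names are illustrative, not Kafka's `MockTime` API):

```java
public class MockTimeSketch {

    private long currentMs = 0L;

    long milliseconds() {
        return currentMs;
    }

    // Unlike Thread.sleep, this returns immediately and just advances
    // the virtual clock, so no test ever waits on wall-clock time.
    void sleep(long ms) {
        currentMs += ms;
    }

    public static void main(String[] args) {
        long refreshMs = 60_000L; // stand-in for the test's REFRESH_MS
        MockTimeSketch time = new MockTimeSketch();
        long nextRefresh = time.milliseconds() + refreshMs;
        // Cross the refresh deadline instantly, as time.sleep(REFRESH_MS + 1) does.
        time.sleep(refreshMs + 1);
        System.out.println(time.milliseconds() > nextRefresh);
    }
}
```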
Re: [PR] KAFKA-15956: MetadataShell must take the log directory lock when reading [kafka]
cmccabe commented on code in PR #14899: URL: https://github.com/apache/kafka/pull/14899#discussion_r1412672706 ## shell/src/main/java/org/apache/kafka/shell/MetadataShell.java: ## @@ -81,6 +85,21 @@ public MetadataShell build() { } } +static FileLock takeFileLock(File directory) { +FileLock fileLock = new FileLock(new File(directory, ".lock")); Review Comment: Yeah, that might make sense for this use-case. Since we also want to be able to look at snapshots that aren't inside log dirs.
Re: [PR] KAFKA-15956: MetadataShell must take the log directory lock when reading [kafka]
cmccabe commented on code in PR #14899: URL: https://github.com/apache/kafka/pull/14899#discussion_r1412672706 ## shell/src/main/java/org/apache/kafka/shell/MetadataShell.java: ## @@ -81,6 +85,21 @@ public MetadataShell build() { } } +static FileLock takeFileLock(File directory) { +FileLock fileLock = new FileLock(new File(directory, ".lock")); Review Comment: That seems like a bad idea since it could allow another server or tool to collide with what we're doing.
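The `takeFileLock` discussed above is built on advisory locking of a sentinel `.lock` file in the directory. A minimal sketch of the same mechanism using only `java.nio` (an illustration of the concept, not Kafka's server-common `FileLock` implementation):

```java
import java.io.File;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Files;

public class DirectoryLockSketch {

    // Take an advisory lock on a ".lock" sentinel file inside the directory.
    // tryLock() returns null when another process already holds the lock,
    // letting a reader fail fast instead of racing a live broker.
    static FileLock tryLockDirectory(File directory) throws Exception {
        RandomAccessFile raf = new RandomAccessFile(new File(directory, ".lock"), "rw");
        FileChannel channel = raf.getChannel();
        return channel.tryLock();
    }

    public static void main(String[] args) throws Exception {
        File dir = Files.createTempDirectory("metadata-dir").toFile();
        FileLock lock = tryLockDirectory(dir);
        System.out.println(lock != null && lock.isValid());
        lock.release();
    }
}
```

Note the lock is advisory: it only guards against other cooperating processes that also check the `.lock` file, which is exactly the broker/tool collision scenario the review comment raises.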
Re: [PR] HOTFIX: fix ConsistencyVectorIntegrationTest failure [kafka]
jolshan commented on PR #14895: URL: https://github.com/apache/kafka/pull/14895#issuecomment-1836934042 fyi -- I filed https://issues.apache.org/jira/browse/KAFKA-15957 but looks like you folks are on it.
[jira] [Commented] (KAFKA-15957) ConsistencyVectorIntegrationTest.shouldHaveSamePositionBoundActiveAndStandBy broken
[ https://issues.apache.org/jira/browse/KAFKA-15957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17792268#comment-17792268 ] Justine Olshan commented on KAFKA-15957: [https://github.com/apache/kafka/pull/14895] is fixing this > ConsistencyVectorIntegrationTest.shouldHaveSamePositionBoundActiveAndStandBy > broken > --- > > Key: KAFKA-15957 > URL: https://issues.apache.org/jira/browse/KAFKA-15957 > Project: Kafka > Issue Type: Bug >Reporter: Justine Olshan >Priority: Major >
Re: [PR] KAFKA-15448: Streams Standby Update Listener (KIP-988) [kafka]
ableegoldman commented on code in PR #14735: URL: https://github.com/apache/kafka/pull/14735#discussion_r1412648726 ## streams/src/main/java/org/apache/kafka/streams/processor/StandbyUpdateListener.java: ## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor; + +import org.apache.kafka.common.TopicPartition; + +public interface StandbyUpdateListener { + +enum SuspendReason { +MIGRATED, +PROMOTED +} + +/** + * Method called upon the creation of the Standby Task. + * + * @param topicPartition the TopicPartition of the Standby Task. + * @param storeNamethe name of the store being watched by this Standby Task. + * @param startingOffset the offset from which the Standby Task starts watching. + */ +void onUpdateStart(final TopicPartition topicPartition, + final String storeName, + final long startingOffset); + +/** + * Method called after restoring a batch of records. In this case the maximum size of the batch is whatever Review Comment: ```suggestion * Method called after loading a batch of records. 
In this case the maximum size of the batch is whatever ``` ## streams/src/main/java/org/apache/kafka/streams/processor/StandbyUpdateListener.java: ## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor; + +import org.apache.kafka.common.TopicPartition; + +public interface StandbyUpdateListener { + +enum SuspendReason { +MIGRATED, +PROMOTED +} + +/** + * Method called upon the creation of the Standby Task. + * + * @param topicPartition the TopicPartition of the Standby Task. + * @param storeNamethe name of the store being watched by this Standby Task. Review Comment: ditto here, copy this to the other two callbacks as well ```suggestion * @param storeName the name of the store being loaded ``` ## streams/src/main/java/org/apache/kafka/streams/processor/StandbyUpdateListener.java: ## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor; + +import org.apache.kafka.common.TopicPartition; + +public interface StandbyUpdateListener { + +enum SuspendReason { +MIGRATED, +PROMOTED +} + +/** + * Method called upon the creation of the Standby Task. + * + * @param topicPartition the TopicPartition of the Standby Task. + * @param storeNamethe name of the store being watched by this
[jira] [Created] (KAFKA-15957) ConsistencyVectorIntegrationTest.shouldHaveSamePositionBoundActiveAndStandBy broken
Justine Olshan created KAFKA-15957: -- Summary: ConsistencyVectorIntegrationTest.shouldHaveSamePositionBoundActiveAndStandBy broken Key: KAFKA-15957 URL: https://issues.apache.org/jira/browse/KAFKA-15957 Project: Kafka Issue Type: Bug Reporter: Justine Olshan
Re: [PR] KAFKA-15956: MetadataShell must take the log directory lock when reading [kafka]
jsancio commented on code in PR #14899: URL: https://github.com/apache/kafka/pull/14899#discussion_r1412657561 ## shell/src/main/java/org/apache/kafka/shell/MetadataShell.java: ## @@ -81,6 +85,21 @@ public MetadataShell build() { } } +static FileLock takeFileLock(File directory) { +FileLock fileLock = new FileLock(new File(directory, ".lock")); Review Comment: I suggest not creating a `.lock` file if the specified directory doesn't contain a `.lock` file. ## shell/src/main/java/org/apache/kafka/shell/MetadataShell.java: ## @@ -119,6 +141,7 @@ private void initializeWithRaftManager() { } private void initializeWithSnapshotFileReader() throws Exception { +this.fileLock = takeFileLock(new File(snapshotPath).getParentFile()); Review Comment: This doesn't look like the correct directory. Please take a look at the `LogManager` for details.
[PR] KAFKA-15956: MetadataShell must take the log directory lock when reading [kafka]
cmccabe opened a new pull request, #14899: URL: https://github.com/apache/kafka/pull/14899 MetadataShell should take an advisory lock on the .lock file of the directory it is reading from. Add an integration test of this functionality in MetadataShellIntegrationTest.java. Note: in build.gradle, I had to add some dependencies on server-common's test files in order to use MockFaultHandler, etc. MetadataBatchLoader.java: fix a case where a log message was incorrect. The intention was to print the number equivalent to (offset + index). Instead it was printing the offset, followed by the index. So if the offset was 100 and the index was 1, 1001 would be printed rather than 101.
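The MetadataBatchLoader logging bug described above is the classic Java pitfall of `+` acting as string concatenation inside a log message rather than numeric addition. A tiny sketch reproducing it with the numbers from the description:

```java
public class OffsetLogSketch {
    public static void main(String[] args) {
        long offset = 100L;
        int index = 1;
        // Buggy: '+' left-associates with the string, so the two numbers
        // are concatenated: "offset " + 100 -> "offset 100", then + 1 -> "offset 1001".
        String buggy = "offset " + offset + index;
        // Fixed: parenthesize so numeric addition happens first.
        String fixed = "offset " + (offset + index);
        System.out.println(buggy + " | " + fixed);
    }
}
```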
Re: [PR] KAFKA-15448: Streams Standby Update Listener (KIP-988) [kafka]
ableegoldman commented on code in PR #14735: URL: https://github.com/apache/kafka/pull/14735#discussion_r1412638885 ## streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java: ## @@ -580,6 +582,23 @@ public void setGlobalStateRestoreListener(final StateRestoreListener globalState } } +/** + * Set the listener which is triggered whenever a standby task is updated + * + * @param standbyListener The listener triggered when a standby task is updated. + * @throws IllegalStateException if this {@code KafkaStreams} instance has already been started. + */ +public void setStandbyUpdateListener(final StandbyUpdateListener standbyListener) { Review Comment: Agreed!
Re: [PR] KAFKA-14585: 1st part : Java versions for metadata/broker and updated LogConfig [kafka]
fvaleri commented on PR #14106: URL: https://github.com/apache/kafka/pull/14106#issuecomment-1836870533 @muralibasani should you also close this one?
[jira] [Created] (KAFKA-15956) MetadataShell must take the directory lock when reading
Colin McCabe created KAFKA-15956: Summary: MetadataShell must take the directory lock when reading Key: KAFKA-15956 URL: https://issues.apache.org/jira/browse/KAFKA-15956 Project: Kafka Issue Type: Improvement Reporter: Colin McCabe MetadataShell must take the directory lock when reading files, to avoid unpleasant surprises from concurrent reads and writes.
Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]
jeffkbkim commented on code in PR #14849: URL: https://github.com/apache/kafka/pull/14849#discussion_r1412627530 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/ContextStateMachine.java: ## @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.runtime; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.slf4j.Logger; + +/** + * ContextStateMachine is a wrapper on top of the coordinator state machine. Generally, only a single thread at a + * time will access this object but multiple threads may access while loading the __consumer_offsets topic partition. + */ +class ContextStateMachine, U> implements CoordinatorPlayback { +/** + * The logger. + */ +private final Logger log; +/** + * The actual state machine. + */ +private S coordinator; + +/** + * The snapshot registry backing the coordinator. + */ +private final SnapshotRegistry snapshotRegistry; + +/** + * The topic partition. + */ +private final TopicPartition tp; + +/** + * The last offset written to the partition. 
+ */ +private long lastWrittenOffset; + +/** + * The last offset committed. This represents the high + * watermark of the partition. + */ +private long lastCommittedOffset; + +ContextStateMachine( +LogContext logContext, +SnapshotRegistry snapshotRegistry, +S coordinator, +TopicPartition tp +) { +this.log = logContext.logger(ContextStateMachine.class); +this.coordinator = coordinator; +this.snapshotRegistry = snapshotRegistry; +this.tp = tp; +this.lastWrittenOffset = 0; +this.lastCommittedOffset = 0; +snapshotRegistry.getOrCreateSnapshot(0); +snapshotRegistry.deleteSnapshotsUpTo(0); +} + +/** + * Reverts the last written offset. This also reverts the snapshot + * registry to this offset. All the changes applied after the offset + * are lost. + * + * @param offset The offset to revert to. + */ +synchronized void revertLastWrittenOffset( +long offset +) { +if (offset > lastWrittenOffset) { +throw new IllegalStateException("New offset " + offset + " of " + tp + +" must be smaller than " + lastWrittenOffset + "."); +} + +log.debug("Revert last written offset of {} to {}.", tp, offset); +lastWrittenOffset = offset; +snapshotRegistry.revertToSnapshot(offset); +} + +@Override +public synchronized void replay( +long producerId, +short producerEpoch, +U record +) { +coordinator.replay(producerId, producerEpoch, record); +} + +/** + * Updates the last written offset. This also create a new snapshot + * in the snapshot registry. + * + * @param offset The new last written offset. + */ +@Override +public synchronized void updateLastWrittenOffset(Long offset) { +if (offset <= lastWrittenOffset) { +throw new IllegalStateException("New last written offset " + offset + " of " + tp + +" must be greater than " + lastWrittenOffset + "."); +} + +log.debug("Update last written offset of {} to {}.", tp, offset); +lastWrittenOffset = offset; +snapshotRegistry.getOrCreateSnapshot(offset); +} + +/** + * Updates the last committed offset. 
This completes all the deferred + * events waiting on this offset. This also cleanups all the snapshots + * prior to this offset. + * + * @param offset The new last committed offset. + */ +@Override +public synchronized void updateLastCommittedOffset(Long offset) { +if (offset < lastCommittedOffset) { +throw new IllegalStateException("New committed offset " + offset + " of " + tp + +" must be greater than or equal to " + lastCommittedOffset + "."); +} + +lastCommittedOffset = offset; +
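The snapshotting contract in the diff above can be modeled with a small, self-contained sketch (plain Java with a `TreeMap` standing in for Kafka's `SnapshotRegistry`; none of this is the actual coordinator code): advancing the last written offset takes a snapshot, a revert restores one and discards later changes, and advancing the committed offset prunes snapshots that can no longer be reverted to.

```java
import java.util.TreeMap;

// Minimal illustration of the offset/snapshot bookkeeping described above.
// The state is a plain String; snapshots are full copies keyed by offset.
class OffsetSnapshotSketch {
    private final TreeMap<Long, String> snapshots = new TreeMap<>();
    private String state = "";
    private long lastWrittenOffset = 0;
    private long lastCommittedOffset = 0;

    OffsetSnapshotSketch() {
        snapshots.put(0L, state); // snapshot at offset 0, as in the constructor above
    }

    void replay(final String record) {
        state = state + record; // apply a record to the in-memory state
    }

    void updateLastWrittenOffset(final long offset) {
        if (offset <= lastWrittenOffset) {
            throw new IllegalStateException("New last written offset " + offset
                + " must be greater than " + lastWrittenOffset + ".");
        }
        lastWrittenOffset = offset;
        snapshots.put(offset, state); // snapshot so this offset can be reverted to
    }

    void revertLastWrittenOffset(final long offset) {
        if (offset > lastWrittenOffset) {
            throw new IllegalStateException("New offset " + offset
                + " must be smaller than " + lastWrittenOffset + ".");
        }
        state = snapshots.get(offset);            // restore; later changes are lost
        snapshots.tailMap(offset, false).clear(); // drop snapshots after the offset
        lastWrittenOffset = offset;
    }

    void updateLastCommittedOffset(final long offset) {
        if (offset < lastCommittedOffset) {
            throw new IllegalStateException("New committed offset " + offset
                + " must be greater than or equal to " + lastCommittedOffset + ".");
        }
        lastCommittedOffset = offset;
        snapshots.headMap(offset, false).clear(); // snapshots before the HWM are no longer needed
    }

    String state() { return state; }
    long lastWrittenOffset() { return lastWrittenOffset; }
}
```

This also makes the loading scenario concrete: replaying records never throws, but a revert below an already-committed offset is impossible because those snapshots have been pruned.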
Re: [PR] MINOR: Add @Timeout annotation to consumer integration tests [kafka]
jolshan commented on PR #14896: URL: https://github.com/apache/kafka/pull/14896#issuecomment-1836863361 I've also been seeing more failed builds -- ``` * What went wrong: Execution failed for task ':streams:upgrade-system-tests-25:test'. > Process 'Gradle Test Executor 9' finished with non-zero exit value 1 This problem might be caused by incorrect test process configuration. ``` From this PR -- is it possible the new consumer could be throwing errors that cause test issues like this? I know it should be on the test to handle errors in a way that doesn't kill the build, but I have been seeing many failed builds due to these issues or GC/OOM issues.
Re: [PR] KAFKA-14412: Better Rocks column family management [kafka]
ableegoldman commented on code in PR #14852: URL: https://github.com/apache/kafka/pull/14852#discussion_r1412625873 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java: ## @@ -64,31 +62,14 @@ public RocksDBTimestampedStore(final String name, @Override void openRocksDB(final DBOptions dbOptions, final ColumnFamilyOptions columnFamilyOptions) { -final List columnFamilyDescriptors = asList( -new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions), -new ColumnFamilyDescriptor("keyValueWithTimestamp".getBytes(StandardCharsets.UTF_8), columnFamilyOptions)); -final List columnFamilies = new ArrayList<>(columnFamilyDescriptors.size()); - -try { -db = RocksDB.open(dbOptions, dbDir.getAbsolutePath(), columnFamilyDescriptors, columnFamilies); -setDbAccessor(columnFamilies.get(0), columnFamilies.get(1)); -} catch (final RocksDBException e) { -if ("Column family not found: keyValueWithTimestamp".equals(e.getMessage())) { -try { -db = RocksDB.open(dbOptions, dbDir.getAbsolutePath(), columnFamilyDescriptors.subList(0, 1), columnFamilies); - columnFamilies.add(db.createColumnFamily(columnFamilyDescriptors.get(1))); -} catch (final RocksDBException fatal) { -throw new ProcessorStateException("Error opening store " + name + " at location " + dbDir.toString(), fatal); -} -setDbAccessor(columnFamilies.get(0), columnFamilies.get(1)); -} else { -throw new ProcessorStateException("Error opening store " + name + " at location " + dbDir.toString(), e); -} -} -} +final List columnFamilies = openRocksDB( +dbOptions, +new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions), +new ColumnFamilyDescriptor("keyValueWithTimestamp".getBytes(StandardCharsets.UTF_8), columnFamilyOptions) Review Comment: We should define a constant for `keyValueWithTimestamp` instead of hard-coding the column family name like this. Especially since we'll be adding additional column families. 
We should have them all defined in one central place
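For illustration, the reviewer's suggestion of one central place for column family names might look like the following sketch; the interface and constant names here are hypothetical, not part of the Kafka Streams codebase.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical central registry of column family names, so the string literal
// "keyValueWithTimestamp" is defined exactly once instead of being hard-coded
// at each call site that builds a ColumnFamilyDescriptor.
interface ColumnFamilyNames {
    String KEY_VALUE_WITH_TIMESTAMP = "keyValueWithTimestamp";

    static byte[] asBytes(final String name) {
        return name.getBytes(StandardCharsets.UTF_8);
    }
}
```

A descriptor could then be built as `new ColumnFamilyDescriptor(ColumnFamilyNames.asBytes(ColumnFamilyNames.KEY_VALUE_WITH_TIMESTAMP), columnFamilyOptions)`, keeping the literal out of each store implementation as more column families are added.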
Re: [PR] MINOR: Add @Timeout annotation to consumer integration tests [kafka]
jolshan commented on PR #14896: URL: https://github.com/apache/kafka/pull/14896#issuecomment-1836861158 Hmmm -- are we concerned with introducing these new test failures? I agree a timeout is good, but I'm wondering if there is a way we could suppress these failures for PR builds.
Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]
jeffkbkim commented on code in PR #14849: URL: https://github.com/apache/kafka/pull/14849#discussion_r1412622438 ## core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala: ## @@ -153,6 +154,15 @@ class CoordinatorLoaderImpl[T]( } currentOffset = batch.nextOffset + val currentHighWatermark = log.highWatermark + if (currentOffset >= currentHighWatermark) { +coordinator.updateLastWrittenOffset(currentOffset) + } + + if (currentHighWatermark > previousHighWatermark) { +coordinator.updateLastCommittedOffset(currentHighWatermark) +previousHighWatermark = currentHighWatermark + } Review Comment: added a comment above -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]
jeffkbkim commented on code in PR #14849: URL: https://github.com/apache/kafka/pull/14849#discussion_r1412622265 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/ContextStateMachine.java: ## @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.runtime; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.slf4j.Logger; + +/** + * ContextStateMachine is a wrapper on top of the coordinator state machine. Generally, only a single thread at a + * time will access this object but multiple threads may access while loading the __consumer_offsets topic partition. + */ +class ContextStateMachine, U> implements CoordinatorPlayback { Review Comment: i didn't like it either :) thanks for the suggestion, sounds much nicer
Re: [PR] MINOR: Fix flaky test RefreshingHttpsJwksTest.testBasicScheduleRefresh [kafka]
jolshan commented on code in PR #14888: URL: https://github.com/apache/kafka/pull/14888#discussion_r141264 ## clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/RefreshingHttpsJwksTest.java: ## @@ -134,6 +136,20 @@ public void testSecondaryRefreshAfterElapsedDelay() throws Exception { String keyId = "abc123"; MockTime time = new MockTime(); HttpsJwks httpsJwks = spyHttpsJwks(); + +try (RefreshingHttpsJwks refreshingHttpsJwks = getRefreshingHttpsJwks(time, httpsJwks, mockExecutorService(time))) { +refreshingHttpsJwks.init(); +// We refresh once at the initialization time from getJsonWebKeys. +verify(httpsJwks, times(1)).refresh(); +assertTrue(refreshingHttpsJwks.maybeExpediteRefresh(keyId)); +verify(httpsJwks, times(2)).refresh(); +time.sleep(REFRESH_MS + 1); Review Comment: Why were we using real time here at all So now we refresh once more due to mock time passing?
Re: [PR] KAFKA-14412: Generalise over RocksDB WriteBatch [kafka]
ableegoldman commented on code in PR #14853: URL: https://github.com/apache/kafka/pull/14853#discussion_r1412620201 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java: ## @@ -572,13 +574,19 @@ public synchronized void flush() { @Override public void addToBatch(final KeyValue record, - final WriteBatch batch) throws RocksDBException { + final WriteBatchInterface batch) throws RocksDBException { dbAccessor.addToBatch(record.key, record.value, batch); } @Override -public void write(final WriteBatch batch) throws RocksDBException { -db.write(wOptions, batch); +public void write(final WriteBatchInterface batch) throws RocksDBException { +if (batch instanceof WriteBatch) { +db.write(wOptions, (WriteBatch) batch); +} else if (batch instanceof WriteBatchWithIndex) { +db.write(wOptions, (WriteBatchWithIndex) batch); +} else { +throw new ProcessorStateException("Unknown type of batch " + batch.getClass().getCanonicalName()); Review Comment: nit: log an error before we throw the exception Also, imo this should just be an IllegalStateException instead of a ProcessorStateException
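Applied to the snippet above, the two review suggestions (log before throwing, `IllegalStateException` instead of `ProcessorStateException`) could look like the following sketch. The `WriteBatch` types here are local stand-ins so the example is self-contained; they are not the real `org.rocksdb` classes.

```java
// Stand-in types mirroring the RocksDB batch hierarchy for this sketch only.
interface WriteBatchInterface { }

class WriteBatch implements WriteBatchInterface { }

class WriteBatchWithIndex implements WriteBatchInterface { }

class BatchWriter {
    // Dispatch on the concrete batch type, since RocksDB's write() has no
    // overload taking the common WriteBatchInterface.
    static String write(final WriteBatchInterface batch) {
        if (batch instanceof WriteBatch) {
            // would be: db.write(wOptions, (WriteBatch) batch);
            return "plain";
        } else if (batch instanceof WriteBatchWithIndex) {
            // would be: db.write(wOptions, (WriteBatchWithIndex) batch);
            return "indexed";
        } else {
            final String msg = "Unknown type of batch " + batch.getClass().getCanonicalName();
            System.err.println(msg); // log an error before throwing, per the review
            throw new IllegalStateException(msg);
        }
    }
}
```

The unknown-type branch is a programming error rather than a state-store failure, which is the reasoning behind preferring `IllegalStateException` over `ProcessorStateException` here.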
Re: [PR] MINOR: Fix flaky test RefreshingHttpsJwksTest.testBasicScheduleRefresh [kafka]
jolshan commented on code in PR #14888: URL: https://github.com/apache/kafka/pull/14888#discussion_r1412620108 ## clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/RefreshingHttpsJwksTest.java: ## @@ -62,14 +62,14 @@ public class RefreshingHttpsJwksTest extends OAuthBearerTest { @Test public void testBasicScheduleRefresh() throws Exception { String keyId = "abc123"; -Time time = new MockTime(); +MockTime time = new MockTime(); HttpsJwks httpsJwks = spyHttpsJwks(); -try (RefreshingHttpsJwks refreshingHttpsJwks = getRefreshingHttpsJwks(time, httpsJwks)) { +try (RefreshingHttpsJwks refreshingHttpsJwks = getRefreshingHttpsJwks(time, httpsJwks, mockExecutorService(time))) { refreshingHttpsJwks.init(); verify(httpsJwks, times(1)).refresh(); assertTrue(refreshingHttpsJwks.maybeExpediteRefresh(keyId)); -verify(httpsJwks, times(1)).refresh(); +verify(httpsJwks, times(2)).refresh(); Review Comment: Oof what an unfortunate bug. Thanks for fixing.
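The essence of the fix under review is that the refresh schedule is driven by an injected mock clock rather than wall-clock time, so "sleeping" advances time instantly and deterministically. A toy illustration of that pattern (plain Java; the class names are illustrative, not the actual MockTime or RefreshingHttpsJwks API):

```java
// A fake clock: sleep() advances a counter instead of blocking the thread.
class FakeClock {
    private long nowMs = 0;
    long milliseconds() { return nowMs; }
    void sleep(final long ms) { nowMs += ms; }
}

// Time-driven refresh logic that reads the injected clock, never System.currentTimeMillis().
class Refresher {
    static final long REFRESH_MS = 60_000;
    private final FakeClock clock;
    private long lastRefreshMs = -REFRESH_MS; // so the very first call refreshes
    private int refreshCount = 0;

    Refresher(final FakeClock clock) { this.clock = clock; }

    boolean maybeRefresh() {
        if (clock.milliseconds() - lastRefreshMs >= REFRESH_MS) {
            lastRefreshMs = clock.milliseconds();
            refreshCount++;
            return true;
        }
        return false;
    }

    int refreshCount() { return refreshCount; }
}
```

Because the code under test only ever consults the injected clock, the test controls exactly how many refreshes happen, with no flakiness from real elapsed time.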
Re: [PR] KAFKA-15831: KIP-1000 protocol and admin client [kafka]
AndrewJSchofield closed pull request #14894: KAFKA-15831: KIP-1000 protocol and admin client URL: https://github.com/apache/kafka/pull/14894
Re: [PR] MINOR: Ensure that DisplayName is set in all parameterized tests [kafka]
mumrah commented on PR #14850: URL: https://github.com/apache/kafka/pull/14850#issuecomment-1836803801 I tried this in IntelliJ and I see the test name included in the individual test variation output. Looks great! Screenshot: https://github.com/apache/kafka/assets/55116/fac807f0-06f8-4237-be95-3f9777c73363
Re: [PR] KAFKA-14987 [2/2]; customize retry backoff for group/offsets expiration [kafka]
jeffkbkim commented on code in PR #14870: URL: https://github.com/apache/kafka/pull/14870#discussion_r1412583010 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorTimer.java: ## @@ -52,6 +52,20 @@ interface TimeoutOperation { */ void schedule(String key, long delay, TimeUnit unit, boolean retry, TimeoutOperation operation); +/** + * Add an operation to the timer. If an operation with the same key + * already exists, replace it with the new operation. + * + * @param key The key to identify this operation. + * @param delay The delay to wait before expiring. + * @param unit The delay unit. + * @param retry A boolean indicating whether the operation should + * be retried on failure. + * @param retryBackoff The delay when rescheduled on retry. + * @param operation The operation to perform upon expiration. + */ +void schedule(String key, long delay, TimeUnit unit, boolean retry, long retryBackoff, TimeoutOperation operation); Review Comment: yes. i updated the javadocs
Re: [PR] KAFKA-15061; CoordinatorPartitionWriter should reuse buffer [kafka]
jeffkbkim commented on code in PR #14885: URL: https://github.com/apache/kafka/pull/14885#discussion_r1412575325 ## core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala: ## @@ -66,6 +65,10 @@ class CoordinatorPartitionWriter[T]( compressionType: CompressionType, time: Time ) extends PartitionWriter[T] { + private val threadLocalBufferSupplier = ThreadLocal.withInitial( +() => new BufferSupplier.GrowableBufferSupplier() + ) Review Comment: some general questions for my understanding: 1. We are introducing a buffer supplier so that we can reuse the same buffer and not reallocate a new buffer each time. 2. It's a growable buffer supplier since maxBatchSize may change on subsequent append() 3. we're adding a try-finally block to always clear the buffer in the case of exceptions. this was not needed previously since we were not using a buffer supplier 4. it's a thread local supplier because we don't want the buffer to be reused across different threads - i don't think i fully understand the repercussions is 1-3 correct? 4 is the actual question. thanks!
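On question 4, the mechanics can be shown with a simplified model of a per-thread growable buffer (this is not Kafka's actual `BufferSupplier`; the class is a sketch): each thread lazily gets its own buffer via `ThreadLocal.withInitial` and reuses it across calls, growing it only when a larger capacity is requested.

```java
import java.nio.ByteBuffer;

// Simplified per-thread reusable buffer, in the spirit of
// ThreadLocal.withInitial(() => new GrowableBufferSupplier()) above.
class ThreadLocalBuffer {
    private static final ThreadLocal<ByteBuffer> BUFFER =
        ThreadLocal.withInitial(() -> ByteBuffer.allocate(0));

    static ByteBuffer get(final int minCapacity) {
        ByteBuffer buffer = BUFFER.get();
        if (buffer.capacity() < minCapacity) {
            buffer = ByteBuffer.allocate(minCapacity); // grow on demand
            BUFFER.set(buffer);
        }
        buffer.clear(); // reset position/limit but keep the allocation for reuse
        return buffer;
    }
}
```

Since the same mutable buffer object is handed out on every call from a given thread, sharing one buffer across threads would let two concurrent appends overwrite each other's bytes mid-serialization; making the supplier thread-local rules that out without any locking.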
Re: [PR] KAFKA-15347: add support for 'single key multi timestamp' IQs with versioned state stores (KIP-968) [kafka]
mjsax commented on code in PR #14626: URL: https://github.com/apache/kafka/pull/14626#discussion_r1412552909 ## streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java: ## @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.query.ResultOrder; +import org.apache.kafka.streams.state.VersionedRecord; +import org.apache.kafka.streams.state.VersionedRecordIterator; +import org.rocksdb.Snapshot; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.ListIterator; +import java.util.Optional; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +public class LogicalSegmentIterator implements VersionedRecordIterator { Review Comment: I frankly cannot remember all the details, but this was the PR introducing `ManagedKeyValueIterotor`: https://github.com/apache/kafka/pull/13142 I understand that we don't use `KeyValueIterator`, but we still work with segments, and thus I am wondering if we would need a "managed" version of our iterator, too, re-implementing this call-back mechanism? (I am really not sure right now if we need it or not...)
Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]
philipnee commented on PR #14724: URL: https://github.com/apache/kafka/pull/14724#issuecomment-1836749674 Hi @apoorvmittal10 Thanks for the time to address my comments - I made another pass and added some more comments, mostly nit-picking. After this I think we are good to go.
Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]
philipnee commented on code in PR #14724: URL: https://github.com/apache/kafka/pull/14724#discussion_r1412554632 ## clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java: ## @@ -0,0 +1,573 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.telemetry.internals; + +import io.opentelemetry.proto.common.v1.KeyValue; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData; +import org.apache.kafka.common.message.PushTelemetryRequestData; +import org.apache.kafka.common.message.PushTelemetryResponseData; +import org.apache.kafka.common.metrics.KafkaMetricsContext; +import org.apache.kafka.common.metrics.MetricsContext; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse; +import org.apache.kafka.common.requests.PushTelemetryRequest; +import org.apache.kafka.common.requests.PushTelemetryResponse; +import org.apache.kafka.common.telemetry.ClientTelemetryState; +import org.apache.kafka.common.utils.MockTime; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static 
org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ClientTelemetryReporterTest { Review Comment: should we test the case where the subscription changes when creating a push request?
Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]
philipnee commented on code in PR #14724: URL: https://github.com/apache/kafka/pull/14724#discussion_r1412506519 ## clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java: ## @@ -0,0 +1,976 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.telemetry.internals; + +import io.opentelemetry.proto.metrics.v1.Metric; +import io.opentelemetry.proto.metrics.v1.MetricsData; +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import io.opentelemetry.proto.metrics.v1.ScopeMetrics; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.InterruptException; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData; +import org.apache.kafka.common.message.PushTelemetryRequestData; +import org.apache.kafka.common.message.PushTelemetryResponseData; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricsContext; +import org.apache.kafka.common.metrics.MetricsReporter; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.requests.AbstractRequest.Builder; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse; +import org.apache.kafka.common.requests.PushTelemetryRequest; +import org.apache.kafka.common.requests.PushTelemetryResponse; +import org.apache.kafka.common.telemetry.ClientTelemetryState; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.StringJoiner; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import 
java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Predicate; + +/** + * The implementation of the {@link MetricsReporter} for client telemetry which manages the life-cycle + * of the client telemetry collection process. The client telemetry reporter is responsible for + * collecting the client telemetry data and sending it to the broker. + * + * + * The client telemetry reporter is configured with a {@link ClientTelemetrySender} which is + * responsible for sending the client telemetry data to the broker. The client telemetry reporter + * will attempt to fetch the telemetry subscription information from the broker and send the + * telemetry data to the broker based on the subscription information i.e. push interval, temporality, + * compression type, etc. + * + * + * The full life-cycle of the metric collection process is defined by a state machine in + * {@link ClientTelemetryState}. Each state is associated with a different set of operations. + * For example, the client telemetry reporter will attempt to fetch the telemetry subscription + * from the broker when in the {@link ClientTelemetryState#SUBSCRIPTION_NEEDED} state. + * If the push operation fails, the client telemetry reporter will attempt to re-fetch the + * subscription information by setting the state back to {@link ClientTelemetryState#SUBSCRIPTION_NEEDED}. + * + * + * In an unlikely scenario, if a bad state transition is detected, an + * {@link IllegalStateException} will be thrown. + * + * + * The state transition follows the following steps in order: + * + * {@link ClientTelemetryState#SUBSCRIPTION_NEEDED} + * {@link ClientTelemetryState#SUBSCRIPTION_IN_PROGRESS} + * {@link ClientTelemetryState#PUSH_NEEDED} + * {@link ClientTelemetryState#PUSH_IN_PROGRESS} + * {@link
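The state-machine life-cycle in the Javadoc above can be sketched as a small enum with validated transitions. This is a hypothetical illustration only; the enum name `TelemetryState`, the `allowedNext`/`transitionTo` methods, and the exact transition table are assumptions mirroring the description (fetch subscription, then push, with a failed push falling back to `SUBSCRIPTION_NEEDED`), not the actual `ClientTelemetryState` implementation in the PR.

```java
import java.util.EnumSet;
import java.util.Set;

public class TelemetryStateMachineSketch {

    // Hypothetical stand-in for org.apache.kafka.common.telemetry.ClientTelemetryState.
    enum TelemetryState {
        SUBSCRIPTION_NEEDED, SUBSCRIPTION_IN_PROGRESS, PUSH_NEEDED, PUSH_IN_PROGRESS, TERMINATED;

        // Assumed transition table: subscription fetch precedes push; a failed
        // push loops back to re-fetch the subscription.
        Set<TelemetryState> allowedNext() {
            switch (this) {
                case SUBSCRIPTION_NEEDED:      return EnumSet.of(SUBSCRIPTION_IN_PROGRESS);
                case SUBSCRIPTION_IN_PROGRESS: return EnumSet.of(PUSH_NEEDED, SUBSCRIPTION_NEEDED, TERMINATED);
                case PUSH_NEEDED:              return EnumSet.of(PUSH_IN_PROGRESS, TERMINATED);
                case PUSH_IN_PROGRESS:         return EnumSet.of(PUSH_NEEDED, SUBSCRIPTION_NEEDED, TERMINATED);
                default:                       return EnumSet.noneOf(TelemetryState.class);
            }
        }

        // A bad transition raises IllegalStateException, as the Javadoc describes.
        TelemetryState transitionTo(TelemetryState next) {
            if (!allowedNext().contains(next))
                throw new IllegalStateException("Invalid transition " + this + " -> " + next);
            return next;
        }
    }

    public static void main(String[] args) {
        TelemetryState s = TelemetryState.SUBSCRIPTION_NEEDED;
        s = s.transitionTo(TelemetryState.SUBSCRIPTION_IN_PROGRESS);
        s = s.transitionTo(TelemetryState.PUSH_NEEDED);
        s = s.transitionTo(TelemetryState.PUSH_IN_PROGRESS);
        s = s.transitionTo(TelemetryState.SUBSCRIPTION_NEEDED); // push failed: re-fetch subscription
        System.out.println("final=" + s);
        try {
            TelemetryState.SUBSCRIPTION_NEEDED.transitionTo(TelemetryState.PUSH_NEEDED);
        } catch (IllegalStateException e) {
            System.out.println("rejected");
        }
    }
}
```

Centralizing the legal-transition table in one place is what lets the reporter detect a bad transition immediately rather than failing later with inconsistent telemetry state.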
Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]
philipnee commented on code in PR #14724: URL: https://github.com/apache/kafka/pull/14724#discussion_r1412542727 ## clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java: ## @@ -0,0 +1,573 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.telemetry.internals; + +import io.opentelemetry.proto.common.v1.KeyValue; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData; +import org.apache.kafka.common.message.PushTelemetryRequestData; +import org.apache.kafka.common.message.PushTelemetryResponseData; +import org.apache.kafka.common.metrics.KafkaMetricsContext; +import org.apache.kafka.common.metrics.MetricsContext; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse; +import org.apache.kafka.common.requests.PushTelemetryRequest; +import org.apache.kafka.common.requests.PushTelemetryResponse; +import org.apache.kafka.common.telemetry.ClientTelemetryState; +import org.apache.kafka.common.utils.MockTime; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static 
org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ClientTelemetryReporterTest { + +private MockTime time; +private ClientTelemetryReporter clientTelemetryReporter; +private Map configs; +private MetricsContext metricsContext; +private Uuid uuid; +private ClientTelemetryReporter.ClientTelemetrySubscription subscription; + +@BeforeEach +public void setUp() { +time = new MockTime(); +clientTelemetryReporter = new ClientTelemetryReporter(time); +configs = new HashMap<>(); +metricsContext = new KafkaMetricsContext("test"); +uuid = Uuid.randomUuid(); +subscription = new ClientTelemetryReporter.ClientTelemetrySubscription(uuid, 1234, 2, +Collections.emptyList(), true, null); +} + +@Test +public void testInitTelemetryReporter() { +configs.put(CommonClientConfigs.CLIENT_ID_CONFIG, "test-client"); +configs.put(CommonClientConfigs.CLIENT_RACK_CONFIG, "rack"); + +clientTelemetryReporter.configure(configs); +clientTelemetryReporter.contextChange(metricsContext); +assertNotNull(clientTelemetryReporter.metricsCollector()); +assertNotNull(clientTelemetryReporter.telemetryProvider().resource()); +assertEquals(1, clientTelemetryReporter.telemetryProvider().resource().getAttributesCount()); +assertEquals( +ClientTelemetryProvider.CLIENT_RACK, clientTelemetryReporter.telemetryProvider().resource().getAttributes(0).getKey()); +assertEquals("rack", clientTelemetryReporter.telemetryProvider().resource().getAttributes(0).getValue().getStringValue()); +} + +@Test +public void testInitTelemetryReporterNoCollector() { +// Remove namespace config which skips the collector initialization. +MetricsContext metricsContext = Collections::emptyMap; + +
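The `MetricsContext metricsContext = Collections::emptyMap;` line in the test above works because a single-abstract-method interface can be implemented by a method reference. A minimal sketch, using a hypothetical `Context` interface as a stand-in for Kafka's `MetricsContext` (whose one abstract method returns the context labels):

```java
import java.util.Collections;
import java.util.Map;

public class MethodRefContextSketch {

    // Hypothetical stand-in for org.apache.kafka.common.metrics.MetricsContext.
    interface Context {
        Map<String, String> contextLabels();
    }

    public static void main(String[] args) {
        // Collections.emptyMap matches the interface's single method, so the
        // method reference yields a context with no labels at all -- handy in
        // tests that want to exercise the "no namespace configured" path.
        Context empty = Collections::emptyMap;
        System.out.println("labels=" + empty.contextLabels().size());
    }
}
```

This is far terser than an anonymous inner class and makes the test's intent (an intentionally label-free context) obvious at the call site.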
Re: [PR] KAFKA-15347: add support for 'single key multi timestamp' IQs with versioned state stores (KIP-968) [kafka]
mjsax commented on code in PR #14626: URL: https://github.com/apache/kafka/pull/14626#discussion_r1412530967 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java: ## @@ -266,75 +269,28 @@ public VersionedRecord get(final Bytes key, final long asOfTimestamp) { return null; } -public VersionedRecordIterator get(final Bytes key, final long fromTimestamp, final long toTimestamp, final boolean isAscending) { - -Objects.requireNonNull(key, "key cannot be null"); +@SuppressWarnings("unchecked") Review Comment: Well, `VersionedRecordIterator` takes a generic `V`, and `LogicalSegmentIterator` does not specify the type (which is a bug): ``` public class LogicalSegmentIterator implements VersionedRecordIterator { ``` It should probably be the following, which avoids the warning and makes the suppression unnecessary: ``` public class LogicalSegmentIterator implements VersionedRecordIterator<byte[]> { ``` This implies that `public Object next()` should actually return `byte[]`, right? (Something I missed in my review.) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
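The raw-type problem mjsax describes can be shown in miniature. The interface and class names below (`RecordIterator`, `SegmentIterator`) are illustrative stand-ins, not the actual Kafka Streams types: once the implementing class specifies the type parameter, `next()` returns `byte[]` directly and no `@SuppressWarnings("unchecked")` is needed at the call site.

```java
import java.util.Arrays;
import java.util.Iterator;

public class GenericsSketch {

    // Stand-in for VersionedRecordIterator<V>.
    interface RecordIterator<V> extends Iterator<V> { }

    // Parameterized as the review suggests (implements RecordIterator<byte[]>),
    // so next() is declared to return byte[] rather than Object.
    static class SegmentIterator implements RecordIterator<byte[]> {
        private final Iterator<byte[]> delegate =
            Arrays.asList("a".getBytes(), "b".getBytes()).iterator();

        @Override public boolean hasNext() { return delegate.hasNext(); }
        @Override public byte[] next()     { return delegate.next(); }
    }

    public static void main(String[] args) {
        RecordIterator<byte[]> it = new SegmentIterator(); // no cast, no suppression
        while (it.hasNext())
            System.out.println(new String(it.next()));
    }
}
```

Had `SegmentIterator` implemented the raw `RecordIterator` instead, `next()` would return `Object`, forcing every caller into an unchecked cast, which is exactly the warning the `@SuppressWarnings` in the PR was papering over.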
Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]
philipnee commented on code in PR #14724: URL: https://github.com/apache/kafka/pull/14724#discussion_r1412530561 ## clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java: ## @@ -0,0 +1,573 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.telemetry.internals; + +import io.opentelemetry.proto.common.v1.KeyValue; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData; +import org.apache.kafka.common.message.PushTelemetryRequestData; +import org.apache.kafka.common.message.PushTelemetryResponseData; +import org.apache.kafka.common.metrics.KafkaMetricsContext; +import org.apache.kafka.common.metrics.MetricsContext; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse; +import org.apache.kafka.common.requests.PushTelemetryRequest; +import org.apache.kafka.common.requests.PushTelemetryResponse; +import org.apache.kafka.common.telemetry.ClientTelemetryState; +import org.apache.kafka.common.utils.MockTime; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static 
org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ClientTelemetryReporterTest { + +private MockTime time; +private ClientTelemetryReporter clientTelemetryReporter; +private Map configs; +private MetricsContext metricsContext; +private Uuid uuid; +private ClientTelemetryReporter.ClientTelemetrySubscription subscription; + +@BeforeEach +public void setUp() { +time = new MockTime(); +clientTelemetryReporter = new ClientTelemetryReporter(time); +configs = new HashMap<>(); +metricsContext = new KafkaMetricsContext("test"); +uuid = Uuid.randomUuid(); +subscription = new ClientTelemetryReporter.ClientTelemetrySubscription(uuid, 1234, 2, +Collections.emptyList(), true, null); +} + +@Test +public void testInitTelemetryReporter() { +configs.put(CommonClientConfigs.CLIENT_ID_CONFIG, "test-client"); +configs.put(CommonClientConfigs.CLIENT_RACK_CONFIG, "rack"); + +clientTelemetryReporter.configure(configs); +clientTelemetryReporter.contextChange(metricsContext); +assertNotNull(clientTelemetryReporter.metricsCollector()); +assertNotNull(clientTelemetryReporter.telemetryProvider().resource()); +assertEquals(1, clientTelemetryReporter.telemetryProvider().resource().getAttributesCount()); +assertEquals( +ClientTelemetryProvider.CLIENT_RACK, clientTelemetryReporter.telemetryProvider().resource().getAttributes(0).getKey()); +assertEquals("rack", clientTelemetryReporter.telemetryProvider().resource().getAttributes(0).getValue().getStringValue()); +} + +@Test +public void testInitTelemetryReporterNoCollector() { +// Remove namespace config which skips the collector initialization. +MetricsContext metricsContext = Collections::emptyMap; + +
Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]
philipnee commented on code in PR #14724: URL: https://github.com/apache/kafka/pull/14724#discussion_r1412522622 ## clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java: ## @@ -0,0 +1,573 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.telemetry.internals; + +import io.opentelemetry.proto.common.v1.KeyValue; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData; +import org.apache.kafka.common.message.PushTelemetryRequestData; +import org.apache.kafka.common.message.PushTelemetryResponseData; +import org.apache.kafka.common.metrics.KafkaMetricsContext; +import org.apache.kafka.common.metrics.MetricsContext; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse; +import org.apache.kafka.common.requests.PushTelemetryRequest; +import org.apache.kafka.common.requests.PushTelemetryResponse; +import org.apache.kafka.common.telemetry.ClientTelemetryState; +import org.apache.kafka.common.utils.MockTime; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static 
org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ClientTelemetryReporterTest { + +private MockTime time; +private ClientTelemetryReporter clientTelemetryReporter; +private Map configs; +private MetricsContext metricsContext; +private Uuid uuid; +private ClientTelemetryReporter.ClientTelemetrySubscription subscription; + +@BeforeEach +public void setUp() { +time = new MockTime(); +clientTelemetryReporter = new ClientTelemetryReporter(time); +configs = new HashMap<>(); +metricsContext = new KafkaMetricsContext("test"); +uuid = Uuid.randomUuid(); +subscription = new ClientTelemetryReporter.ClientTelemetrySubscription(uuid, 1234, 2, +Collections.emptyList(), true, null); +} + +@Test +public void testInitTelemetryReporter() { +configs.put(CommonClientConfigs.CLIENT_ID_CONFIG, "test-client"); +configs.put(CommonClientConfigs.CLIENT_RACK_CONFIG, "rack"); + +clientTelemetryReporter.configure(configs); +clientTelemetryReporter.contextChange(metricsContext); +assertNotNull(clientTelemetryReporter.metricsCollector()); +assertNotNull(clientTelemetryReporter.telemetryProvider().resource()); +assertEquals(1, clientTelemetryReporter.telemetryProvider().resource().getAttributesCount()); +assertEquals( +ClientTelemetryProvider.CLIENT_RACK, clientTelemetryReporter.telemetryProvider().resource().getAttributes(0).getKey()); +assertEquals("rack", clientTelemetryReporter.telemetryProvider().resource().getAttributes(0).getValue().getStringValue()); +} + +@Test +public void testInitTelemetryReporterNoCollector() { +// Remove namespace config which skips the collector initialization. +MetricsContext metricsContext = Collections::emptyMap; + +
Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]
philipnee commented on code in PR #14724: URL: https://github.com/apache/kafka/pull/14724#discussion_r1412522319 ## clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java: ## @@ -0,0 +1,976 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.telemetry.internals; + +import io.opentelemetry.proto.metrics.v1.Metric; +import io.opentelemetry.proto.metrics.v1.MetricsData; +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import io.opentelemetry.proto.metrics.v1.ScopeMetrics; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.InterruptException; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData; +import org.apache.kafka.common.message.PushTelemetryRequestData; +import org.apache.kafka.common.message.PushTelemetryResponseData; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricsContext; +import org.apache.kafka.common.metrics.MetricsReporter; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.requests.AbstractRequest.Builder; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse; +import org.apache.kafka.common.requests.PushTelemetryRequest; +import org.apache.kafka.common.requests.PushTelemetryResponse; +import org.apache.kafka.common.telemetry.ClientTelemetryState; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.StringJoiner; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import 
java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Predicate; + +/** + * The implementation of the {@link MetricsReporter} for client telemetry which manages the life-cycle + * of the client telemetry collection process. The client telemetry reporter is responsible for + * collecting the client telemetry data and sending it to the broker. + * + * + * The client telemetry reporter is configured with a {@link ClientTelemetrySender} which is + * responsible for sending the client telemetry data to the broker. The client telemetry reporter + * will attempt to fetch the telemetry subscription information from the broker and send the + * telemetry data to the broker based on the subscription information i.e. push interval, temporality, + * compression type, etc. + * + * + * The full life-cycle of the metric collection process is defined by a state machine in + * {@link ClientTelemetryState}. Each state is associated with a different set of operations. + * For example, the client telemetry reporter will attempt to fetch the telemetry subscription + * from the broker when in the {@link ClientTelemetryState#SUBSCRIPTION_NEEDED} state. + * If the push operation fails, the client telemetry reporter will attempt to re-fetch the + * subscription information by setting the state back to {@link ClientTelemetryState#SUBSCRIPTION_NEEDED}. + * + * + * In an unlikely scenario, if a bad state transition is detected, an + * {@link IllegalStateException} will be thrown. + * + * + * The state transition follows the following steps in order: + * + * {@link ClientTelemetryState#SUBSCRIPTION_NEEDED} + * {@link ClientTelemetryState#SUBSCRIPTION_IN_PROGRESS} + * {@link ClientTelemetryState#PUSH_NEEDED} + * {@link ClientTelemetryState#PUSH_IN_PROGRESS} + * {@link
Re: [PR] MINOR: cleanup internal Iterator impl [kafka]
mjsax merged PR #14889: URL: https://github.com/apache/kafka/pull/14889 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]
philipnee commented on code in PR #14724: URL: https://github.com/apache/kafka/pull/14724#discussion_r1412507867 ## clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java: ## @@ -0,0 +1,976 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.telemetry.internals; + +import io.opentelemetry.proto.metrics.v1.Metric; +import io.opentelemetry.proto.metrics.v1.MetricsData; +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import io.opentelemetry.proto.metrics.v1.ScopeMetrics; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.InterruptException; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData; +import org.apache.kafka.common.message.PushTelemetryRequestData; +import org.apache.kafka.common.message.PushTelemetryResponseData; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricsContext; +import org.apache.kafka.common.metrics.MetricsReporter; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.requests.AbstractRequest.Builder; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse; +import org.apache.kafka.common.requests.PushTelemetryRequest; +import org.apache.kafka.common.requests.PushTelemetryResponse; +import org.apache.kafka.common.telemetry.ClientTelemetryState; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.StringJoiner; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import 
java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Predicate; + +/** + * The implementation of the {@link MetricsReporter} for client telemetry which manages the life-cycle + * of the client telemetry collection process. The client telemetry reporter is responsible for + * collecting the client telemetry data and sending it to the broker. + * + * + * The client telemetry reporter is configured with a {@link ClientTelemetrySender} which is + * responsible for sending the client telemetry data to the broker. The client telemetry reporter + * will attempt to fetch the telemetry subscription information from the broker and send the + * telemetry data to the broker based on the subscription information i.e. push interval, temporality, + * compression type, etc. + * + * + * The full life-cycle of the metric collection process is defined by a state machine in + * {@link ClientTelemetryState}. Each state is associated with a different set of operations. + * For example, the client telemetry reporter will attempt to fetch the telemetry subscription + * from the broker when in the {@link ClientTelemetryState#SUBSCRIPTION_NEEDED} state. + * If the push operation fails, the client telemetry reporter will attempt to re-fetch the + * subscription information by setting the state back to {@link ClientTelemetryState#SUBSCRIPTION_NEEDED}. + * + * + * In an unlikely scenario, if a bad state transition is detected, an + * {@link IllegalStateException} will be thrown. + * + * + * The state transition follows the following steps in order: + * + * {@link ClientTelemetryState#SUBSCRIPTION_NEEDED} + * {@link ClientTelemetryState#SUBSCRIPTION_IN_PROGRESS} + * {@link ClientTelemetryState#PUSH_NEEDED} + * {@link ClientTelemetryState#PUSH_IN_PROGRESS} + * {@link
Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]
philipnee commented on code in PR #14724: URL: https://github.com/apache/kafka/pull/14724#discussion_r1412506519

## clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java:
Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]
philipnee commented on code in PR #14724: URL: https://github.com/apache/kafka/pull/14724#discussion_r1412500117

## clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java:
Re: [PR] KAFKA-15280: Implement client support for KIP-848 server-side assignors [kafka]
lianetm commented on code in PR #14878: URL: https://github.com/apache/kafka/pull/14878#discussion_r1412491852

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:

@@ -783,6 +774,7 @@ void markReconciliationCompleted() {
      *
      */
     private void resolveMetadataForUnresolvedAssignment() {
+        assignmentReadyToReconcile.clear();

Review Comment: This call here could make us lose assignments in some cases, I expect (we can have assignments ready to reconcile that are not yet being reconciled, and then get more as a result of a metadata update; all need to be kept in the buffer). Assignments ready to reconcile are discovered in two places (when we get a HB response, but also when we get metadata updates), and are kept in the buffer if a previous reconciliation is going on. Could you point me to the integration test that you mentioned brought this situation up? I'll take a look to better understand what could be missing.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
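The concern above — that clearing the buffer eagerly drops assignments that arrived from either source while a reconciliation is in flight — can be sketched as follows. All names (`ReconcileBuffer`, `drainForReconciliation`) are hypothetical; the real logic lives in `MembershipManagerImpl`:

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: assignments ready to reconcile arrive from two sources
// (heartbeat responses and metadata updates) and must accumulate until a
// reconciliation drains them. Clearing the buffer up-front would lose
// anything not yet reconciled.
class ReconcileBuffer {
    private final Set<String> assignmentReadyToReconcile = new HashSet<>();

    void onHeartbeatAssignment(Collection<String> partitions) {
        assignmentReadyToReconcile.addAll(partitions);
    }

    void onMetadataResolved(Collection<String> partitions) {
        assignmentReadyToReconcile.addAll(partitions);
    }

    // Drain when a reconciliation starts, instead of clearing eagerly,
    // so nothing buffered from either source is dropped.
    Set<String> drainForReconciliation() {
        Set<String> snapshot = new HashSet<>(assignmentReadyToReconcile);
        assignmentReadyToReconcile.clear();
        return snapshot;
    }
}
```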
Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]
philipnee commented on code in PR #14724: URL: https://github.com/apache/kafka/pull/14724#discussion_r1412491930 ## clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryProvider.java: ## @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.telemetry.internals; + +import io.opentelemetry.proto.common.v1.AnyValue; +import io.opentelemetry.proto.common.v1.KeyValue; +import io.opentelemetry.proto.resource.v1.Resource; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.Configurable; +import org.apache.kafka.common.metrics.MetricsContext; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class ClientTelemetryProvider implements Configurable { + +public static final String DOMAIN = "org.apache.kafka"; +// Client metrics tags +public static final String CLIENT_RACK = "client_rack"; +public static final String GROUP_ID = "group_id"; +public static final String GROUP_INSTANCE_ID = "group_instance_id"; +public static final String GROUP_MEMBER_ID = "group_member_id"; +public static final String TRANSACTIONAL_ID = "transactional_id"; + +private static final String PRODUCER_NAMESPACE = "kafka.producer"; +private static final String CONSUMER_NAMESPACE = "kafka.consumer"; + +private static final Map PRODUCER_CONFIG_MAPPING = new HashMap<>(); +private static final Map CONSUMER_CONFIG_MAPPING = new HashMap<>(); + +private volatile Resource resource = null; +private Map config = null; + +// Mapping of config keys to telemetry keys. Contains only keys which can be fetched from config. +// Config like group_member_id is not present here as it is not fetched from config. 
+    static {
+        PRODUCER_CONFIG_MAPPING.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, ClientTelemetryProvider.TRANSACTIONAL_ID);
+
+        CONSUMER_CONFIG_MAPPING.put(ConsumerConfig.GROUP_ID_CONFIG, ClientTelemetryProvider.GROUP_ID);
+        CONSUMER_CONFIG_MAPPING.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, ClientTelemetryProvider.GROUP_INSTANCE_ID);
+    }
+
+    @Override
+    public synchronized void configure(Map<String, ?> configs) {
+        this.config = configs;
+    }
+
+    /**
+     * Validate that all the data required for generating correct metrics is present.
+     *
+     * @param metricsContext {@link MetricsContext}
+     * @return false if all the data required for generating correct metrics is missing, true
+     *         otherwise.
+     */
+    boolean validate(MetricsContext metricsContext) {
+        return ClientTelemetryUtils.validateRequiredResourceLabels(metricsContext.contextLabels());
+    }
+
+    /**
+     * Sets the metrics tags for the service or library exposing metrics. This will be called before
+     * {@link org.apache.kafka.common.metrics.MetricsReporter#init(List)} and may be called anytime
+     * after that.
+     *
+     * @param metricsContext {@link MetricsContext}
+     */
+    synchronized void contextChange(MetricsContext metricsContext) {
+        final Resource.Builder resourceBuilder = Resource.newBuilder();
+
+        final String namespace = metricsContext.contextLabels().get(MetricsContext.NAMESPACE);
+        if (PRODUCER_NAMESPACE.equals(namespace)) {
+            // Add producer resource labels.
+            PRODUCER_CONFIG_MAPPING.forEach((configKey, telemetryKey) -> {
+                if (config.containsKey(configKey)) {

Review Comment: Do we need to log at debug if the configKey does not exist? This might be useful for debugging purposes.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
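The suggestion above boils down to recording (or `log.debug`-ing) each mapped config key that is absent instead of silently skipping it. The sketch below is an illustrative stand-in, not the actual `ClientTelemetryProvider` code; a skipped-key list replaces the debug logger so the behavior is observable:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of applying a config-key -> telemetry-key mapping while
// recording absent keys. A real implementation would call
// log.debug("Config key {} absent; skipping label {}", configKey, telemetryKey)
// instead of collecting into a list.
class ConfigLabelMapper {

    static Map<String, String> applyMappings(Map<String, String> mapping,
                                             Map<String, Object> config,
                                             List<String> skippedKeys) {
        Map<String, String> labels = new HashMap<>();
        mapping.forEach((configKey, telemetryKey) -> {
            if (config.containsKey(configKey)) {
                labels.put(telemetryKey, String.valueOf(config.get(configKey)));
            } else {
                skippedKeys.add(configKey); // stand-in for log.debug(...)
            }
        });
        return labels;
    }
}
```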
Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]
philipnee commented on code in PR #14724: URL: https://github.com/apache/kafka/pull/14724#discussion_r1412484071 ## clients/src/main/java/org/apache/kafka/clients/ClientTelemetryReporter.java: ## @@ -0,0 +1,952 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients; + +import io.opentelemetry.proto.metrics.v1.Metric; +import io.opentelemetry.proto.metrics.v1.MetricsData; +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import io.opentelemetry.proto.metrics.v1.ScopeMetrics; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.InterruptException; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData; +import org.apache.kafka.common.message.PushTelemetryRequestData; +import org.apache.kafka.common.message.PushTelemetryResponseData; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricsContext; +import org.apache.kafka.common.metrics.MetricsReporter; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.requests.AbstractRequest.Builder; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse; +import org.apache.kafka.common.requests.PushTelemetryRequest; +import org.apache.kafka.common.requests.PushTelemetryResponse; +import org.apache.kafka.common.telemetry.ClientTelemetryState; +import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender; +import org.apache.kafka.common.telemetry.internals.ClientTelemetryUtils; +import org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector; +import org.apache.kafka.common.telemetry.internals.MetricKeyable; +import org.apache.kafka.common.telemetry.internals.MetricsCollector; +import org.apache.kafka.common.telemetry.internals.MetricsEmitter; +import org.apache.kafka.common.telemetry.internals.SinglePointMetric; +import 
org.apache.kafka.common.telemetry.internals.TelemetryMetricNamingConvention;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.StringJoiner;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Predicate;
+
+/**
+ * The implementation of the {@link MetricsReporter} for client telemetry which manages the life-cycle
+ * of the client telemetry collection process. The client telemetry reporter is responsible for
+ * collecting the client telemetry data and sending it to the broker.
+ *
+ * The client telemetry reporter is configured with a {@link ClientTelemetrySender} which is
+ * responsible for sending the client telemetry data to the broker. The client telemetry reporter
+ * will attempt to fetch the telemetry subscription information from the broker and send the
+ * telemetry data to the broker based on the subscription information.
+ *
+ * The full life-cycle of the metric collection process is defined by a state machine in
+ * {@link ClientTelemetryState}. Each state is associated with a different set of operations.

Review Comment: sounds good - what I meant is since you mentioned the state transition, I wonder if we should just document the transition in the code.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please
Re: [PR] HOTFIX: fix ConsistencyVectorIntegrationTest failure [kafka]
mjsax commented on PR #14895: URL: https://github.com/apache/kafka/pull/14895#issuecomment-1836641789 `:generator:checkstyleTest` failure -- could not reproduce locally. Will retrigger Jenkins. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]
philipnee commented on code in PR #14724: URL: https://github.com/apache/kafka/pull/14724#discussion_r1412482284

## clients/src/main/java/org/apache/kafka/clients/ClientTelemetryReporter.java:

Review Comment: great thanks

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15363: Broker log directory failure changes [kafka]
OmniaGM commented on code in PR #14790: URL: https://github.com/apache/kafka/pull/14790#discussion_r1412455807

## core/src/main/scala/kafka/cluster/Partition.scala:

@@ -613,7 +613,16 @@ class Partition(val topicPartition: TopicPartition,
   // Only ReplicaAlterDirThread will call this method and ReplicaAlterDirThread should remove the partition
   // from its partitionStates if this method returns true
   def maybeReplaceCurrentWithFutureReplica(): Boolean = {
-    // lock to prevent the log append by followers while checking if the log dir could be replaced with future log.
+    maybeFutureReplicaCaughtUp((futurePartitionLog: UnifiedLog) => {
+      logManager.replaceCurrentWithFutureLog(topicPartition)
+      futurePartitionLog.setLogOffsetsListener(logOffsetsListener)
+      log = futureLog
+      removeFutureLocalReplica(false)
+    })
+  }
+
+  def maybeFutureReplicaCaughtUp(callback: UnifiedLog => Unit): Boolean = {

Review Comment: rename it.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
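The refactor under discussion extracts a "check if the future replica caught up, then run the swap callback" helper that holds the lock so follower appends cannot race the replacement. A hypothetical Java rendering of that shape (the real code is Scala in `Partition.scala`; offsets and class name here are invented for illustration):

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch: run a callback only if the future replica's end offset
// has caught up with the current replica's, under the write lock so nothing
// can append while the check-and-swap happens. Illustrative names only.
class FutureReplicaSwapper {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final long currentEndOffset;
    private final long futureEndOffset;

    FutureReplicaSwapper(long currentEndOffset, long futureEndOffset) {
        this.currentEndOffset = currentEndOffset;
        this.futureEndOffset = futureEndOffset;
    }

    // Returns true (and runs the callback) only when the future replica caught up.
    boolean maybeFutureReplicaCaughtUp(Runnable onCaughtUp) {
        lock.writeLock().lock();
        try {
            if (futureEndOffset >= currentEndOffset) {
                onCaughtUp.run();
                return true;
            }
            return false;
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```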
Re: [PR] HOTFIX: fix ConsistencyVectorIntegrationTest failure [kafka]
cadonna commented on PR #14895: URL: https://github.com/apache/kafka/pull/14895#issuecomment-1836560772

FYI, the following tests also fail consistently on trunk:

```
org.apache.kafka.common.requests.BrokerRegistrationRequestTest.[1] 0
org.apache.kafka.common.requests.BrokerRegistrationRequestTest.[2] 1
```

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15276: Implement event plumbing for ConsumerRebalanceListener callbacks [kafka]
kirktrue commented on code in PR #14640: URL: https://github.com/apache/kafka/pull/14640#discussion_r1412428498

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:

@@ -181,11 +183,13 @@ private void process(final ListOffsetsApplicationEvent event) {
      * it is already a member.
      */
     private void processSubscriptionChangeEvent() {
-        if (!requestManagers.membershipManager.isPresent()) {
-            throw new RuntimeException("Group membership manager not present when processing a " +
-                "subscribe event");
+        if (!requestManagers.heartbeatRequestManager.isPresent()) {
+            KafkaException error = new KafkaException("Group membership manager not present when processing a subscribe event");

Review Comment: True, this case _shouldn't_ occur, but if it does, throwing an exception kills the background thread dead, which usually causes weird hanging. We specifically tend to avoid throwing exceptions from the event processor (or anything it calls) for this reason. Perhaps that's too strict of a policy?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
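The policy described above — never throw from the background event processor, because an escaped exception kills the thread and leaves the application hanging — can be sketched as follows. Names are illustrative, not the actual consumer internals:

```java
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch: the background thread catches errors and buffers them
// instead of throwing, so the thread stays alive; the application thread
// polls for buffered errors and rethrows them in user-visible code paths.
class BackgroundErrorBuffer {
    private final Queue<RuntimeException> errors = new ConcurrentLinkedQueue<>();

    // Called on the background thread: never lets an exception escape.
    void processEvent(Runnable handler) {
        try {
            handler.run();
        } catch (RuntimeException e) {
            errors.add(e); // keep the background thread alive
        }
    }

    // Called on the application thread (e.g. from poll()): surface the first buffered error.
    Optional<RuntimeException> maybeError() {
        return Optional.ofNullable(errors.poll());
    }
}
```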
Re: [PR] HOTFIX: fix ConsistencyVectorIntegrationTest failure [kafka]
lucasbru commented on PR #14895: URL: https://github.com/apache/kafka/pull/14895#issuecomment-1836550893 @mjsax could you merge this later today if CI passes? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15363: Broker log directory failure changes [kafka]
OmniaGM commented on code in PR #14790: URL: https://github.com/apache/kafka/pull/14790#discussion_r1412423207

## server/src/main/java/org/apache/kafka/server/AssignmentsManager.java:

@@ -391,4 +402,4 @@ static AssignReplicasToDirsRequestData buildRequestData(int brokerId, long broke
             .setBrokerEpoch(brokerEpoch)
             .setDirectories(new ArrayList<>(directoryMap.values()));
     }
-}
+}

Review Comment: revert it

## server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java:

@@ -248,4 +248,4 @@ void testRequeuesFailedAssignmentPropagations() throws InterruptedException {
         }}
     ), captor.getAllValues().get(4).build().data());
     }
-}
+}

Review Comment: revert it

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15363: Broker log directory failure changes [kafka]
OmniaGM commented on code in PR #14790: URL: https://github.com/apache/kafka/pull/14790#discussion_r1412422869

## server/src/main/java/org/apache/kafka/server/AssignmentsManager.java:

@@ -95,7 +96,10 @@ public void close() throws InterruptedException {
     }

     public void onAssignment(TopicIdPartition topicPartition, Uuid dirId) {

Review Comment: I changed this to have a single `onAssignment` method and one `AssignmentEvent` constructor that replaces a null callback with a runnable that does nothing.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
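The change described above boils down to a null-to-no-op substitution, roughly like this (hypothetical helper, not the actual `AssignmentsManager` code):

```java
// Hypothetical sketch: substitute a shared no-op Runnable when the caller
// passes no completion callback, so downstream code never null-checks.
class AssignmentCallbacks {
    static final Runnable NO_OP = () -> { };

    static Runnable orNoOp(Runnable callback) {
        return callback != null ? callback : NO_OP;
    }
}
```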
[PR] KAFKA-15629: Update upgrade-guide.html for TimestampedKeyQuery and TimestampedRangeQuery [kafka]
hanyuzheng7 opened a new pull request, #14898: URL: https://github.com/apache/kafka/pull/14898

Update upgrade-guide.html for TimestampedKeyQuery and TimestampedRangeQuery

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15690: Fix restoring tasks on partition loss, flaky EosIntegrationTest [kafka]
lucasbru merged PR #14869: URL: https://github.com/apache/kafka/pull/14869
Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]
apoorvmittal10 commented on PR #14724: URL: https://github.com/apache/kafka/pull/14724#issuecomment-1836538541

Build is yellow with unrelated test failures.
[jira] [Commented] (KAFKA-9545) Flaky Test `RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted`
[ https://issues.apache.org/jira/browse/KAFKA-9545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17792158#comment-17792158 ] Apoorv Mittal commented on KAFKA-9545:
--
Reoccurred the failure: https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14724/19/tests
{code:java}
Error
org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. Stream tasks not updated ==> expected: but was:

Stacktrace
org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. Stream tasks not updated ==> expected: but was:
	at app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
	at app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
	at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
	at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
	at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:210)
	at app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:331)
	at app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:379)
	at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:328)
	at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:312)
	at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:285)
	at app//org.apache.kafka.streams.integration.RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted(RegexSourceIntegrationTest.java:295)
	at java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base@11.0.16.1/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base@11.0.16.1/java.lang.reflect.Method.invoke(Method.java:566)
	at app//org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728)
	at app//org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
	at app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
	at app//org.junit.jupiter.engine.extension.SameThreadTimeoutInvocation.proceed(SameThreadTimeoutInvocation.java:45)
	at app//org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
	at app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
	at app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86)
	at app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
	at app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
	at app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
{code}

> Flaky Test `RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted`
> --
>
> Key: KAFKA-9545
> URL: https://issues.apache.org/jira/browse/KAFKA-9545
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: Jason Gustafson
> Assignee: Boyang Chen
> Priority: Major
>
> https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/4678/testReport/org.apache.kafka.streams.integration/RegexSourceIntegrationTest/testRegexMatchesTopicsAWhenDeleted/
> {code}
> java.lang.AssertionError: Condition not met within timeout 15000. Stream
> tasks not updated
> 	at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
> 	at org.apache.kafka.test.TestUtils.lambda$waitForCondition$5(TestUtils.java:367)
> 	at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:415)
> 	at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:383)
> 	at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:366)
> 	at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:337)
> 	at org.apache.kafka.streams.integration.RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted(RegexSourceIntegrationTest.java:224)
> 	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at
[jira] [Reopened] (KAFKA-9545) Flaky Test `RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted`
[ https://issues.apache.org/jira/browse/KAFKA-9545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apoorv Mittal reopened KAFKA-9545:
--
Assignee: (was: Boyang Chen)

> Flaky Test `RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted`
> --
>
> Key: KAFKA-9545
> URL: https://issues.apache.org/jira/browse/KAFKA-9545
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: Jason Gustafson
> Priority: Major
>
> https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/4678/testReport/org.apache.kafka.streams.integration/RegexSourceIntegrationTest/testRegexMatchesTopicsAWhenDeleted/
> {code}
> java.lang.AssertionError: Condition not met within timeout 15000. Stream
> tasks not updated
> 	at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
> 	at org.apache.kafka.test.TestUtils.lambda$waitForCondition$5(TestUtils.java:367)
> 	at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:415)
> 	at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:383)
> 	at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:366)
> 	at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:337)
> 	at org.apache.kafka.streams.integration.RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted(RegexSourceIntegrationTest.java:224)
> 	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
> 	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> 	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> {code}

-- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR: Fix flaky `DescribeClusterRequestTest.testDescribeClusterRequestIncludingClusterAuthorizedOperations` [kafka]
hachikuji merged PR #14890: URL: https://github.com/apache/kafka/pull/14890
Re: [PR] KAFKA-15276: Implement event plumbing for ConsumerRebalanceListener callbacks [kafka]
lianetm commented on code in PR #14640: URL: https://github.com/apache/kafka/pull/14640#discussion_r1412391352

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
@@ -198,11 +202,13 @@
  * the group is sent out.
  */
 private void processUnsubscribeEvent(UnsubscribeApplicationEvent event) {
-    if (!requestManagers.membershipManager.isPresent()) {
-        throw new RuntimeException("Group membership manager not present when processing an " +
-            "unsubscribe event");
+    if (!requestManagers.heartbeatRequestManager.isPresent()) {

Review Comment: Same as above comment?

Review Comment: Same as the comment above?
Re: [PR] KAFKA-15276: Implement event plumbing for ConsumerRebalanceListener callbacks [kafka]
lianetm commented on code in PR #14640: URL: https://github.com/apache/kafka/pull/14640#discussion_r1412390835

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
@@ -181,11 +183,13 @@
  * it is already a member.
  */
 private void processSubscriptionChangeEvent() {
-    if (!requestManagers.membershipManager.isPresent()) {
-        throw new RuntimeException("Group membership manager not present when processing a " +
-            "subscribe event");
+    if (!requestManagers.heartbeatRequestManager.isPresent()) {
+        KafkaException error = new KafkaException("Group membership manager not present when processing a subscribe event");

Review Comment: I expect this would only happen in the case of a bug in our code, so a `RuntimeException` that fails the background thread (and therefore the consumer) seemed simple and sufficient. Is there a reason why we need all the logic to pass this failure to the app thread?
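The fail-fast alternative this comment argues for — treating an absent required manager as an invariant violation and throwing immediately, rather than routing the error back to the application thread — looks roughly like the sketch below. Class and field names are hypothetical, not the actual ApplicationEventProcessor code:

```java
import java.util.Optional;

// Simplified sketch of the fail-fast guard: if a required collaborator
// is absent when an event is processed, that can only be a bug in the
// wiring, so throw an unchecked exception right away. Names are
// illustrative.
final class EventProcessorSketch {
    private final Optional<Object> heartbeatRequestManager;

    EventProcessorSketch(Optional<Object> heartbeatRequestManager) {
        this.heartbeatRequestManager = heartbeatRequestManager;
    }

    void processSubscriptionChange() {
        // orElseThrow makes the invariant explicit and fails the
        // calling (background) thread immediately.
        Object manager = heartbeatRequestManager.orElseThrow(() ->
            new IllegalStateException(
                "Group membership manager not present when processing a subscribe event"));
        // ... a real implementation would delegate to the manager here ...
    }
}
```

The trade-off under discussion is exactly this: `orElseThrow` crashes the background thread, whereas wrapping the error in a `KafkaException` and enqueueing it would surface the failure on the application thread instead.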
Re: [PR] HOTFIX: fix ConsistencyVectorIntegrationTest failure [kafka]
hanyuzheng7 commented on PR #14895: URL: https://github.com/apache/kafka/pull/14895#issuecomment-1836500935 Thanks @lucasbru !
Re: [PR] KAFKA-15953: Refactor polling delays [kafka]
lucasbru commented on code in PR #14897: URL: https://github.com/apache/kafka/pull/14897#discussion_r1412360054

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
@@ -51,7 +51,7 @@
  */
 public class ConsumerNetworkThread extends KafkaThread implements Closeable {
-    private static final long MAX_POLL_TIMEOUT_MS = 5000;
+    static final long MAX_POLL_TIMEOUT_MS = 5000;

Review Comment: nit: // visible for testing
Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]
cadonna commented on code in PR #14879: URL: https://github.com/apache/kafka/pull/14879#discussion_r1412360413

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
@@ -849,9 +953,22 @@ public OptionalLong currentLag(TopicPartition topicPartition) { } }
+public void setGroupMetadata(final ConsumerGroupMetadata groupMetadata) {
+    this.groupMetadata = Optional.of(groupMetadata);
+}
+
 @Override
 public ConsumerGroupMetadata groupMetadata() {
-    throw new KafkaException("method not implemented");
+    acquireAndEnsureOpen();
+    try {
+        maybeThrowInvalidGroupIdException();
+        backgroundEventProcessor.process();
+        return groupMetadata.orElseThrow(
+            () -> new IllegalStateException("No group metadata found although a valid group ID exists. This is a bug!")

Review Comment: Actually, `backgroundEventProcessor.process();` sets the group metadata. So returning group metadata from `maybeThrowInvalidGroupIdException` might return outdated group metadata. Since I have now removed `backgroundEventProcessor.process();`, I will reconsider the exception.
Re: [PR] MINOR: disable test_transactions with new group coordinator [kafka]
jolshan merged PR #14884: URL: https://github.com/apache/kafka/pull/14884
Re: [PR] MINOR: disable test_transactions with new group coordinator [kafka]
jolshan commented on PR #14884: URL: https://github.com/apache/kafka/pull/14884#issuecomment-1836445897 This is a python change so I will merge. These OutOfMemory issues are not good.
Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]
AndrewJSchofield commented on PR #14879: URL: https://github.com/apache/kafka/pull/14879#issuecomment-1836371437

> > Can you elaborate on the direction to remove the background queue from the 'test builder' instead of using the one it constructed?
>
> I had issues with tests using the spy on the `AsyncKafkaConsumer`. More precisely, a test failed with the spy but did not fail with a consumer not wrapped into a spy. BTW, IMO, spies (or partial mocks) should be used really carefully. Actually, good code does not need spies (with a few exceptions). Spies avoid to structure the code well. They do not force one to loosely couple objects. Even the [Mockito documentation](https://javadoc.io/doc/org.mockito/mockito-core/latest/org/mockito/Mockito.html#spy) warns against their own spies. Additionally, the code under test, i.e., the async Kafka consumer, should not be wrapped into a spy. We should test that code directly to avoid possible side effects coming from the wrapping or from mistakes in specifying stubs on the spy.

I've had exactly the same problem. Nested mocking makes it all very unhappy.
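The alternative to spying that the quoted comment advocates — structuring the code so that collaborators are injected, letting tests exercise the real object directly with a hand-written fake instead of a partial mock — can be illustrated with the sketch below. All names are hypothetical, not the actual AsyncKafkaConsumer structure:

```java
import java.util.ArrayList;
import java.util.List;

// The loose-coupling idea from the comment above: if the consumer's
// event queue is injected through an interface, a test can substitute
// a simple recording fake -- no Mockito spy or partial mock required,
// and the real object under test stays unwrapped. Names are illustrative.
interface EventQueue {
    void enqueue(String event);
}

// A hand-written test double that records everything enqueued.
final class RecordingEventQueue implements EventQueue {
    final List<String> events = new ArrayList<>();
    @Override public void enqueue(String event) { events.add(event); }
}

// The "code under test": it only knows the EventQueue abstraction.
final class ConsumerSketch {
    private final EventQueue queue;
    ConsumerSketch(EventQueue queue) { this.queue = queue; }
    void subscribe(String topic) { queue.enqueue("subscribe:" + topic); }
}
```

A test then constructs `new ConsumerSketch(new RecordingEventQueue())` and asserts on the recorded events, avoiding the side effects of wrapping the object under test in a spy.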