Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]

2023-12-01 Thread via GitHub


apoorvmittal10 closed pull request #14724: KAFKA-15663, KAFKA-15794: Telemetry 
reporter and request handling (KIP-714)
URL: https://github.com/apache/kafka/pull/14724


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]

2023-12-01 Thread via GitHub


apoorvmittal10 commented on PR #14724:
URL: https://github.com/apache/kafka/pull/14724#issuecomment-1837060946

   > Hi @apoorvmittal10 Thanks for the time to address my comments - I made 
another pass and added some more comments, mostly nit-picking. After this I 
think we are good to go.
   
   Hi @philipnee thanks for the re-review. I have addressed the feedback.





Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]

2023-12-01 Thread via GitHub


apoorvmittal10 commented on code in PR #14724:
URL: https://github.com/apache/kafka/pull/14724#discussion_r1412734130


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java:
##
@@ -0,0 +1,976 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import io.opentelemetry.proto.metrics.v1.Metric;
+import io.opentelemetry.proto.metrics.v1.MetricsData;
+import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
+import io.opentelemetry.proto.metrics.v1.ScopeMetrics;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
+import org.apache.kafka.common.message.PushTelemetryRequestData;
+import org.apache.kafka.common.message.PushTelemetryResponseData;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricsContext;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractRequest.Builder;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.PushTelemetryResponse;
+import org.apache.kafka.common.telemetry.ClientTelemetryState;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.StringJoiner;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Predicate;
+
+/**
+ * The implementation of the {@link MetricsReporter} for client telemetry which manages the life-cycle
+ * of the client telemetry collection process. The client telemetry reporter is responsible for
+ * collecting the client telemetry data and sending it to the broker.
+ * <p>
+ * The client telemetry reporter is configured with a {@link ClientTelemetrySender} which is
+ * responsible for sending the client telemetry data to the broker. The client telemetry reporter
+ * will attempt to fetch the telemetry subscription information from the broker and send the
+ * telemetry data to the broker based on the subscription information i.e. push interval, temporality,
+ * compression type, etc.
+ * <p>
+ * The full life-cycle of the metric collection process is defined by a state machine in
+ * {@link ClientTelemetryState}. Each state is associated with a different set of operations.
+ * For example, the client telemetry reporter will attempt to fetch the telemetry subscription
+ * from the broker when in the {@link ClientTelemetryState#SUBSCRIPTION_NEEDED} state.
+ * If the push operation fails, the client telemetry reporter will attempt to re-fetch the
+ * subscription information by setting the state back to {@link ClientTelemetryState#SUBSCRIPTION_NEEDED}.
+ * <p>
+ * In an unlikely scenario, if a bad state transition is detected, an
+ * {@link IllegalStateException} will be thrown.
+ * <p>
+ * The state transition follows the following steps in order:
+ * <ul>
+ *     <li>{@link ClientTelemetryState#SUBSCRIPTION_NEEDED}</li>
+ *     <li>{@link ClientTelemetryState#SUBSCRIPTION_IN_PROGRESS}</li>
+ *     <li>{@link ClientTelemetryState#PUSH_NEEDED}</li>
+ *     <li>{@link ClientTelemetryState#PUSH_IN_PROGRESS}</li>
+ *     <li>{@link 
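The ordered transitions described in the Javadoc above can be sketched as a small state machine. This is a hypothetical illustration only; Kafka's actual `ClientTelemetryState` enum may define more states and transitions (e.g. for termination) than shown here.

```java
// Hypothetical sketch of the state progression described in the Javadoc; the
// real ClientTelemetryState in Kafka likely has additional states.
public class TelemetryStateSketch {
    enum State { SUBSCRIPTION_NEEDED, SUBSCRIPTION_IN_PROGRESS, PUSH_NEEDED, PUSH_IN_PROGRESS }

    // Legal transitions; a failed push moves back to SUBSCRIPTION_NEEDED.
    static boolean canTransition(State from, State to) {
        switch (from) {
            case SUBSCRIPTION_NEEDED:      return to == State.SUBSCRIPTION_IN_PROGRESS;
            case SUBSCRIPTION_IN_PROGRESS: return to == State.PUSH_NEEDED || to == State.SUBSCRIPTION_NEEDED;
            case PUSH_NEEDED:              return to == State.PUSH_IN_PROGRESS;
            case PUSH_IN_PROGRESS:         return to == State.PUSH_NEEDED || to == State.SUBSCRIPTION_NEEDED;
            default:                       return false;
        }
    }

    // Mirrors the behaviour noted in the Javadoc: a bad transition throws.
    static State transition(State from, State to) {
        if (!canTransition(from, to))
            throw new IllegalStateException("Invalid transition from " + from + " to " + to);
        return to;
    }

    public static void main(String[] args) {
        State s = State.SUBSCRIPTION_NEEDED;
        s = transition(s, State.SUBSCRIPTION_IN_PROGRESS);
        s = transition(s, State.PUSH_NEEDED);
        s = transition(s, State.PUSH_IN_PROGRESS);
        // A failed push falls back to re-fetching the subscription.
        s = transition(s, State.SUBSCRIPTION_NEEDED);
        System.out.println("final state: " + s);
    }
}
```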

Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]

2023-12-01 Thread via GitHub


apoorvmittal10 commented on code in PR #14724:
URL: https://github.com/apache/kafka/pull/14724#discussion_r1412733949


##
clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java:
##
@@ -0,0 +1,573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import io.opentelemetry.proto.common.v1.KeyValue;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
+import org.apache.kafka.common.message.PushTelemetryRequestData;
+import org.apache.kafka.common.message.PushTelemetryResponseData;
+import org.apache.kafka.common.metrics.KafkaMetricsContext;
+import org.apache.kafka.common.metrics.MetricsContext;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.PushTelemetryResponse;
+import org.apache.kafka.common.telemetry.ClientTelemetryState;
+import org.apache.kafka.common.utils.MockTime;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ClientTelemetryReporterTest {

Review Comment:
   I am not sure if I got the comment correctly. If the comment is about adding a 
test case for when push telemetry receives a subscription error, then that's covered 
by `testHandleResponsePushTelemetryErrorResponse`. However, I have now added 
another test case which verifies the change in subscription (including a change in 
temporality) when handling get subscriptions as well.






Re: [PR] improve TopicCommandIntegrationTest to be less flaky [kafka]

2023-12-01 Thread via GitHub


splett2 commented on PR #14891:
URL: https://github.com/apache/kafka/pull/14891#issuecomment-1837054475

   I don't think we want to directly call `TestUtils.createTopicWithAdmin` in 
this test for most cases. I didn't read through all of the tests, but I would 
imagine that most of these integration tests are trying to exercise the 
`topicService` logic.
   
   I would imagine it's probably sufficient to add a call to 
`ensureConsistentKRaftMetadata()` in `waitForTopicCreated` to deflake most 
failures.
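
The deflaking idea described above can be sketched roughly as follows. Note that `topicExists` and the no-op `ensureConsistentKRaftMetadata` below are illustrative stand-ins for the real test-harness helpers, not Kafka's actual API.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: wait for the topic to appear, then force the test
// cluster's KRaft metadata to be consistent before the test asserts anything.
public class DeflakeSketch {
    private final Set<String> topics = ConcurrentHashMap.newKeySet();

    void createTopic(String topic) { topics.add(topic); }            // stand-in for topic creation

    boolean topicExists(String topic) { return topics.contains(topic); } // stand-in for a describeTopics call

    void ensureConsistentKRaftMetadata() {
        // In the real harness this waits until all brokers have caught up
        // to the latest committed metadata offset; a no-op in this sketch.
    }

    void waitForTopicCreated(String topic, long timeoutMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!topicExists(topic)) {
            if (System.currentTimeMillis() > deadline)
                throw new AssertionError("Topic " + topic + " not created within " + timeoutMs + " ms");
            Thread.sleep(10);
        }
        // Deflaking step: make every broker observe the new topic before returning.
        ensureConsistentKRaftMetadata();
    }

    public static void main(String[] args) throws InterruptedException {
        DeflakeSketch sketch = new DeflakeSketch();
        sketch.createTopic("test-topic");
        sketch.waitForTopicCreated("test-topic", 1_000);
        System.out.println("ok");
    }
}
```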





Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]

2023-12-01 Thread via GitHub


apoorvmittal10 commented on code in PR #14724:
URL: https://github.com/apache/kafka/pull/14724#discussion_r1412729920


##
clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java:
##
@@ -0,0 +1,573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import io.opentelemetry.proto.common.v1.KeyValue;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
+import org.apache.kafka.common.message.PushTelemetryRequestData;
+import org.apache.kafka.common.message.PushTelemetryResponseData;
+import org.apache.kafka.common.metrics.KafkaMetricsContext;
+import org.apache.kafka.common.metrics.MetricsContext;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.PushTelemetryResponse;
+import org.apache.kafka.common.telemetry.ClientTelemetryState;
+import org.apache.kafka.common.utils.MockTime;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ClientTelemetryReporterTest {
+
+    private MockTime time;
+    private ClientTelemetryReporter clientTelemetryReporter;
+    private Map<String, Object> configs;
+    private MetricsContext metricsContext;
+    private Uuid uuid;
+    private ClientTelemetryReporter.ClientTelemetrySubscription subscription;
+
+    @BeforeEach
+    public void setUp() {
+        time = new MockTime();
+        clientTelemetryReporter = new ClientTelemetryReporter(time);
+        configs = new HashMap<>();
+        metricsContext = new KafkaMetricsContext("test");
+        uuid = Uuid.randomUuid();
+        subscription = new ClientTelemetryReporter.ClientTelemetrySubscription(uuid, 1234, 2,
+            Collections.emptyList(), true, null);
+    }
+
+    @Test
+    public void testInitTelemetryReporter() {
+        configs.put(CommonClientConfigs.CLIENT_ID_CONFIG, "test-client");
+        configs.put(CommonClientConfigs.CLIENT_RACK_CONFIG, "rack");
+
+        clientTelemetryReporter.configure(configs);
+        clientTelemetryReporter.contextChange(metricsContext);
+        assertNotNull(clientTelemetryReporter.metricsCollector());
+        assertNotNull(clientTelemetryReporter.telemetryProvider().resource());
+        assertEquals(1, clientTelemetryReporter.telemetryProvider().resource().getAttributesCount());
+        assertEquals(ClientTelemetryProvider.CLIENT_RACK,
+            clientTelemetryReporter.telemetryProvider().resource().getAttributes(0).getKey());
+        assertEquals("rack",
+            clientTelemetryReporter.telemetryProvider().resource().getAttributes(0).getValue().getStringValue());
+    }
+
+    @Test
+    public void testInitTelemetryReporterNoCollector() {
+        // Remove namespace config which skips the collector initialization.
+        MetricsContext metricsContext = Collections::emptyMap;
+
+

Re: [PR] MINOR: Fix flaky test RefreshingHttpsJwksTest.testBasicScheduleRefresh [kafka]

2023-12-01 Thread via GitHub


splett2 commented on code in PR #14888:
URL: https://github.com/apache/kafka/pull/14888#discussion_r1412729044


##
clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/RefreshingHttpsJwksTest.java:
##
@@ -134,6 +136,20 @@ public void testSecondaryRefreshAfterElapsedDelay() throws Exception {
         String keyId = "abc123";
         MockTime time = new MockTime();
         HttpsJwks httpsJwks = spyHttpsJwks();
+
+        try (RefreshingHttpsJwks refreshingHttpsJwks = getRefreshingHttpsJwks(time, httpsJwks, mockExecutorService(time))) {
+            refreshingHttpsJwks.init();
+            // We refresh once at the initialization time from getJsonWebKeys.
+            verify(httpsJwks, times(1)).refresh();
+            assertTrue(refreshingHttpsJwks.maybeExpediteRefresh(keyId));
+            verify(httpsJwks, times(2)).refresh();
+            time.sleep(REFRESH_MS + 1);

Review Comment:
   This test was not changed. I just inserted the new `mockExecutorService` 
method in a way that generated a weird diff. Note that the assertions here are 
the same as the ones that were "removed" from lines 159-168.
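
The deterministic-scheduling pattern behind combining a mock clock with a mock executor can be sketched like this. The names and structure below are illustrative only, not Kafka's actual `MockTime` or the PR's `mockExecutorService` helper.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Sketch: instead of a real ScheduledExecutorService, tasks fire only when a
// mock clock is advanced past their delay, making refresh timing deterministic.
public class MockSchedulerSketch {
    // Mock clock: time advances only when the test says so.
    static class MockClock {
        long nowMs = 0;
        void sleep(long ms) { nowMs += ms; }
    }

    static class MockScheduler {
        static class Task {
            final long dueMs; final Runnable body;
            Task(long dueMs, Runnable body) { this.dueMs = dueMs; this.body = body; }
        }
        private final List<Task> tasks = new ArrayList<>();
        private final MockClock clock;
        MockScheduler(MockClock clock) { this.clock = clock; }

        void schedule(Runnable body, long delayMs) { tasks.add(new Task(clock.nowMs + delayMs, body)); }

        // Run and remove every task whose delay has elapsed on the mock clock.
        void tick() {
            for (Iterator<Task> it = tasks.iterator(); it.hasNext(); ) {
                Task t = it.next();
                if (t.dueMs <= clock.nowMs) { t.body.run(); it.remove(); }
            }
        }
    }

    public static void main(String[] args) {
        final long refreshMs = 1000;
        MockClock clock = new MockClock();
        MockScheduler scheduler = new MockScheduler(clock);
        int[] refreshes = {0};
        scheduler.schedule(() -> refreshes[0]++, refreshMs);

        scheduler.tick();                 // delay not elapsed: nothing fires
        if (refreshes[0] != 0) throw new AssertionError("fired too early");

        clock.sleep(refreshMs + 1);       // mirrors time.sleep(REFRESH_MS + 1) in the test
        scheduler.tick();                 // now the scheduled refresh fires deterministically
        if (refreshes[0] != 1) throw new AssertionError("expected exactly one refresh");
        System.out.println("ok");
    }
}
```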






[jira] [Created] (KAFKA-15958) Add tests to validate telemetry requests with different version

2023-12-01 Thread Apoorv Mittal (Jira)
Apoorv Mittal created KAFKA-15958:
-

 Summary: Add tests to validate telemetry requests with different version
 Key: KAFKA-15958
 URL: https://issues.apache.org/jira/browse/KAFKA-15958
 Project: Kafka
  Issue Type: Sub-task
Reporter: Apoorv Mittal
Assignee: Apoorv Mittal


Details: https://github.com/apache/kafka/pull/14724#discussion_r1412530561



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]

2023-12-01 Thread via GitHub


apoorvmittal10 commented on code in PR #14724:
URL: https://github.com/apache/kafka/pull/14724#discussion_r1412729783


##
clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java:
##
@@ -0,0 +1,573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import io.opentelemetry.proto.common.v1.KeyValue;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
+import org.apache.kafka.common.message.PushTelemetryRequestData;
+import org.apache.kafka.common.message.PushTelemetryResponseData;
+import org.apache.kafka.common.metrics.KafkaMetricsContext;
+import org.apache.kafka.common.metrics.MetricsContext;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.PushTelemetryResponse;
+import org.apache.kafka.common.telemetry.ClientTelemetryState;
+import org.apache.kafka.common.utils.MockTime;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ClientTelemetryReporterTest {
+
+    private MockTime time;
+    private ClientTelemetryReporter clientTelemetryReporter;
+    private Map<String, Object> configs;
+    private MetricsContext metricsContext;
+    private Uuid uuid;
+    private ClientTelemetryReporter.ClientTelemetrySubscription subscription;
+
+    @BeforeEach
+    public void setUp() {
+        time = new MockTime();
+        clientTelemetryReporter = new ClientTelemetryReporter(time);
+        configs = new HashMap<>();
+        metricsContext = new KafkaMetricsContext("test");
+        uuid = Uuid.randomUuid();
+        subscription = new ClientTelemetryReporter.ClientTelemetrySubscription(uuid, 1234, 2,
+            Collections.emptyList(), true, null);
+    }
+
+    @Test
+    public void testInitTelemetryReporter() {
+        configs.put(CommonClientConfigs.CLIENT_ID_CONFIG, "test-client");
+        configs.put(CommonClientConfigs.CLIENT_RACK_CONFIG, "rack");
+
+        clientTelemetryReporter.configure(configs);
+        clientTelemetryReporter.contextChange(metricsContext);
+        assertNotNull(clientTelemetryReporter.metricsCollector());
+        assertNotNull(clientTelemetryReporter.telemetryProvider().resource());
+        assertEquals(1, clientTelemetryReporter.telemetryProvider().resource().getAttributesCount());
+        assertEquals(ClientTelemetryProvider.CLIENT_RACK,
+            clientTelemetryReporter.telemetryProvider().resource().getAttributes(0).getKey());
+        assertEquals("rack",
+            clientTelemetryReporter.telemetryProvider().resource().getAttributes(0).getValue().getStringValue());
+    }
+
+    @Test
+    public void testInitTelemetryReporterNoCollector() {
+        // Remove namespace config which skips the collector initialization.
+        MetricsContext metricsContext = Collections::emptyMap;
+
+

Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]

2023-12-01 Thread via GitHub


apoorvmittal10 commented on code in PR #14724:
URL: https://github.com/apache/kafka/pull/14724#discussion_r1412729589


##
clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java:
##
@@ -0,0 +1,573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import io.opentelemetry.proto.common.v1.KeyValue;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
+import org.apache.kafka.common.message.PushTelemetryRequestData;
+import org.apache.kafka.common.message.PushTelemetryResponseData;
+import org.apache.kafka.common.metrics.KafkaMetricsContext;
+import org.apache.kafka.common.metrics.MetricsContext;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.PushTelemetryResponse;
+import org.apache.kafka.common.telemetry.ClientTelemetryState;
+import org.apache.kafka.common.utils.MockTime;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ClientTelemetryReporterTest {
+
+    private MockTime time;
+    private ClientTelemetryReporter clientTelemetryReporter;
+    private Map<String, Object> configs;
+    private MetricsContext metricsContext;
+    private Uuid uuid;
+    private ClientTelemetryReporter.ClientTelemetrySubscription subscription;
+
+    @BeforeEach
+    public void setUp() {
+        time = new MockTime();
+        clientTelemetryReporter = new ClientTelemetryReporter(time);
+        configs = new HashMap<>();
+        metricsContext = new KafkaMetricsContext("test");
+        uuid = Uuid.randomUuid();
+        subscription = new ClientTelemetryReporter.ClientTelemetrySubscription(uuid, 1234, 2,
+            Collections.emptyList(), true, null);
+    }
+
+    @Test
+    public void testInitTelemetryReporter() {
+        configs.put(CommonClientConfigs.CLIENT_ID_CONFIG, "test-client");
+        configs.put(CommonClientConfigs.CLIENT_RACK_CONFIG, "rack");
+
+        clientTelemetryReporter.configure(configs);
+        clientTelemetryReporter.contextChange(metricsContext);
+        assertNotNull(clientTelemetryReporter.metricsCollector());
+        assertNotNull(clientTelemetryReporter.telemetryProvider().resource());
+        assertEquals(1, clientTelemetryReporter.telemetryProvider().resource().getAttributesCount());
+        assertEquals(ClientTelemetryProvider.CLIENT_RACK,
+            clientTelemetryReporter.telemetryProvider().resource().getAttributes(0).getKey());
+        assertEquals("rack",
+            clientTelemetryReporter.telemetryProvider().resource().getAttributes(0).getValue().getStringValue());
+    }
+
+    @Test
+    public void testInitTelemetryReporterNoCollector() {
+        // Remove namespace config which skips the collector initialization.
+        MetricsContext metricsContext = Collections::emptyMap;
+
+




Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]

2023-12-01 Thread via GitHub


apoorvmittal10 commented on code in PR #14724:
URL: https://github.com/apache/kafka/pull/14724#discussion_r1412729556


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java:
##
@@ -0,0 +1,976 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import io.opentelemetry.proto.metrics.v1.Metric;
+import io.opentelemetry.proto.metrics.v1.MetricsData;
+import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
+import io.opentelemetry.proto.metrics.v1.ScopeMetrics;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
+import org.apache.kafka.common.message.PushTelemetryRequestData;
+import org.apache.kafka.common.message.PushTelemetryResponseData;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricsContext;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractRequest.Builder;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.PushTelemetryResponse;
+import org.apache.kafka.common.telemetry.ClientTelemetryState;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.StringJoiner;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Predicate;
+
+/**
+ * The implementation of the {@link MetricsReporter} for client telemetry 
which manages the life-cycle
+ * of the client telemetry collection process. The client telemetry reporter 
is responsible for
+ * collecting the client telemetry data and sending it to the broker.
+ * 
+ *
+ * The client telemetry reporter is configured with a {@link 
ClientTelemetrySender} which is
+ * responsible for sending the client telemetry data to the broker. The client 
telemetry reporter
+ * will attempt to fetch the telemetry subscription information from the 
broker and send the
+ * telemetry data to the broker based on the subscription information i.e. 
push interval, temporality,
+ * compression type, etc.
+ * 
+ *
+ * The full life-cycle of the metric collection process is defined by a state 
machine in
+ * {@link ClientTelemetryState}. Each state is associated with a different set 
of operations.
+ * For example, the client telemetry reporter will attempt to fetch the 
telemetry subscription
+ * from the broker when in the {@link 
ClientTelemetryState#SUBSCRIPTION_NEEDED} state.
+ * If the push operation fails, the client telemetry reporter will attempt to 
re-fetch the
+ * subscription information by setting the state back to {@link 
ClientTelemetryState#SUBSCRIPTION_NEEDED}.
+ * 
+ *
+ * In an unlikely scenario, if a bad state transition is detected, an
+ * {@link IllegalStateException} will be thrown.
+ * 
+ *
+ * The state transition follows the following steps in order:
+ * 
+ * {@link ClientTelemetryState#SUBSCRIPTION_NEEDED}
+ * {@link ClientTelemetryState#SUBSCRIPTION_IN_PROGRESS}
+ * {@link ClientTelemetryState#PUSH_NEEDED}
+ * {@link ClientTelemetryState#PUSH_IN_PROGRESS}
+ * {@link 
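
The state-machine life-cycle described in the quoted Javadoc can be sketched as a small standalone class with validated transitions. This is an illustration only — all names below are hypothetical stand-ins, not Kafka's actual implementation (the real transition rules live in ClientTelemetryState):

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Minimal sketch of a telemetry life-cycle state machine with validated
// transitions; state names mirror the Javadoc above, but this class is
// illustrative and not part of the Kafka codebase.
public class TelemetryStateMachine {

    public enum State { SUBSCRIPTION_NEEDED, SUBSCRIPTION_IN_PROGRESS, PUSH_NEEDED, PUSH_IN_PROGRESS, TERMINATED }

    private static final Map<State, Set<State>> VALID = new EnumMap<>(State.class);
    static {
        VALID.put(State.SUBSCRIPTION_NEEDED, EnumSet.of(State.SUBSCRIPTION_IN_PROGRESS, State.TERMINATED));
        VALID.put(State.SUBSCRIPTION_IN_PROGRESS, EnumSet.of(State.PUSH_NEEDED, State.SUBSCRIPTION_NEEDED, State.TERMINATED));
        VALID.put(State.PUSH_NEEDED, EnumSet.of(State.PUSH_IN_PROGRESS, State.SUBSCRIPTION_NEEDED, State.TERMINATED));
        // A failed push falls back to re-fetching the subscription, as the
        // Javadoc describes.
        VALID.put(State.PUSH_IN_PROGRESS, EnumSet.of(State.PUSH_NEEDED, State.SUBSCRIPTION_NEEDED, State.TERMINATED));
        VALID.put(State.TERMINATED, EnumSet.noneOf(State.class));
    }

    private State state = State.SUBSCRIPTION_NEEDED;

    public State state() {
        return state;
    }

    // Throws IllegalStateException on a bad transition, matching the
    // behaviour the Javadoc calls out for unlikely bad transitions.
    public void transitionTo(State next) {
        if (!VALID.get(state).contains(next)) {
            throw new IllegalStateException("Invalid transition " + state + " -> " + next);
        }
        state = next;
    }
}
```

The point of the map-of-sets shape is that every state change is checked in one place, so an impossible transition fails loudly instead of silently corrupting the reporter's life-cycle.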

Re: [PR] MINOR: Fix flaky test RefreshingHttpsJwksTest.testBasicScheduleRefresh [kafka]

2023-12-01 Thread via GitHub


splett2 commented on code in PR #14888:
URL: https://github.com/apache/kafka/pull/14888#discussion_r1412729044


##
clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/RefreshingHttpsJwksTest.java:
##
@@ -134,6 +136,20 @@ public void testSecondaryRefreshAfterElapsedDelay() throws 
Exception {
 String keyId = "abc123";
 MockTime time = new MockTime();
 HttpsJwks httpsJwks = spyHttpsJwks();
+
+try (RefreshingHttpsJwks refreshingHttpsJwks = 
getRefreshingHttpsJwks(time, httpsJwks, mockExecutorService(time))) {
+refreshingHttpsJwks.init();
+// We refresh once at the initialization time from getJsonWebKeys.
+verify(httpsJwks, times(1)).refresh();
+assertTrue(refreshingHttpsJwks.maybeExpediteRefresh(keyId));
+verify(httpsJwks, times(2)).refresh();
+time.sleep(REFRESH_MS + 1);

Review Comment:
   This code was not changed. I just inserted the new `mockExecutorService` method in a way that generated a weird diff.
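
The deterministic-clock pattern used in the quoted test (MockTime plus a mocked executor, so refreshes fire only when virtual time is advanced past REFRESH_MS) can be sketched in isolation. All names below are hypothetical stand-ins, not the Kafka test utilities:

```java
// Sketch of driving an interval-based refresh deterministically with a fake
// clock; FakeClock and Refresher are illustrative stand-ins for MockTime and
// RefreshingHttpsJwks.
public class FakeClockRefresh {

    static class FakeClock {
        private long nowMs = 0;

        long milliseconds() {
            return nowMs;
        }

        // Advances virtual time only; no real sleeping, so tests are fast
        // and deterministic.
        void sleep(long ms) {
            nowMs += ms;
        }
    }

    static class Refresher {
        private final FakeClock clock;
        private final long refreshMs;
        private long lastRefreshMs = Long.MIN_VALUE;
        int refreshCount = 0;

        Refresher(FakeClock clock, long refreshMs) {
            this.clock = clock;
            this.refreshMs = refreshMs;
        }

        // Refreshes on first use, then only after the interval has elapsed.
        boolean maybeRefresh() {
            long now = clock.milliseconds();
            if (lastRefreshMs == Long.MIN_VALUE || now - lastRefreshMs > refreshMs) {
                lastRefreshMs = now;
                refreshCount++;
                return true;
            }
            return false;
        }
    }
}
```

This mirrors the shape of the quoted test: refresh once at initialization, verify no second refresh before the interval, then `sleep(REFRESH_MS + 1)` on the fake clock to make the next refresh due.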



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]

2023-12-01 Thread via GitHub


apoorvmittal10 commented on code in PR #14724:
URL: https://github.com/apache/kafka/pull/14724#discussion_r1412723727



Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]

2023-12-01 Thread via GitHub


apoorvmittal10 commented on code in PR #14724:
URL: https://github.com/apache/kafka/pull/14724#discussion_r1412723432



Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]

2023-12-01 Thread via GitHub


apoorvmittal10 commented on code in PR #14724:
URL: https://github.com/apache/kafka/pull/14724#discussion_r1412722252


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryProvider.java:
##
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import io.opentelemetry.proto.common.v1.AnyValue;
+import io.opentelemetry.proto.common.v1.KeyValue;
+import io.opentelemetry.proto.resource.v1.Resource;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.metrics.MetricsContext;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ClientTelemetryProvider implements Configurable {
+
+public static final String DOMAIN = "org.apache.kafka";
+// Client metrics tags
+public static final String CLIENT_RACK = "client_rack";
+public static final String GROUP_ID = "group_id";
+public static final String GROUP_INSTANCE_ID = "group_instance_id";
+public static final String GROUP_MEMBER_ID = "group_member_id";
+public static final String TRANSACTIONAL_ID = "transactional_id";
+
+private static final String PRODUCER_NAMESPACE = "kafka.producer";
+private static final String CONSUMER_NAMESPACE = "kafka.consumer";
+
+private static final Map<String, String> PRODUCER_CONFIG_MAPPING = new HashMap<>();
+private static final Map<String, String> CONSUMER_CONFIG_MAPPING = new HashMap<>();
+
+private volatile Resource resource = null;
+private Map<String, ?> config = null;
+
+// Mapping of config keys to telemetry keys. Contains only keys which can 
be fetched from config.
+// Config like group_member_id is not present here as it is not fetched 
from config.
+static {
+PRODUCER_CONFIG_MAPPING.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, 
ClientTelemetryProvider.TRANSACTIONAL_ID);
+
+CONSUMER_CONFIG_MAPPING.put(ConsumerConfig.GROUP_ID_CONFIG, 
ClientTelemetryProvider.GROUP_ID);
+CONSUMER_CONFIG_MAPPING.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, 
ClientTelemetryProvider.GROUP_INSTANCE_ID);
+}
+
+@Override
+public synchronized void configure(Map<String, ?> configs) {
+this.config = configs;
+}
+
+/**
+ * Validate that all the data required for generating correct metrics is 
present.
+ *
+ * @param metricsContext {@link MetricsContext}
+ * @return false if all the data required for generating correct metrics 
is missing, true
+ * otherwise.
+ */
+boolean validate(MetricsContext metricsContext) {
+return 
ClientTelemetryUtils.validateRequiredResourceLabels(metricsContext.contextLabels());
+}
+
+/**
+ * Sets the metrics tags for the service or library exposing metrics. This 
will be called before
+ * {@link org.apache.kafka.common.metrics.MetricsReporter#init(List)} and 
may be called anytime
+ * after that.
+ *
+ * @param metricsContext {@link MetricsContext}
+ */
+synchronized void contextChange(MetricsContext metricsContext) {
+final Resource.Builder resourceBuilder = Resource.newBuilder();
+
+final String namespace = 
metricsContext.contextLabels().get(MetricsContext.NAMESPACE);
+if (PRODUCER_NAMESPACE.equals(namespace)) {
+// Add producer resource labels.
+PRODUCER_CONFIG_MAPPING.forEach((configKey, telemetryKey) -> {
+if (config.containsKey(configKey)) {

Review Comment:
   I think it's not required, as the configs are optional; I never needed this log while testing and debugging the complete feature. I'll skip this.
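
The config-to-label translation in the quoted diff (copy a config value into a telemetry resource label only when it is present) can be sketched standalone without the OpenTelemetry types. The class and key names below are illustrative, not Kafka's API:

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of translating client config keys into telemetry resource labels,
// mirroring the mapping tables in the quoted diff; names are illustrative.
public class TelemetryLabels {

    private static final Map<String, String> CONSUMER_CONFIG_MAPPING = new HashMap<>();
    static {
        CONSUMER_CONFIG_MAPPING.put("group.id", "group_id");
        CONSUMER_CONFIG_MAPPING.put("group.instance.id", "group_instance_id");
    }

    // Copies only the configs that are present into resource labels; absent
    // optional configs are silently skipped, which is why no log line is
    // needed for the missing case.
    public static Map<String, String> resourceLabels(Map<String, ?> configs) {
        Map<String, String> labels = new LinkedHashMap<>();
        CONSUMER_CONFIG_MAPPING.forEach((configKey, telemetryKey) -> {
            Object value = configs.get(configKey);
            if (value != null) {
                labels.put(telemetryKey, value.toString());
            }
        });
        return labels;
    }
}
```

Keeping the mapping in a static table, as the diff does, means adding a new label is a one-line change and the copy loop never needs to grow per-config branches.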






Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]

2023-12-01 Thread via GitHub


apoorvmittal10 commented on code in PR #14724:
URL: https://github.com/apache/kafka/pull/14724#discussion_r1412721690


##
clients/src/main/java/org/apache/kafka/clients/ClientTelemetryReporter.java:
##
@@ -0,0 +1,952 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import io.opentelemetry.proto.metrics.v1.Metric;
+import io.opentelemetry.proto.metrics.v1.MetricsData;
+import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
+import io.opentelemetry.proto.metrics.v1.ScopeMetrics;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
+import org.apache.kafka.common.message.PushTelemetryRequestData;
+import org.apache.kafka.common.message.PushTelemetryResponseData;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricsContext;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractRequest.Builder;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.PushTelemetryResponse;
+import org.apache.kafka.common.telemetry.ClientTelemetryState;
+import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender;
+import org.apache.kafka.common.telemetry.internals.ClientTelemetryUtils;
+import org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector;
+import org.apache.kafka.common.telemetry.internals.MetricKeyable;
+import org.apache.kafka.common.telemetry.internals.MetricsCollector;
+import org.apache.kafka.common.telemetry.internals.MetricsEmitter;
+import org.apache.kafka.common.telemetry.internals.SinglePointMetric;
+import 
org.apache.kafka.common.telemetry.internals.TelemetryMetricNamingConvention;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.StringJoiner;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Predicate;
+
+/**
+ * The implementation of the {@link MetricsReporter} for client telemetry 
which manages the life-cycle
+ * of the client telemetry collection process. The client telemetry reporter 
is responsible for
+ * collecting the client telemetry data and sending it to the broker.
+ * 
+ *
+ * The client telemetry reporter is configured with a {@link 
ClientTelemetrySender} which is
+ * responsible for sending the client telemetry data to the broker. The client 
telemetry reporter
+ * will attempt to fetch the telemetry subscription information from the 
broker and send the
+ * telemetry data to the broker based on the subscription information.
+ * 
+ *
+ * The full life-cycle of the metric collection process is defined by a state 
machine in
+ * {@link ClientTelemetryState}. Each state is associated with a different set 
of operations.

Review Comment:
   Yeah, the code has a write-up in ClientTelemetryState.java where the next states are mentioned.




Re: [PR] KAFKA-13152: Kip 770 buffer size fix [kafka]

2023-12-01 Thread via GitHub


vamossagar12 commented on PR #13283:
URL: https://github.com/apache/kafka/pull/13283#issuecomment-1837036580

   @ableegoldman, just checking whether you got a chance to look at my comment above? I don't mean to be pushy, but since the 3.7 release is approaching, I wanted to have the PR ready so that it gives you sufficient time to review as well.





[jira] [Assigned] (KAFKA-15932) Flaky test - PlaintextConsumerTest.testSeek("kraft+kip-848","consumer")

2023-12-01 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15932?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-15932:
-

Assignee: Andrew Schofield

> Flaky test - PlaintextConsumerTest.testSeek("kraft+kip-848","consumer")
> ---
>
> Key: KAFKA-15932
> URL: https://issues.apache.org/jira/browse/KAFKA-15932
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.7.0
>Reporter: Andrew Schofield
>Assignee: Andrew Schofield
>Priority: Major
>  Labels: flaky-test
>
> Intermittently failing test for the new consumer.
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14859/1/tests/
> Error:
> org.opentest4j.AssertionFailedError: Timed out before consuming expected 1 
> records. The number consumed was 0.
> Stacktrace
> org.opentest4j.AssertionFailedError: Timed out before consuming expected 1 
> records. The number consumed was 0.
>   at 
> app//org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38)
>   at app//org.junit.jupiter.api.Assertions.fail(Assertions.java:134)
>   at 
> app//kafka.api.AbstractConsumerTest.consumeRecords(AbstractConsumerTest.scala:161)
>   at 
> app//kafka.api.AbstractConsumerTest.consumeAndVerifyRecords(AbstractConsumerTest.scala:128)
>   at 
> app//kafka.api.PlaintextConsumerTest.testSeek(PlaintextConsumerTest.scala:616)
>   at 
> java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>  Method)
>   at 
> java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base@11.0.16.1/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base@11.0.16.1/java.lang.reflect.Method.invoke(Method.java:566)
>   at 
> app//org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728)
>   at 
> app//org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
>   at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
>   at 
> app//org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
>   at 
> app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
>   at 
> app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestTemplateMethod(TimeoutExtension.java:94)
>   at 
> app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
>   at 
> app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
>   at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
>   at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
>   at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
>   at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
>   at 
> app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92)
>   at 
> app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86)
>   at 
> app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:218)
>   at 
> app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>   at 
> app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:214)
>   at 
> app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:139)
>   at 
> app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:69)
>   at 
> app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
>   at 
> app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>   at 
> app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
>   at 
> app//org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
>   at 
> 

Re: [PR] KAFKA-14438: Throw error when consumer configured with empty/whitespace-only group.id for AsyncKafkaConsumer [kafka]

2023-12-01 Thread via GitHub


kirktrue closed pull request #14768: KAFKA-14438: Throw error when consumer 
configured with empty/whitespace-only group.id for AsyncKafkaConsumer
URL: https://github.com/apache/kafka/pull/14768





Re: [PR] Kafka-14748: Relax non-null FK left-join requirement [kafka]

2023-12-01 Thread via GitHub


wcarlson5 commented on code in PR #14107:
URL: https://github.com/apache/kafka/pull/14107#discussion_r1412698954


##
docs/streams/upgrade-guide.html:
##
@@ -198,6 +184,21 @@ Streams API
 
 
 
+

Review Comment:
   Thanks for remembering the docs update!



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java:
##
@@ -115,102 +116,93 @@ public void process(final Record<K, Change<V>> record) {
 droppedRecordsSensor.record();
 return;
 }
+if (leftJoin) {
+leftJoinInstructions(record);
+} else {
+defaultJoinInstructions(record);
+}
+}
 
-final long[] currentHash = record.value().newValue == null ?
-null :
-Murmur3.hash128(valueSerializer.serialize(valueSerdeTopic, 
record.value().newValue));
-
-final int partition = context().recordMetadata().get().partition();
+private void leftJoinInstructions(final Record<K, Change<V>> record) {
 if (record.value().oldValue != null) {
 final KO oldForeignKey = 
foreignKeyExtractor.apply(record.value().oldValue);
+final KO newForeignKey = record.value().newValue == null ? 
null : foreignKeyExtractor.apply(record.value().newValue);
+if (oldForeignKey != null && 
!Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) {
+forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE);
+}
+forward(record, newForeignKey, 
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
+} else if (record.value().newValue != null) {
+final KO newForeignKey =  
foreignKeyExtractor.apply(record.value().newValue);
+forward(record, newForeignKey, 
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
+}
+}
+
+private void defaultJoinInstructions(final Record<K, Change<V>> record) {
+if (record.value().oldValue != null) {
+final KO oldForeignKey = record.value().oldValue == null ? 
null : foreignKeyExtractor.apply(record.value().oldValue);
 if (oldForeignKey == null) {
 logSkippedRecordDueToNullForeignKey();
 return;
 }
 if (record.value().newValue != null) {
-final KO newForeignKey = 
foreignKeyExtractor.apply(record.value().newValue);
+final KO newForeignKey = record.value().newValue == null ? 
null : foreignKeyExtractor.apply(record.value().newValue);
 if (newForeignKey == null) {
 logSkippedRecordDueToNullForeignKey();
 return;
 }
-
-final byte[] serialOldForeignKey =
-foreignKeySerializer.serialize(foreignKeySerdeTopic, 
oldForeignKey);
-final byte[] serialNewForeignKey =
-foreignKeySerializer.serialize(foreignKeySerdeTopic, 
newForeignKey);
-if (!Arrays.equals(serialNewForeignKey, 
serialOldForeignKey)) {
+if (!Arrays.equals(serialize(newForeignKey), 
serialize(oldForeignKey))) {
 //Different Foreign Key - delete the old key value and 
propagate the new one.
 //Delete it from the oldKey's state store
-context().forward(
-record.withKey(oldForeignKey)
-.withValue(new SubscriptionWrapper<>(
-currentHash,
-DELETE_KEY_NO_PROPAGATE,
-record.key(),
-partition
-)));
-//Add to the newKey's state store. Additionally, 
propagate null if no FK is found there,
-//since we must "unset" any output set by the previous 
FK-join. This is true for both INNER
-//and LEFT join.
+forward(record, oldForeignKey, 
DELETE_KEY_NO_PROPAGATE);
 }
-context().forward(
-record.withKey(newForeignKey)
-.withValue(new SubscriptionWrapper<>(
-currentHash,
-PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE,
-record.key(),
-partition
-)));
+//Add to the newKey's state store. Additionally, propagate 
null if no FK is found there,
+//since we must "unset" any output set by the previous 
FK-join. This is true for both INNER
+//and LEFT join.
+
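The hunk above keys the delete/propagate decision on whether the serialized foreign keys differ. A standalone sketch (with a hypothetical string serializer standing in for `foreignKeySerializer`) of why the comparison must use `Arrays.equals` rather than `byte[].equals`:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class FkCompareDemo {
    // Hypothetical stand-in for the foreign-key serializer used in the hunk above.
    public static byte[] serialize(String fk) {
        return fk == null ? null : fk.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] a = serialize("fk-1");
        byte[] b = serialize("fk-1");
        // byte[].equals is reference equality, so two equal serializations compare unequal.
        System.out.println(a.equals(b));          // false
        // Arrays.equals compares contents, which is what the join code needs.
        System.out.println(Arrays.equals(a, b));  // true
        // Arrays.equals also treats two nulls as equal, matching the "no foreign key" case.
        System.out.println(Arrays.equals(serialize(null), null));  // true
    }
}
```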

Re: [PR] improve logging around broker/NetworkClient connectivity [kafka]

2023-12-01 Thread via GitHub


rayalatrinadh commented on PR #14901:
URL: https://github.com/apache/kafka/pull/14901#issuecomment-1836995567

   improve logging around broker/NetworkClient connectivity Committed Checked.





[PR] improve logging around broker/NetworkClient connectivity [kafka]

2023-12-01 Thread via GitHub


rayalatrinadh opened a new pull request, #14901:
URL: https://github.com/apache/kafka/pull/14901

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]

2023-12-01 Thread via GitHub


jeffkbkim commented on code in PR #14849:
URL: https://github.com/apache/kafka/pull/14849#discussion_r1412701656


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##
@@ -1659,4 +1680,117 @@ public void testPartitionLoadSensor() {
 
 verify(runtimeMetrics, 
times(1)).recordPartitionLoadSensor(startTimeMs, startTimeMs + 1000);
 }
+
+@Test
+public void testPartitionLoadGeneratesSnapshotAtHighWatermark() {
+MockTimer timer = new MockTimer();
+MockPartitionWriter writer = mock(MockPartitionWriter.class);
+MockCoordinatorShardBuilderSupplier supplier = 
mock(MockCoordinatorShardBuilderSupplier.class);
+MockCoordinatorShardBuilder builder = 
mock(MockCoordinatorShardBuilder.class);
+MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class);
+GroupCoordinatorRuntimeMetrics runtimeMetrics = 
mock(GroupCoordinatorRuntimeMetrics.class);
+
+CoordinatorRuntime runtime =
+new CoordinatorRuntime.Builder()
+.withTime(Time.SYSTEM)
+.withTimer(timer)
+.withLoader(new MockCoordinatorLoader(
+new CoordinatorLoader.LoadSummary(
+1000,
+2000,
+30,
+3000),
+Arrays.asList(5L, 15L, 27L),
+Arrays.asList(5L, 15L)))
+.withEventProcessor(new DirectEventProcessor())
+.withPartitionWriter(writer)
+.withCoordinatorShardBuilderSupplier(supplier)
+.withCoordinatorRuntimeMetrics(runtimeMetrics)
+.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+.build();
+
+when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+when(builder.withLogContext(any())).thenReturn(builder);
+when(builder.withTime(any())).thenReturn(builder);
+when(builder.withTimer(any())).thenReturn(builder);
+when(builder.withCoordinatorMetrics(any())).thenReturn(builder);
+when(builder.withTopicPartition(any())).thenReturn(builder);
+when(builder.build()).thenReturn(coordinator);
+when(supplier.get()).thenReturn(builder);
+
+// Getting the coordinator context fails because the coordinator
+// does not exist until scheduleLoadOperation is called.
+assertThrows(NotCoordinatorException.class, () -> 
runtime.contextOrThrow(TP));
+
+// Schedule the loading.
+runtime.scheduleLoadOperation(TP, 0);
+
+// Getting the coordinator context succeeds now.
+CoordinatorRuntime.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
+
+// When the loading completes, the coordinator transitions to active.
+assertEquals(ACTIVE, ctx.state);
+
+assertEquals(27L, ctx.stateMachine.lastWrittenOffset());
+assertEquals(15L, ctx.stateMachine.lastCommittedOffset());
+assertFalse(ctx.stateMachine.snapshotRegistry().hasSnapshot(0L));

Review Comment:
   i'm not sure what you mean. this is to confirm that previous snapshots were 
deleted






Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]

2023-12-01 Thread via GitHub


jeffkbkim commented on code in PR #14849:
URL: https://github.com/apache/kafka/pull/14849#discussion_r1412700208


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##
@@ -322,7 +335,7 @@ public void testScheduleLoading() {
 when(builder.build()).thenReturn(coordinator);
 when(supplier.get()).thenReturn(builder);
 CompletableFuture future = new 
CompletableFuture<>();
-when(loader.load(TP, coordinator)).thenReturn(future);
+when(loader.load(eq(TP), any())).thenReturn(future);

Review Comment:
   i'm not sure how to go about this. The reason I had it this way was because 
until we call `runtime.scheduleLoadOperation()`, we don't have a context for 
the topic partition. but when we schedule a load operation we run 
`loader.load()`. If there's a way, I'll make the changes.






Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-12-01 Thread via GitHub


wcarlson5 commented on code in PR #14864:
URL: https://github.com/apache/kafka/pull/14864#discussion_r1412696799


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##
@@ -713,6 +735,97 @@ boolean runLoop() {
 return true;
 }
 
+// visible for testing
+void maybeGetClientInstanceIds() {
+// we pass in a timeout of zero into each `clientInstanceId()` call
+// to just trigger the "get instance id" background RPC;
+// we don't want to block the stream thread that can do useful work in 
the meantime

Review Comment:
   good comments :)



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##
@@ -1477,6 +1590,67 @@ public Object getStateLock() {
 return stateLock;
 }
 
+public Map> consumerClientInstanceIds(final 
Duration timeout) {
+boolean setDeadline = false;
+
+final Map> result = new HashMap<>();
+
+KafkaFutureImpl future = new KafkaFutureImpl<>();
+if (mainConsumerClientInstanceId != null) {
+future.complete(mainConsumerClientInstanceId);
+} else {
+mainConsumerInstanceIdFuture = future;
+setDeadline = true;
+}
+result.put(getName() + "-consumer", future);
+
+future = new KafkaFutureImpl<>();
+if (restoreConsumerClientInstanceId != null) {
+future.complete(restoreConsumerClientInstanceId);
+} else {
+restoreConsumerInstanceIdFuture = future;
+setDeadline = true;
+}
+result.put(getName() + "-restore-consumer", future);
+
+if (setDeadline) {
+fetchDeadline = time.milliseconds() + timeout.toMillis();
+}
+
+return result;
+}
+
+public KafkaFuture>> 
producersClientInstanceIds(final Duration timeout) {
+final KafkaFutureImpl>> result = new 
KafkaFutureImpl<>();
+
+if 
(processingMode.equals(StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_ALPHA)) {
+//for (final TaskId taskId : taskManager.activeTaskIds()) {
+//future = new KafkaFutureImpl<>();
+//if (taskProducersClientInstanceIds.get(taskId) != null) {
+//
future.complete(taskProducersClientInstanceIds.get(taskId));
+//} else {
+//taskProducersInstanceIdsFuture.put(taskId, future);
+//setDeadline = true;
+//}
+//result.put(getName() + "-" + taskId + "-producer", future);
+//};
+} else {
+final KafkaFutureImpl producerFuture = new 
KafkaFutureImpl<>();
+if (threadProducerClientInstanceId != null) {
+producerFuture.complete(threadProducerClientInstanceId);
+} else {
+threadProducerInstanceIdFuture = producerFuture;
+if (fetchDeadline == -1) {
+fetchDeadline = time.milliseconds() + timeout.toMillis();
+}
+}
+
+result.complete(Collections.singletonMap(getName() + "-producer", 
producerFuture));

Review Comment:
   Could we be lazy and just treat any unavailable tasks as if they were set up with telemetry disabled on the client itself?
   
   Not ideal but should keep things simple. Otherwise we might timeout all the 
time with restoring tasks



##
streams/src/main/java/org/apache/kafka/streams/internals/ClientInstanceIdsImpl.java:
##
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.internals;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.streams.ClientInstanceIds;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class ClientInstanceIdsImpl implements ClientInstanceIds {
+private final Map consumerInstanceIds = new HashMap<>();
+private final Map producerInstanceIds = new HashMap<>();
+private Uuid adminInstanceId;
+
+public void addConsumerInstanceId(final String key, final Uuid instanceId) 
{
+consumerInstanceIds.put(key, instanceId);
+}
+
+public void 

Re: [PR] errorHandling [kafka]

2023-12-01 Thread via GitHub


rayalatrinadh commented on PR #14900:
URL: https://github.com/apache/kafka/pull/14900#issuecomment-1836981169

   no conflicts





[PR] errorHandling [kafka]

2023-12-01 Thread via GitHub


rayalatrinadh opened a new pull request, #14900:
URL: https://github.com/apache/kafka/pull/14900

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] errorHandling [kafka]

2023-12-01 Thread via GitHub


rayalatrinadh closed pull request #14900: errorHandling
URL: https://github.com/apache/kafka/pull/14900





Re: [PR] KAFKA-15956: MetadataShell must take the log directory lock when reading [kafka]

2023-12-01 Thread via GitHub


cmccabe commented on PR #14899:
URL: https://github.com/apache/kafka/pull/14899#issuecomment-1836970040

   In addition to the junit tests, I also tested this manually and it works.





Re: [PR] KAFKA-15956: MetadataShell must take the log directory lock when reading [kafka]

2023-12-01 Thread via GitHub


cmccabe commented on code in PR #14899:
URL: https://github.com/apache/kafka/pull/14899#discussion_r1412689568


##
shell/src/main/java/org/apache/kafka/shell/MetadataShell.java:
##
@@ -119,6 +141,7 @@ private void initializeWithRaftManager() {
 }
 
 private void initializeWithSnapshotFileReader() throws Exception {
+this.fileLock = takeFileLock(new File(snapshotPath).getParentFile());

Review Comment:
   Fixed






Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]

2023-12-01 Thread via GitHub


kirktrue commented on code in PR #14879:
URL: https://github.com/apache/kafka/pull/14879#discussion_r1412683183


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -849,9 +953,22 @@ public OptionalLong currentLag(TopicPartition 
topicPartition) {
 }
 }
 
+public void setGroupMetadata(final ConsumerGroupMetadata groupMetadata) {
+this.groupMetadata = Optional.of(groupMetadata);
+}
+
 @Override
 public ConsumerGroupMetadata groupMetadata() {
-throw new KafkaException("method not implemented");
+acquireAndEnsureOpen();
+try {
+maybeThrowInvalidGroupIdException();
+backgroundEventProcessor.process();

Review Comment:
   I think we should leave it only in `poll()` for the time being, yes  









Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]

2023-12-01 Thread via GitHub


kirktrue commented on code in PR #14879:
URL: https://github.com/apache/kafka/pull/14879#discussion_r1412683354


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -398,6 +466,38 @@ public class AsyncKafkaConsumer implements 
ConsumerDelegate {
 requestManagersSupplier);
 }
 
+private Optional initializeGroupMetadata(final 
ConsumerConfig config,
+final 
GroupRebalanceConfig groupRebalanceConfig) {
+final Optional groupMetadata = 
initializeGroupMetadata(
+groupRebalanceConfig.groupId,
+groupRebalanceConfig.groupInstanceId
+);
+if (!groupMetadata.isPresent()) {
+config.ignore(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
+config.ignore(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED);
+}
+return groupMetadata;
+}
+
+private Optional initializeGroupMetadata(final 
String groupId,
+final 
Optional groupInstanceId) {
+if (groupId != null) {
+if (groupId.isEmpty()) {
+throwInInvalidGroupIdException();

Review Comment:
   Oh, the line after `throwInInvalidGroupIdException` is just to appease the 
compiler?






[jira] [Assigned] (KAFKA-15957) ConsistencyVectorIntegrationTest.shouldHaveSamePositionBoundActiveAndStandBy broken

2023-12-01 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-15957:
---

Assignee: Lucas Brutschy

> ConsistencyVectorIntegrationTest.shouldHaveSamePositionBoundActiveAndStandBy 
> broken
> ---
>
> Key: KAFKA-15957
> URL: https://issues.apache.org/jira/browse/KAFKA-15957
> Project: Kafka
>  Issue Type: Bug
>Reporter: Justine Olshan
>Assignee: Lucas Brutschy
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Fix flaky test RefreshingHttpsJwksTest.testBasicScheduleRefresh [kafka]

2023-12-01 Thread via GitHub


kirktrue commented on code in PR #14888:
URL: https://github.com/apache/kafka/pull/14888#discussion_r1412678216


##
clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/RefreshingHttpsJwksTest.java:
##
@@ -134,6 +136,20 @@ public void testSecondaryRefreshAfterElapsedDelay() throws 
Exception {
 String keyId = "abc123";
 MockTime time = new MockTime();
 HttpsJwks httpsJwks = spyHttpsJwks();
+
+try (RefreshingHttpsJwks refreshingHttpsJwks = 
getRefreshingHttpsJwks(time, httpsJwks, mockExecutorService(time))) {
+refreshingHttpsJwks.init();
+// We refresh once at the initialization time from getJsonWebKeys.
+verify(httpsJwks, times(1)).refresh();
+assertTrue(refreshingHttpsJwks.maybeExpediteRefresh(keyId));
+verify(httpsJwks, times(2)).refresh();
+time.sleep(REFRESH_MS + 1);

Review Comment:
   > Why were we using real time here at all 
   
   This test has been using `MockTime` since the start, though 
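`MockTime` discussed above is Kafka's test clock; a hand-rolled sketch of the same idea, so a call like `time.sleep(REFRESH_MS + 1)` advances virtual time deterministically with no real waiting (the class names here are illustrative, not Kafka's actual implementation):

```java
public class MockClockDemo {
    // Illustrative stand-in for Kafka's MockTime: time advances only when told to.
    public static final class MockClock {
        private long nowMs = 0L;
        public long milliseconds() { return nowMs; }
        public void sleep(long ms) { nowMs += ms; }  // advances virtual time, no blocking
    }

    public static void main(String[] args) {
        final long refreshMs = 100L;
        MockClock time = new MockClock();
        time.sleep(refreshMs + 1);               // instant, deterministic
        System.out.println(time.milliseconds()); // 101
    }
}
```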









Re: [PR] KAFKA-15956: MetadataShell must take the log directory lock when reading [kafka]

2023-12-01 Thread via GitHub


cmccabe commented on code in PR #14899:
URL: https://github.com/apache/kafka/pull/14899#discussion_r1412672706


##
shell/src/main/java/org/apache/kafka/shell/MetadataShell.java:
##
@@ -81,6 +85,21 @@ public MetadataShell build() {
 }
 }
 
+static FileLock takeFileLock(File directory) {
+FileLock fileLock = new FileLock(new File(directory, ".lock"));

Review Comment:
   Yeah, that might make sense for this use-case. Since we also want to be able 
to look at snapshots that aren't inside log dirs.






Re: [PR] KAFKA-15956: MetadataShell must take the log directory lock when reading [kafka]

2023-12-01 Thread via GitHub


cmccabe commented on code in PR #14899:
URL: https://github.com/apache/kafka/pull/14899#discussion_r1412672706


##
shell/src/main/java/org/apache/kafka/shell/MetadataShell.java:
##
@@ -81,6 +85,21 @@ public MetadataShell build() {
 }
 }
 
+static FileLock takeFileLock(File directory) {
+FileLock fileLock = new FileLock(new File(directory, ".lock"));

Review Comment:
   That seems like a bad idea since it could allow another server or tool to 
collide with what we're doing.






Re: [PR] HOTFIX: fix ConsistencyVectorIntegrationTest failure [kafka]

2023-12-01 Thread via GitHub


jolshan commented on PR #14895:
URL: https://github.com/apache/kafka/pull/14895#issuecomment-1836934042

   fyi -- I filed https://issues.apache.org/jira/browse/KAFKA-15957 but looks 
like you folks are on it.





[jira] [Commented] (KAFKA-15957) ConsistencyVectorIntegrationTest.shouldHaveSamePositionBoundActiveAndStandBy broken

2023-12-01 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17792268#comment-17792268
 ] 

Justine Olshan commented on KAFKA-15957:


[https://github.com/apache/kafka/pull/14895] is fixing this

> ConsistencyVectorIntegrationTest.shouldHaveSamePositionBoundActiveAndStandBy 
> broken
> ---
>
> Key: KAFKA-15957
> URL: https://issues.apache.org/jira/browse/KAFKA-15957
> Project: Kafka
>  Issue Type: Bug
>Reporter: Justine Olshan
>Priority: Major
>






Re: [PR] KAFKA-15448: Streams Standby Update Listener (KIP-988) [kafka]

2023-12-01 Thread via GitHub


ableegoldman commented on code in PR #14735:
URL: https://github.com/apache/kafka/pull/14735#discussion_r1412648726


##
streams/src/main/java/org/apache/kafka/streams/processor/StandbyUpdateListener.java:
##
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.common.TopicPartition;
+
+public interface StandbyUpdateListener {
+
+enum SuspendReason {
+MIGRATED,
+PROMOTED
+}
+
+/**
+ * Method called upon the creation of the Standby Task.
+ *
+ * @param topicPartition   the TopicPartition of the Standby Task.
+ * @param storeNamethe name of the store being watched by this 
Standby Task.
+ * @param startingOffset   the offset from which the Standby Task starts 
watching.
+ */
+void onUpdateStart(final TopicPartition topicPartition,
+   final String storeName,
+   final long startingOffset);
+
+/**
+ * Method called after restoring a batch of records. In this case the 
maximum size of the batch is whatever

Review Comment:
   ```suggestion
* Method called after loading a batch of records. In this case the 
maximum size of the batch is whatever
   ```



##
streams/src/main/java/org/apache/kafka/streams/processor/StandbyUpdateListener.java:
##
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.common.TopicPartition;
+
+public interface StandbyUpdateListener {
+
+enum SuspendReason {
+MIGRATED,
+PROMOTED
+}
+
+/**
+ * Method called upon the creation of the Standby Task.
+ *
+ * @param topicPartition   the TopicPartition of the Standby Task.
+ * @param storeNamethe name of the store being watched by this 
Standby Task.

Review Comment:
   ditto here, copy this to the other two callbacks as well
   ```suggestion
* @param storeName the name of the store being loaded
   ```



##
streams/src/main/java/org/apache/kafka/streams/processor/StandbyUpdateListener.java:
##
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.common.TopicPartition;
+
+public interface StandbyUpdateListener {
+
+enum SuspendReason {
+MIGRATED,
+PROMOTED
+}
+
+/**
+ * Method called upon the creation of the Standby Task.
+ *
+ * @param topicPartition   the TopicPartition of the Standby Task.
+ * @param storeNamethe name of the store being watched by this 

[jira] [Created] (KAFKA-15957) ConsistencyVectorIntegrationTest.shouldHaveSamePositionBoundActiveAndStandBy broken

2023-12-01 Thread Justine Olshan (Jira)
Justine Olshan created KAFKA-15957:
--

 Summary: 
ConsistencyVectorIntegrationTest.shouldHaveSamePositionBoundActiveAndStandBy 
broken
 Key: KAFKA-15957
 URL: https://issues.apache.org/jira/browse/KAFKA-15957
 Project: Kafka
  Issue Type: Bug
Reporter: Justine Olshan








Re: [PR] KAFKA-15956: MetadataShell must take the log directory lock when reading [kafka]

2023-12-01 Thread via GitHub


jsancio commented on code in PR #14899:
URL: https://github.com/apache/kafka/pull/14899#discussion_r1412657561


##
shell/src/main/java/org/apache/kafka/shell/MetadataShell.java:
##
@@ -81,6 +85,21 @@ public MetadataShell build() {
 }
 }
 
+static FileLock takeFileLock(File directory) {
+FileLock fileLock = new FileLock(new File(directory, ".lock"));

Review Comment:
   I suggest not creating a `.lock` file if the specified directory doesn't 
contain a `.lock` file.



##
shell/src/main/java/org/apache/kafka/shell/MetadataShell.java:
##
@@ -119,6 +141,7 @@ private void initializeWithRaftManager() {
 }
 
 private void initializeWithSnapshotFileReader() throws Exception {
+this.fileLock = takeFileLock(new File(snapshotPath).getParentFile());

Review Comment:
   This doesn't look like the correct directory. Please take a look at the 
`LogManager` for details.






[PR] KAFKA-15956: MetadataShell must take the log directory lock when reading [kafka]

2023-12-01 Thread via GitHub


cmccabe opened a new pull request, #14899:
URL: https://github.com/apache/kafka/pull/14899

   MetadataShell should take an advisory lock on the .lock file of the 
directory it is reading from. Add an integration test of this functionality in 
MetadataShellIntegrationTest.java.
   
   Note: in build.gradle, I had to add some dependencies on server-common's 
test files in order to use MockFaultHandler, etc.
   
   MetadataBatchLoader.java: fix a case where a log message was incorrect.  The 
intention was to print the number equivalent to (offset + index).  Instead it 
was printing the offset, followed by the index. So if the offset was 100 and 
the index was 1, 1001 would be printed rather than 101.


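The concatenation bug described in the PR body is easy to reproduce in isolation; a minimal illustration (the message text here is invented, not the actual MetadataBatchLoader log line):

```java
public class LogMessageBug {

    // Buggy form: once the left operand is a String, '+' concatenates,
    // so offset=100, index=1 renders as "1001".
    public static String buggy(long offset, int index) {
        return "next offset: " + offset + index;
    }

    // Fixed form: parenthesize so the addition happens numerically first.
    public static String fixed(long offset, int index) {
        return "next offset: " + (offset + index);
    }

    public static void main(String[] args) {
        System.out.println(buggy(100, 1)); // next offset: 1001
        System.out.println(fixed(100, 1)); // next offset: 101
    }
}
```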



Re: [PR] KAFKA-15448: Streams Standby Update Listener (KIP-988) [kafka]

2023-12-01 Thread via GitHub


ableegoldman commented on code in PR #14735:
URL: https://github.com/apache/kafka/pull/14735#discussion_r1412638885


##
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##
@@ -580,6 +582,23 @@ public void setGlobalStateRestoreListener(final 
StateRestoreListener globalState
 }
 }
 
+/**
+ * Set the listener which is triggered whenever a standby task is updated
+ *
+ * @param standbyListener The listener triggered when a standby task is 
updated.
+ * @throws IllegalStateException if this {@code KafkaStreams} instance has 
already been started.
+ */
+public void setStandbyUpdateListener(final StandbyUpdateListener 
standbyListener) {

Review Comment:
   Agreed!



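The javadoc under review states that the listener can only be set before the instance is started. A simplified, self-contained sketch of that guard, assuming stand-in types (`StreamsStub` and a one-method listener) rather than the actual KafkaStreams/KIP-988 API surface:

```java
public class StandbyListenerSketch {

    // Stand-in for the KIP-988 listener interface, reduced to one callback.
    public interface StandbyUpdateListener {
        void onUpdate(String storeName);
    }

    // Stand-in for KafkaStreams: the setter must be called before start().
    public static class StreamsStub {
        private boolean started = false;
        private StandbyUpdateListener listener;

        public void setStandbyUpdateListener(final StandbyUpdateListener standbyListener) {
            if (started) {
                throw new IllegalStateException("Can only set StandbyUpdateListener before calling start().");
            }
            this.listener = standbyListener;
        }

        public void start() {
            started = true;
        }
    }

    public static void main(String[] args) {
        StreamsStub streams = new StreamsStub();
        streams.setStandbyUpdateListener(store -> System.out.println("updated: " + store));
        streams.start();
        try {
            streams.setStandbyUpdateListener(store -> { });
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```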



Re: [PR] KAFKA-14585: 1st part : Java versions for metadata/broker and updated LogConfig [kafka]

2023-12-01 Thread via GitHub


fvaleri commented on PR #14106:
URL: https://github.com/apache/kafka/pull/14106#issuecomment-1836870533

   @muralibasani should you also close this one?





[jira] [Created] (KAFKA-15956) MetadataShell must take the directory lock when reading

2023-12-01 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-15956:


 Summary: MetadataShell must take the directory lock when reading
 Key: KAFKA-15956
 URL: https://issues.apache.org/jira/browse/KAFKA-15956
 Project: Kafka
  Issue Type: Improvement
Reporter: Colin McCabe


MetadataShell must take the directory lock when reading files, to avoid 
unpleasant surprises from concurrent reads and writes.





Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]

2023-12-01 Thread via GitHub


jeffkbkim commented on code in PR #14849:
URL: https://github.com/apache/kafka/pull/14849#discussion_r1412627530


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/ContextStateMachine.java:
##
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.runtime;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.slf4j.Logger;
+
+/**
+ * ContextStateMachine is a wrapper on top of the coordinator state machine.
+ * Generally, only a single thread at a time will access this object, but
+ * multiple threads may access it while loading the __consumer_offsets topic partition.
+ */
+class ContextStateMachine<S extends Coordinator<U>, U> implements CoordinatorPlayback<U> {
+/**
+ * The logger.
+ */
+private final Logger log;
+/**
+ * The actual state machine.
+ */
+private S coordinator;
+
+/**
+ * The snapshot registry backing the coordinator.
+ */
+private final SnapshotRegistry snapshotRegistry;
+
+/**
+ * The topic partition.
+ */
+private final TopicPartition tp;
+
+/**
+ * The last offset written to the partition.
+ */
+private long lastWrittenOffset;
+
+/**
+ * The last offset committed. This represents the high
+ * watermark of the partition.
+ */
+private long lastCommittedOffset;
+
+ContextStateMachine(
+LogContext logContext,
+SnapshotRegistry snapshotRegistry,
+S coordinator,
+TopicPartition tp
+) {
+this.log = logContext.logger(ContextStateMachine.class);
+this.coordinator = coordinator;
+this.snapshotRegistry = snapshotRegistry;
+this.tp = tp;
+this.lastWrittenOffset = 0;
+this.lastCommittedOffset = 0;
+snapshotRegistry.getOrCreateSnapshot(0);
+snapshotRegistry.deleteSnapshotsUpTo(0);
+}
+
+/**
+ * Reverts the last written offset. This also reverts the snapshot
+ * registry to this offset. All the changes applied after the offset
+ * are lost.
+ *
+ * @param offset The offset to revert to.
+ */
+synchronized void revertLastWrittenOffset(
+long offset
+) {
+if (offset > lastWrittenOffset) {
+throw new IllegalStateException("New offset " + offset + " of " + 
tp +
+" must be smaller than " + lastWrittenOffset + ".");
+}
+
+log.debug("Revert last written offset of {} to {}.", tp, offset);
+lastWrittenOffset = offset;
+snapshotRegistry.revertToSnapshot(offset);
+}
+
+@Override
+public synchronized void replay(
+long producerId,
+short producerEpoch,
+U record
+) {
+coordinator.replay(producerId, producerEpoch, record);
+}
+
+/**
+ * Updates the last written offset. This also creates a new snapshot
+ * in the snapshot registry.
+ *
+ * @param offset The new last written offset.
+ */
+@Override
+public synchronized void updateLastWrittenOffset(Long offset) {
+if (offset <= lastWrittenOffset) {
+throw new IllegalStateException("New last written offset " + 
offset + " of " + tp +
+" must be greater than " + lastWrittenOffset + ".");
+}
+
+log.debug("Update last written offset of {} to {}.", tp, offset);
+lastWrittenOffset = offset;
+snapshotRegistry.getOrCreateSnapshot(offset);
+}
+
+/**
+ * Updates the last committed offset. This completes all the deferred
+ * events waiting on this offset. This also cleans up all the snapshots
+ * prior to this offset.
+ *
+ * @param offset The new last committed offset.
+ */
+@Override
+public synchronized void updateLastCommittedOffset(Long offset) {
+if (offset < lastCommittedOffset) {
+throw new IllegalStateException("New committed offset " + offset + 
" of " + tp +
+" must be greater than or equal to " + lastCommittedOffset + 
".");
+}
+
+lastCommittedOffset = offset;
+

Re: [PR] MINOR: Add @Timeout annotation to consumer integration tests [kafka]

2023-12-01 Thread via GitHub


jolshan commented on PR #14896:
URL: https://github.com/apache/kafka/pull/14896#issuecomment-1836863361

   I've also been seeing more failed builds --
   
   ```
   * What went wrong:
   
   Execution failed for task ':streams:upgrade-system-tests-25:test'.
   
   > Process 'Gradle Test Executor 9' finished with non-zero exit value 1
   
 This problem might be caused by incorrect test process configuration.
 ```
 
 From this PR -- is it possible the new consumer could be throwing errors 
that cause test issues like this? I know it should be on the test to handle 
errors in a way that doesn't kill the build, but I have been seeing many failed 
builds due to these issues or GC/OOM issues.





Re: [PR] KAFKA-14412: Better Rocks column family management [kafka]

2023-12-01 Thread via GitHub


ableegoldman commented on code in PR #14852:
URL: https://github.com/apache/kafka/pull/14852#discussion_r1412625873


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java:
##
@@ -64,31 +62,14 @@ public RocksDBTimestampedStore(final String name,
 @Override
 void openRocksDB(final DBOptions dbOptions,
  final ColumnFamilyOptions columnFamilyOptions) {
-final List<ColumnFamilyDescriptor> columnFamilyDescriptors = asList(
-new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions),
-new ColumnFamilyDescriptor("keyValueWithTimestamp".getBytes(StandardCharsets.UTF_8), columnFamilyOptions));
-final List<ColumnFamilyHandle> columnFamilies = new ArrayList<>(columnFamilyDescriptors.size());
-
-try {
-db = RocksDB.open(dbOptions, dbDir.getAbsolutePath(), 
columnFamilyDescriptors, columnFamilies);
-setDbAccessor(columnFamilies.get(0), columnFamilies.get(1));
-} catch (final RocksDBException e) {
-if ("Column family not found: 
keyValueWithTimestamp".equals(e.getMessage())) {
-try {
-db = RocksDB.open(dbOptions, dbDir.getAbsolutePath(), 
columnFamilyDescriptors.subList(0, 1), columnFamilies);
-
columnFamilies.add(db.createColumnFamily(columnFamilyDescriptors.get(1)));
-} catch (final RocksDBException fatal) {
-throw new ProcessorStateException("Error opening store " + 
name + " at location " + dbDir.toString(), fatal);
-}
-setDbAccessor(columnFamilies.get(0), columnFamilies.get(1));
-} else {
-throw new ProcessorStateException("Error opening store " + 
name + " at location " + dbDir.toString(), e);
-}
-}
-}
+final List<ColumnFamilyHandle> columnFamilies = openRocksDB(
+dbOptions,
+new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions),
+new ColumnFamilyDescriptor("keyValueWithTimestamp".getBytes(StandardCharsets.UTF_8), columnFamilyOptions)

Review Comment:
   We should define a constant for `keyValueWithTimestamp` instead of 
hard-coding the column family name like this. Especially since we'll be adding 
additional column families. We should have them all defined in one central place



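The suggestion above, defining column family names in one central place, can be as small as a constants holder; the class and constant names here are illustrative, not from the Streams codebase:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class ColumnFamilyNames {

    // Single definition of the timestamped column family name, instead of
    // repeating the string literal at every RocksDB.open call site.
    public static final byte[] TIMESTAMPED_CF_NAME =
        "keyValueWithTimestamp".getBytes(StandardCharsets.UTF_8);

    public static void main(String[] args) {
        System.out.println(Arrays.equals(
            TIMESTAMPED_CF_NAME,
            "keyValueWithTimestamp".getBytes(StandardCharsets.UTF_8))); // true
    }
}
```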



Re: [PR] MINOR: Add @Timeout annotation to consumer integration tests [kafka]

2023-12-01 Thread via GitHub


jolshan commented on PR #14896:
URL: https://github.com/apache/kafka/pull/14896#issuecomment-1836861158

   Hmmm -- are we concerned with introducing these new test failures? I agree a 
timeout is good, but I'm wondering if there is a way we could suppress these 
failures for PR builds.





Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]

2023-12-01 Thread via GitHub


jeffkbkim commented on code in PR #14849:
URL: https://github.com/apache/kafka/pull/14849#discussion_r1412622438


##
core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala:
##
@@ -153,6 +154,15 @@ class CoordinatorLoaderImpl[T](
   }
 
   currentOffset = batch.nextOffset
+  val currentHighWatermark = log.highWatermark
+  if (currentOffset >= currentHighWatermark) {
+coordinator.updateLastWrittenOffset(currentOffset)
+  }
+
+  if (currentHighWatermark > previousHighWatermark) {
+coordinator.updateLastCommittedOffset(currentHighWatermark)
+previousHighWatermark = currentHighWatermark
+  }

Review Comment:
   added a comment above






Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]

2023-12-01 Thread via GitHub


jeffkbkim commented on code in PR #14849:
URL: https://github.com/apache/kafka/pull/14849#discussion_r1412622265


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/ContextStateMachine.java:
##
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.runtime;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.slf4j.Logger;
+
+/**
+ * ContextStateMachine is a wrapper on top of the coordinator state machine.
+ * Generally, only a single thread at a time will access this object, but
+ * multiple threads may access it while loading the __consumer_offsets topic partition.
+ */
+class ContextStateMachine<S extends Coordinator<U>, U> implements CoordinatorPlayback<U> {

Review Comment:
   i didn't like it either :) thanks for the suggestion, sounds much nicer






Re: [PR] MINOR: Fix flaky test RefreshingHttpsJwksTest.testBasicScheduleRefresh [kafka]

2023-12-01 Thread via GitHub


jolshan commented on code in PR #14888:
URL: https://github.com/apache/kafka/pull/14888#discussion_r141264


##
clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/RefreshingHttpsJwksTest.java:
##
@@ -134,6 +136,20 @@ public void testSecondaryRefreshAfterElapsedDelay() throws 
Exception {
 String keyId = "abc123";
 MockTime time = new MockTime();
 HttpsJwks httpsJwks = spyHttpsJwks();
+
+try (RefreshingHttpsJwks refreshingHttpsJwks = 
getRefreshingHttpsJwks(time, httpsJwks, mockExecutorService(time))) {
+refreshingHttpsJwks.init();
+// We refresh once at the initialization time from getJsonWebKeys.
+verify(httpsJwks, times(1)).refresh();
+assertTrue(refreshingHttpsJwks.maybeExpediteRefresh(keyId));
+verify(httpsJwks, times(2)).refresh();
+time.sleep(REFRESH_MS + 1);

Review Comment:
   Why were we using real time here at all?
   So now we refresh once more due to mock time passing?



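The fix discussed above replaces the wall clock with `MockTime` so the refresh schedule becomes deterministic. A minimal sketch of the idea; this `MockTime` is a toy re-implementation, not Kafka's class:

```java
public class MockTimeSketch {

    // Toy mock clock: time only advances when the test calls sleep().
    public static class MockTime {
        private long nowMs = 0;
        public long milliseconds() { return nowMs; }
        public void sleep(long ms) { nowMs += ms; }
    }

    public static void main(String[] args) {
        final long refreshMs = 2000;
        MockTime time = new MockTime();
        long lastRefresh = time.milliseconds();

        // Exactly refreshMs + 1 elapses: no race with a real scheduler.
        time.sleep(refreshMs + 1);
        System.out.println(time.milliseconds() - lastRefresh > refreshMs); // true
    }
}
```

Because elapsed time is controlled by the test, "refresh once more after REFRESH_MS" is an exact assertion rather than a flaky sleep-and-hope check.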



Re: [PR] KAFKA-14412: Generalise over RocksDB WriteBatch [kafka]

2023-12-01 Thread via GitHub


ableegoldman commented on code in PR #14853:
URL: https://github.com/apache/kafka/pull/14853#discussion_r1412620201


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java:
##
@@ -572,13 +574,19 @@ public synchronized void flush() {
 
 @Override
 public void addToBatch(final KeyValue<byte[], byte[]> record,
-   final WriteBatch batch) throws RocksDBException {
+   final WriteBatchInterface batch) throws 
RocksDBException {
 dbAccessor.addToBatch(record.key, record.value, batch);
 }
 
 @Override
-public void write(final WriteBatch batch) throws RocksDBException {
-db.write(wOptions, batch);
+public void write(final WriteBatchInterface batch) throws RocksDBException 
{
+if (batch instanceof WriteBatch) {
+db.write(wOptions, (WriteBatch) batch);
+} else if (batch instanceof WriteBatchWithIndex) {
+db.write(wOptions, (WriteBatchWithIndex) batch);
+} else {
+throw new ProcessorStateException("Unknown type of batch " + 
batch.getClass().getCanonicalName());

Review Comment:
   nit: log an error before we throw the exception
   
   Also, imo this should just be an IllegalStateException instead of a 
ProcessorStateException



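The reviewer asks for a log line before the throw and an `IllegalStateException` for the unreachable branch. A self-contained sketch of that dispatch; the `WriteBatch*` types here are stand-ins for the RocksDB classes so the example compiles on its own:

```java
interface WriteBatchInterface { }

class WriteBatch implements WriteBatchInterface { }

class WriteBatchWithIndex implements WriteBatchInterface { }

public class BatchDispatch {

    public static String write(WriteBatchInterface batch) {
        if (batch instanceof WriteBatch) {
            return "plain write";            // stands in for db.write(wOptions, (WriteBatch) batch)
        } else if (batch instanceof WriteBatchWithIndex) {
            return "indexed write";          // stands in for db.write(wOptions, (WriteBatchWithIndex) batch)
        } else {
            // Real code would log.error(...) here before throwing, per the review.
            throw new IllegalStateException(
                "Unknown type of batch " + batch.getClass().getCanonicalName());
        }
    }

    public static void main(String[] args) {
        System.out.println(write(new WriteBatch()));          // plain write
        System.out.println(write(new WriteBatchWithIndex())); // indexed write
    }
}
```

`IllegalStateException` fits because a third implementation reaching this method indicates a programming error, not a state-store failure the caller can recover from.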



Re: [PR] MINOR: Fix flaky test RefreshingHttpsJwksTest.testBasicScheduleRefresh [kafka]

2023-12-01 Thread via GitHub


jolshan commented on code in PR #14888:
URL: https://github.com/apache/kafka/pull/14888#discussion_r1412620108


##
clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/RefreshingHttpsJwksTest.java:
##
@@ -62,14 +62,14 @@ public class RefreshingHttpsJwksTest extends 
OAuthBearerTest {
 @Test
 public void testBasicScheduleRefresh() throws Exception {
 String keyId = "abc123";
-Time time = new MockTime();
+MockTime time = new MockTime();
 HttpsJwks httpsJwks = spyHttpsJwks();
 
-try (RefreshingHttpsJwks refreshingHttpsJwks = 
getRefreshingHttpsJwks(time, httpsJwks)) {
+try (RefreshingHttpsJwks refreshingHttpsJwks = 
getRefreshingHttpsJwks(time, httpsJwks, mockExecutorService(time))) {
 refreshingHttpsJwks.init();
 verify(httpsJwks, times(1)).refresh();
 assertTrue(refreshingHttpsJwks.maybeExpediteRefresh(keyId));
-verify(httpsJwks, times(1)).refresh();
+verify(httpsJwks, times(2)).refresh();

Review Comment:
   Oof what an unfortunate bug. Thanks for fixing.






Re: [PR] KAFKA-15831: KIP-1000 protocol and admin client [kafka]

2023-12-01 Thread via GitHub


AndrewJSchofield closed pull request #14894: KAFKA-15831: KIP-1000 protocol and 
admin client
URL: https://github.com/apache/kafka/pull/14894





Re: [PR] MINOR: Ensure that DisplayName is set in all parameterized tests [kafka]

2023-12-01 Thread via GitHub


mumrah commented on PR #14850:
URL: https://github.com/apache/kafka/pull/14850#issuecomment-1836803801

   I tried this in IntelliJ and I see the test name included in the individual 
test variation output. Looks great!
   
   https://github.com/apache/kafka/assets/55116/fac807f0-06f8-4237-be95-3f9777c73363
   





Re: [PR] KAFKA-14987 [2/2]; customize retry backoff for group/offsets expiration [kafka]

2023-12-01 Thread via GitHub


jeffkbkim commented on code in PR #14870:
URL: https://github.com/apache/kafka/pull/14870#discussion_r1412583010


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorTimer.java:
##
@@ -52,6 +52,20 @@ interface TimeoutOperation {
  */
 void schedule(String key, long delay, TimeUnit unit, boolean retry, 
TimeoutOperation operation);
 
+/**
+ * Add an operation to the timer. If an operation with the same key
+ * already exists, replace it with the new operation.
+ *
+ * @param key   The key to identify this operation.
+ * @param delay The delay to wait before expiring.
+ * @param unit  The delay unit.
+ * @param retry A boolean indicating whether the operation should
+ *  be retried on failure.
+ * @param retryBackoff  The delay when rescheduled on retry.
+ * @param operation The operation to perform upon expiration.
+ */
+void schedule(String key, long delay, TimeUnit unit, boolean retry, long 
retryBackoff, TimeoutOperation operation);

Review Comment:
   yes. i updated the javadocs






Re: [PR] KAFKA-15061; CoordinatorPartitionWriter should reuse buffer [kafka]

2023-12-01 Thread via GitHub


jeffkbkim commented on code in PR #14885:
URL: https://github.com/apache/kafka/pull/14885#discussion_r1412575325


##
core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala:
##
@@ -66,6 +65,10 @@ class CoordinatorPartitionWriter[T](
   compressionType: CompressionType,
   time: Time
 ) extends PartitionWriter[T] {
+  private val threadLocalBufferSupplier = ThreadLocal.withInitial(
+() => new BufferSupplier.GrowableBufferSupplier()
+  )

Review Comment:
   some general questions for my understanding:
   
   1. We are introducing a buffer supplier so that we can reuse the same buffer 
and not reallocate a new buffer each time.
   2. It's a growable buffer supplier since maxBatchSize may change on 
subsequent append()
   3. we're adding a try-finally block to always clear the buffer in the case 
of exceptions. this was not needed previously since we were not using a buffer 
supplier
   4. it's a thread local supplier because we don't want the buffer to be 
reused across different threads - i don't think i fully understand the 
repercussions
   
   is 1-3 correct? 4 is the actual question. thanks!



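On question 4 above: a `ByteBuffer` carries mutable position/limit state, so sharing one instance across request-handler threads would corrupt concurrent appends; a thread-local supplier gives each thread its own reusable buffer. A sketch of that pattern, with illustrative names rather than the actual `BufferSupplier` API:

```java
import java.nio.ByteBuffer;

public class ThreadLocalBufferSketch {

    // One reusable buffer per thread: no per-append allocation, and no
    // cross-thread sharing of a mutable ByteBuffer.
    private static final ThreadLocal<ByteBuffer> BUFFER =
        ThreadLocal.withInitial(() -> ByteBuffer.allocate(1024));

    public static ByteBuffer borrow(int minCapacity) {
        ByteBuffer buf = BUFFER.get();
        if (buf.capacity() < minCapacity) {
            // "Growable" behaviour: replace the cached buffer on demand.
            buf = ByteBuffer.allocate(minCapacity);
            BUFFER.set(buf);
        }
        buf.clear();
        return buf;
    }

    public static void main(String[] args) {
        ByteBuffer a = borrow(512);
        ByteBuffer b = borrow(512);
        System.out.println(a == b);                  // true: same thread reuses one buffer
        System.out.println(borrow(4096).capacity()); // 4096
    }
}
```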



Re: [PR] KAFKA-15347: add support for 'single key multi timestamp' IQs with versioned state stores (KIP-968) [kafka]

2023-12-01 Thread via GitHub


mjsax commented on code in PR #14626:
URL: https://github.com/apache/kafka/pull/14626#discussion_r1412552909


##
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java:
##
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.query.ResultOrder;
+import org.apache.kafka.streams.state.VersionedRecord;
+import org.apache.kafka.streams.state.VersionedRecordIterator;
+import org.rocksdb.Snapshot;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Optional;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class LogicalSegmentIterator implements VersionedRecordIterator<byte[]> {

Review Comment:
   I frankly cannot remember all the details, but this was the PR introducing 
`ManagedKeyValueIterator`: https://github.com/apache/kafka/pull/13142
   
   I understand that we don't use `KeyValueIterator`, but we still work with 
segments, and thus I am wondering if we would need a "managed" version of our 
iterator, too, re-implementing this call-back mechanism? (I am really not sure 
right now if we need it or not...)






Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]

2023-12-01 Thread via GitHub


philipnee commented on PR #14724:
URL: https://github.com/apache/kafka/pull/14724#issuecomment-1836749674

   Hi @apoorvmittal10 Thanks for the time to address my comments - I made 
another pass and added some more comments, mostly nit-picking.  After this I 
think we are good to go.





Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]

2023-12-01 Thread via GitHub


philipnee commented on code in PR #14724:
URL: https://github.com/apache/kafka/pull/14724#discussion_r1412554632


##
clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java:
##
@@ -0,0 +1,573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import io.opentelemetry.proto.common.v1.KeyValue;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
+import org.apache.kafka.common.message.PushTelemetryRequestData;
+import org.apache.kafka.common.message.PushTelemetryResponseData;
+import org.apache.kafka.common.metrics.KafkaMetricsContext;
+import org.apache.kafka.common.metrics.MetricsContext;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.PushTelemetryResponse;
+import org.apache.kafka.common.telemetry.ClientTelemetryState;
+import org.apache.kafka.common.utils.MockTime;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ClientTelemetryReporterTest {

Review Comment:
   should we test the case where the subscription changes when creating a push 
request?






Re: [PR] KAFKA-15347: add support for 'single key multi timestamp' IQs with versioned state stores (KIP-968) [kafka]

2023-12-01 Thread via GitHub


mjsax commented on code in PR #14626:
URL: https://github.com/apache/kafka/pull/14626#discussion_r1412552909


##
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java:
##
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.query.ResultOrder;
+import org.apache.kafka.streams.state.VersionedRecord;
+import org.apache.kafka.streams.state.VersionedRecordIterator;
+import org.rocksdb.Snapshot;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Optional;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class LogicalSegmentIterator implements VersionedRecordIterator {

Review Comment:
   I frankly cannot remember all the details, but this was the PR introducing 
`ManagedKeyValueIterator`: https://github.com/apache/kafka/pull/13142
   
   I understand that we don't use `KeyValueIterator`, but we still work with 
segments, and thus I am wondering if we would need a "managed" version of our 
iterator, too, re-implementing this call-back mechanism? (I am really not sure 
right now if we need it or not...)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]

2023-12-01 Thread via GitHub


philipnee commented on code in PR #14724:
URL: https://github.com/apache/kafka/pull/14724#discussion_r1412506519


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java:
##
@@ -0,0 +1,976 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import io.opentelemetry.proto.metrics.v1.Metric;
+import io.opentelemetry.proto.metrics.v1.MetricsData;
+import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
+import io.opentelemetry.proto.metrics.v1.ScopeMetrics;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
+import org.apache.kafka.common.message.PushTelemetryRequestData;
+import org.apache.kafka.common.message.PushTelemetryResponseData;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricsContext;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractRequest.Builder;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.PushTelemetryResponse;
+import org.apache.kafka.common.telemetry.ClientTelemetryState;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.StringJoiner;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Predicate;
+
+/**
+ * The implementation of the {@link MetricsReporter} for client telemetry 
which manages the life-cycle
+ * of the client telemetry collection process. The client telemetry reporter 
is responsible for
+ * collecting the client telemetry data and sending it to the broker.
+ * 
+ *
+ * The client telemetry reporter is configured with a {@link 
ClientTelemetrySender} which is
+ * responsible for sending the client telemetry data to the broker. The client 
telemetry reporter
+ * will attempt to fetch the telemetry subscription information from the 
broker and send the
+ * telemetry data to the broker based on the subscription information i.e. 
push interval, temporality,
+ * compression type, etc.
+ * 
+ *
+ * The full life-cycle of the metric collection process is defined by a state 
machine in
+ * {@link ClientTelemetryState}. Each state is associated with a different set 
of operations.
+ * For example, the client telemetry reporter will attempt to fetch the 
telemetry subscription
+ * from the broker when in the {@link 
ClientTelemetryState#SUBSCRIPTION_NEEDED} state.
+ * If the push operation fails, the client telemetry reporter will attempt to 
re-fetch the
+ * subscription information by setting the state back to {@link 
ClientTelemetryState#SUBSCRIPTION_NEEDED}.
+ * 
+ *
+ * In an unlikely scenario, if a bad state transition is detected, an
+ * {@link IllegalStateException} will be thrown.
+ * 
+ *
+ * The state transition follows the following steps in order:
+ * 
+ * {@link ClientTelemetryState#SUBSCRIPTION_NEEDED}
+ * {@link ClientTelemetryState#SUBSCRIPTION_IN_PROGRESS}
+ * {@link ClientTelemetryState#PUSH_NEEDED}
+ * {@link ClientTelemetryState#PUSH_IN_PROGRESS}
+ * {@link 

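The state-machine life-cycle quoted in the javadoc above can be sketched as an enum with a validated-transition table. This is only an illustration based on the javadoc text — the state names mirror `org.apache.kafka.common.telemetry.ClientTelemetryState`, but the `TelemetryStateSketch` class, the `transitionTo` method, and the exact transition table are assumptions, not the actual KIP-714 implementation:

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

public class TelemetryStateSketch {

    // Illustrative states mirroring the javadoc; the real enum is
    // org.apache.kafka.common.telemetry.ClientTelemetryState.
    enum State {
        SUBSCRIPTION_NEEDED, SUBSCRIPTION_IN_PROGRESS, PUSH_NEEDED, PUSH_IN_PROGRESS, TERMINATED;

        private static final Map<State, Set<State>> VALID = new EnumMap<>(State.class);
        static {
            VALID.put(SUBSCRIPTION_NEEDED, EnumSet.of(SUBSCRIPTION_IN_PROGRESS, TERMINATED));
            // A failed push falls back to re-fetching the subscription.
            VALID.put(SUBSCRIPTION_IN_PROGRESS, EnumSet.of(PUSH_NEEDED, SUBSCRIPTION_NEEDED, TERMINATED));
            VALID.put(PUSH_NEEDED, EnumSet.of(PUSH_IN_PROGRESS, SUBSCRIPTION_NEEDED, TERMINATED));
            VALID.put(PUSH_IN_PROGRESS, EnumSet.of(PUSH_NEEDED, SUBSCRIPTION_NEEDED, TERMINATED));
            VALID.put(TERMINATED, EnumSet.noneOf(State.class));
        }

        // Bad transitions throw IllegalStateException, as the javadoc describes.
        State transitionTo(State next) {
            if (!VALID.get(this).contains(next))
                throw new IllegalStateException("Invalid transition: " + this + " -> " + next);
            return next;
        }
    }

    public static void main(String[] args) {
        State s = State.SUBSCRIPTION_NEEDED;
        s = s.transitionTo(State.SUBSCRIPTION_IN_PROGRESS);
        s = s.transitionTo(State.PUSH_NEEDED);
        System.out.println(s); // prints PUSH_NEEDED
        // s.transitionTo(State.SUBSCRIPTION_IN_PROGRESS) would throw IllegalStateException
    }
}
```

Keeping the legal transitions in a single `EnumMap` makes the "bad state transition" check a one-line lookup instead of scattered if/else chains.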
Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]

2023-12-01 Thread via GitHub


philipnee commented on code in PR #14724:
URL: https://github.com/apache/kafka/pull/14724#discussion_r1412542727


##
clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java:
##
@@ -0,0 +1,573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import io.opentelemetry.proto.common.v1.KeyValue;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
+import org.apache.kafka.common.message.PushTelemetryRequestData;
+import org.apache.kafka.common.message.PushTelemetryResponseData;
+import org.apache.kafka.common.metrics.KafkaMetricsContext;
+import org.apache.kafka.common.metrics.MetricsContext;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.PushTelemetryResponse;
+import org.apache.kafka.common.telemetry.ClientTelemetryState;
+import org.apache.kafka.common.utils.MockTime;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ClientTelemetryReporterTest {
+
+private MockTime time;
+private ClientTelemetryReporter clientTelemetryReporter;
+private Map configs;
+private MetricsContext metricsContext;
+private Uuid uuid;
+private ClientTelemetryReporter.ClientTelemetrySubscription subscription;
+
+@BeforeEach
+public void setUp() {
+time = new MockTime();
+clientTelemetryReporter = new ClientTelemetryReporter(time);
+configs = new HashMap<>();
+metricsContext = new KafkaMetricsContext("test");
+uuid = Uuid.randomUuid();
+subscription = new 
ClientTelemetryReporter.ClientTelemetrySubscription(uuid, 1234, 2,
+Collections.emptyList(), true, null);
+}
+
+@Test
+public void testInitTelemetryReporter() {
+configs.put(CommonClientConfigs.CLIENT_ID_CONFIG, "test-client");
+configs.put(CommonClientConfigs.CLIENT_RACK_CONFIG, "rack");
+
+clientTelemetryReporter.configure(configs);
+clientTelemetryReporter.contextChange(metricsContext);
+assertNotNull(clientTelemetryReporter.metricsCollector());
+assertNotNull(clientTelemetryReporter.telemetryProvider().resource());
+assertEquals(1, 
clientTelemetryReporter.telemetryProvider().resource().getAttributesCount());
+assertEquals(
+ClientTelemetryProvider.CLIENT_RACK, 
clientTelemetryReporter.telemetryProvider().resource().getAttributes(0).getKey());
+assertEquals("rack", 
clientTelemetryReporter.telemetryProvider().resource().getAttributes(0).getValue().getStringValue());
+}
+
+@Test
+public void testInitTelemetryReporterNoCollector() {
+// Remove namespace config which skips the collector initialization.
+MetricsContext metricsContext = Collections::emptyMap;
+
+
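The `MetricsContext metricsContext = Collections::emptyMap;` line in the quoted test works because `MetricsContext` has a single abstract method, so a matching method reference can implement it. A minimal sketch, using a hypothetical `Context` stand-in interface rather than the real Kafka type:

```java
import java.util.Collections;
import java.util.Map;

public class ContextSketch {

    // Simplified stand-in for org.apache.kafka.common.metrics.MetricsContext:
    // one abstract method, so any matching method reference implements it.
    interface Context {
        Map<String, String> contextLabels();
    }

    public static void main(String[] args) {
        // No labels at all, hence no namespace entry either.
        Context empty = Collections::emptyMap;
        System.out.println(empty.contextLabels().isEmpty()); // prints true
    }
}
```

This is a concise way for a test to produce a context with no namespace label, which is what the quoted test relies on to skip collector initialization.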

Re: [PR] KAFKA-15347: add support for 'single key multi timestamp' IQs with versioned state stores (KIP-968) [kafka]

2023-12-01 Thread via GitHub


mjsax commented on code in PR #14626:
URL: https://github.com/apache/kafka/pull/14626#discussion_r1412530967


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##
@@ -266,75 +269,28 @@ public VersionedRecord get(final Bytes key, final 
long asOfTimestamp) {
 return null;
 }
 
-public VersionedRecordIterator get(final Bytes key, final long 
fromTimestamp, final long toTimestamp, final boolean isAscending) {
-
-Objects.requireNonNull(key, "key cannot be null");
+@SuppressWarnings("unchecked")

Review Comment:
   Well, `VersionedRecordIterator` takes a generic `V` but 
`LogicalSegmentIterator` does not specify the type (which is a bug):
   ```
   public class LogicalSegmentIterator implements VersionedRecordIterator {
   ```
   
   Guess it should be the following (which will avoid the warning, making the 
suppression unnecessary):
   ```
   public class LogicalSegmentIterator implements VersionedRecordIterator<byte[]> {
   ```
   
   Which implies that `public Object next()` should actually return `byte[]`, 
right (something I missed in my review).
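
   To illustrate the point with a self-contained sketch (hypothetical 
`RecordIterator`/`ByteIterator` names, not the actual Streams classes): once the 
implemented interface is parameterized, `next()` returns `byte[]` directly, so 
callers need no cast and no `@SuppressWarnings("unchecked")`.

   ```java
   import java.util.Iterator;
   import java.util.List;

   public class GenericsSketch {

       // Stand-in for VersionedRecordIterator<V>; hypothetical, for illustration only.
       interface RecordIterator<V> extends Iterator<V> { }

       // Declaring the type parameter (here byte[]) fixes the return type of next().
       static class ByteIterator implements RecordIterator<byte[]> {
           private final Iterator<byte[]> delegate;

           ByteIterator(List<byte[]> records) {
               this.delegate = records.iterator();
           }

           @Override
           public boolean hasNext() {
               return delegate.hasNext();
           }

           @Override
           public byte[] next() {
               return delegate.next(); // concrete return type, not Object
           }
       }

       public static void main(String[] args) {
           ByteIterator it = new ByteIterator(List.of(new byte[]{1, 2, 3}));
           byte[] first = it.next(); // compiles without a cast
           System.out.println(first.length); // prints 3
       }
   }
   ```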



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]

2023-12-01 Thread via GitHub


philipnee commented on code in PR #14724:
URL: https://github.com/apache/kafka/pull/14724#discussion_r1412530561


##
clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java:
##
@@ -0,0 +1,573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import io.opentelemetry.proto.common.v1.KeyValue;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
+import org.apache.kafka.common.message.PushTelemetryRequestData;
+import org.apache.kafka.common.message.PushTelemetryResponseData;
+import org.apache.kafka.common.metrics.KafkaMetricsContext;
+import org.apache.kafka.common.metrics.MetricsContext;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.PushTelemetryResponse;
+import org.apache.kafka.common.telemetry.ClientTelemetryState;
+import org.apache.kafka.common.utils.MockTime;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ClientTelemetryReporterTest {
+
+private MockTime time;
+private ClientTelemetryReporter clientTelemetryReporter;
+private Map configs;
+private MetricsContext metricsContext;
+private Uuid uuid;
+private ClientTelemetryReporter.ClientTelemetrySubscription subscription;
+
+@BeforeEach
+public void setUp() {
+time = new MockTime();
+clientTelemetryReporter = new ClientTelemetryReporter(time);
+configs = new HashMap<>();
+metricsContext = new KafkaMetricsContext("test");
+uuid = Uuid.randomUuid();
+subscription = new 
ClientTelemetryReporter.ClientTelemetrySubscription(uuid, 1234, 2,
+Collections.emptyList(), true, null);
+}
+
+@Test
+public void testInitTelemetryReporter() {
+configs.put(CommonClientConfigs.CLIENT_ID_CONFIG, "test-client");
+configs.put(CommonClientConfigs.CLIENT_RACK_CONFIG, "rack");
+
+clientTelemetryReporter.configure(configs);
+clientTelemetryReporter.contextChange(metricsContext);
+assertNotNull(clientTelemetryReporter.metricsCollector());
+assertNotNull(clientTelemetryReporter.telemetryProvider().resource());
+assertEquals(1, 
clientTelemetryReporter.telemetryProvider().resource().getAttributesCount());
+assertEquals(
+ClientTelemetryProvider.CLIENT_RACK, 
clientTelemetryReporter.telemetryProvider().resource().getAttributes(0).getKey());
+assertEquals("rack", 
clientTelemetryReporter.telemetryProvider().resource().getAttributes(0).getValue().getStringValue());
+}
+
+@Test
+public void testInitTelemetryReporterNoCollector() {
+// Remove namespace config which skips the collector initialization.
+MetricsContext metricsContext = Collections::emptyMap;
+
+

Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]

2023-12-01 Thread via GitHub


philipnee commented on code in PR #14724:
URL: https://github.com/apache/kafka/pull/14724#discussion_r1412522622


##
clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java:
##
@@ -0,0 +1,573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import io.opentelemetry.proto.common.v1.KeyValue;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
+import org.apache.kafka.common.message.PushTelemetryRequestData;
+import org.apache.kafka.common.message.PushTelemetryResponseData;
+import org.apache.kafka.common.metrics.KafkaMetricsContext;
+import org.apache.kafka.common.metrics.MetricsContext;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.PushTelemetryResponse;
+import org.apache.kafka.common.telemetry.ClientTelemetryState;
+import org.apache.kafka.common.utils.MockTime;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ClientTelemetryReporterTest {
+
+private MockTime time;
+private ClientTelemetryReporter clientTelemetryReporter;
+private Map configs;
+private MetricsContext metricsContext;
+private Uuid uuid;
+private ClientTelemetryReporter.ClientTelemetrySubscription subscription;
+
+@BeforeEach
+public void setUp() {
+time = new MockTime();
+clientTelemetryReporter = new ClientTelemetryReporter(time);
+configs = new HashMap<>();
+metricsContext = new KafkaMetricsContext("test");
+uuid = Uuid.randomUuid();
+subscription = new 
ClientTelemetryReporter.ClientTelemetrySubscription(uuid, 1234, 2,
+Collections.emptyList(), true, null);
+}
+
+@Test
+public void testInitTelemetryReporter() {
+configs.put(CommonClientConfigs.CLIENT_ID_CONFIG, "test-client");
+configs.put(CommonClientConfigs.CLIENT_RACK_CONFIG, "rack");
+
+clientTelemetryReporter.configure(configs);
+clientTelemetryReporter.contextChange(metricsContext);
+assertNotNull(clientTelemetryReporter.metricsCollector());
+assertNotNull(clientTelemetryReporter.telemetryProvider().resource());
+assertEquals(1, 
clientTelemetryReporter.telemetryProvider().resource().getAttributesCount());
+assertEquals(
+ClientTelemetryProvider.CLIENT_RACK, 
clientTelemetryReporter.telemetryProvider().resource().getAttributes(0).getKey());
+assertEquals("rack", 
clientTelemetryReporter.telemetryProvider().resource().getAttributes(0).getValue().getStringValue());
+}
+
+@Test
+public void testInitTelemetryReporterNoCollector() {
+// Remove namespace config which skips the collector initialization.
+MetricsContext metricsContext = Collections::emptyMap;
+
+

Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]

2023-12-01 Thread via GitHub


philipnee commented on code in PR #14724:
URL: https://github.com/apache/kafka/pull/14724#discussion_r1412522319


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java:
##
@@ -0,0 +1,976 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import io.opentelemetry.proto.metrics.v1.Metric;
+import io.opentelemetry.proto.metrics.v1.MetricsData;
+import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
+import io.opentelemetry.proto.metrics.v1.ScopeMetrics;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
+import org.apache.kafka.common.message.PushTelemetryRequestData;
+import org.apache.kafka.common.message.PushTelemetryResponseData;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricsContext;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractRequest.Builder;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.PushTelemetryResponse;
+import org.apache.kafka.common.telemetry.ClientTelemetryState;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.StringJoiner;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Predicate;
+
+/**
+ * The implementation of the {@link MetricsReporter} for client telemetry 
which manages the life-cycle
+ * of the client telemetry collection process. The client telemetry reporter 
is responsible for
+ * collecting the client telemetry data and sending it to the broker.
+ * 
+ *
+ * The client telemetry reporter is configured with a {@link 
ClientTelemetrySender} which is
+ * responsible for sending the client telemetry data to the broker. The client 
telemetry reporter
+ * will attempt to fetch the telemetry subscription information from the 
broker and send the
+ * telemetry data to the broker based on the subscription information i.e. 
push interval, temporality,
+ * compression type, etc.
+ * 
+ *
+ * The full life-cycle of the metric collection process is defined by a state 
machine in
+ * {@link ClientTelemetryState}. Each state is associated with a different set 
of operations.
+ * For example, the client telemetry reporter will attempt to fetch the 
telemetry subscription
+ * from the broker when in the {@link 
ClientTelemetryState#SUBSCRIPTION_NEEDED} state.
+ * If the push operation fails, the client telemetry reporter will attempt to 
re-fetch the
+ * subscription information by setting the state back to {@link 
ClientTelemetryState#SUBSCRIPTION_NEEDED}.
+ * 
+ *
+ * In an unlikely scenario, if a bad state transition is detected, an
+ * {@link IllegalStateException} will be thrown.
+ * 
+ *
+ * The state transition follows the following steps in order:
+ * 
+ * {@link ClientTelemetryState#SUBSCRIPTION_NEEDED}
+ * {@link ClientTelemetryState#SUBSCRIPTION_IN_PROGRESS}
+ * {@link ClientTelemetryState#PUSH_NEEDED}
+ * {@link ClientTelemetryState#PUSH_IN_PROGRESS}
+ * {@link 

Re: [PR] MINOR: cleanup internal Iterator impl [kafka]

2023-12-01 Thread via GitHub


mjsax merged PR #14889:
URL: https://github.com/apache/kafka/pull/14889


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]

2023-12-01 Thread via GitHub


philipnee commented on code in PR #14724:
URL: https://github.com/apache/kafka/pull/14724#discussion_r1412507867


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java:
##

Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]

2023-12-01 Thread via GitHub


philipnee commented on code in PR #14724:
URL: https://github.com/apache/kafka/pull/14724#discussion_r1412506519


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java:
##

Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]

2023-12-01 Thread via GitHub


philipnee commented on code in PR #14724:
URL: https://github.com/apache/kafka/pull/14724#discussion_r1412500117


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java:
##

Re: [PR] KAFKA-15280: Implement client support for KIP-848 server-side assignors [kafka]

2023-12-01 Thread via GitHub


lianetm commented on code in PR #14878:
URL: https://github.com/apache/kafka/pull/14878#discussion_r1412491852


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -783,6 +774,7 @@ void markReconciliationCompleted() {
  * 
  */
 private void resolveMetadataForUnresolvedAssignment() {
+assignmentReadyToReconcile.clear();

Review Comment:
   This call here could make us lose assignments in some cases, I expect (we can have assignments ready to reconcile that are not being reconciled yet, and then get more as a result of a metadata update; all need to be kept in the buffer). Assignments ready to reconcile are discovered from two places (when we get a HB response, but also when we get metadata updates), and kept in the buffer if there is a previous reconciliation going on.
   
   Could you point me to the integration test that you mentioned brought this situation up? I'll take a look to better understand what could be missing.
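The invariant described above — assignments ready to reconcile arrive from two sources (heartbeat responses and metadata updates) and must be accumulated, not cleared, until a reconciliation actually starts — can be sketched as follows. `AssignmentBuffer` and the plain `String` partition keys are illustrative stand-ins, not the real `MembershipManagerImpl` types.

```java
import java.util.HashSet;
import java.util.Set;

// Illustrative sketch of the buffering concern raised in the review comment above.
class AssignmentBuffer {
    private final Set<String> readyToReconcile = new HashSet<>();

    // Called on a heartbeat response carrying newly resolved partitions.
    void onHeartbeat(Set<String> resolved) {
        readyToReconcile.addAll(resolved);
    }

    // Called on a metadata update resolving more partitions; must be additive,
    // never a clear-and-replace, or in-flight assignments would be lost.
    void onMetadataUpdate(Set<String> resolved) {
        readyToReconcile.addAll(resolved);
    }

    // Drained only when a reconciliation actually starts.
    Set<String> drain() {
        Set<String> snapshot = new HashSet<>(readyToReconcile);
        readyToReconcile.clear();
        return snapshot;
    }
}
```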






Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]

2023-12-01 Thread via GitHub


philipnee commented on code in PR #14724:
URL: https://github.com/apache/kafka/pull/14724#discussion_r1412491930


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryProvider.java:
##
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import io.opentelemetry.proto.common.v1.AnyValue;
+import io.opentelemetry.proto.common.v1.KeyValue;
+import io.opentelemetry.proto.resource.v1.Resource;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.metrics.MetricsContext;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ClientTelemetryProvider implements Configurable {
+
+public static final String DOMAIN = "org.apache.kafka";
+// Client metrics tags
+public static final String CLIENT_RACK = "client_rack";
+public static final String GROUP_ID = "group_id";
+public static final String GROUP_INSTANCE_ID = "group_instance_id";
+public static final String GROUP_MEMBER_ID = "group_member_id";
+public static final String TRANSACTIONAL_ID = "transactional_id";
+
+private static final String PRODUCER_NAMESPACE = "kafka.producer";
+private static final String CONSUMER_NAMESPACE = "kafka.consumer";
+
+private static final Map<String, String> PRODUCER_CONFIG_MAPPING = new HashMap<>();
+private static final Map<String, String> CONSUMER_CONFIG_MAPPING = new HashMap<>();
+
+private volatile Resource resource = null;
+private Map<String, ?> config = null;
+
+// Mapping of config keys to telemetry keys. Contains only keys which can be fetched from config.
+// Config like group_member_id is not present here as it is not fetched from config.
+static {
+PRODUCER_CONFIG_MAPPING.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, ClientTelemetryProvider.TRANSACTIONAL_ID);
+
+CONSUMER_CONFIG_MAPPING.put(ConsumerConfig.GROUP_ID_CONFIG, ClientTelemetryProvider.GROUP_ID);
+CONSUMER_CONFIG_MAPPING.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, ClientTelemetryProvider.GROUP_INSTANCE_ID);
+}
+
+@Override
+public synchronized void configure(Map<String, ?> configs) {
+this.config = configs;
+}
+
+/**
+ * Validate that all the data required for generating correct metrics is present.
+ *
+ * @param metricsContext {@link MetricsContext}
+ * @return false if all the data required for generating correct metrics is missing, true
+ * otherwise.
+ */
+boolean validate(MetricsContext metricsContext) {
+return ClientTelemetryUtils.validateRequiredResourceLabels(metricsContext.contextLabels());
+}
+
+/**
+ * Sets the metrics tags for the service or library exposing metrics. This will be called before
+ * {@link org.apache.kafka.common.metrics.MetricsReporter#init(List)} and may be called anytime
+ * after that.
+ *
+ * @param metricsContext {@link MetricsContext}
+ */
+synchronized void contextChange(MetricsContext metricsContext) {
+final Resource.Builder resourceBuilder = Resource.newBuilder();
+
+final String namespace = metricsContext.contextLabels().get(MetricsContext.NAMESPACE);
+if (PRODUCER_NAMESPACE.equals(namespace)) {
+// Add producer resource labels.
+PRODUCER_CONFIG_MAPPING.forEach((configKey, telemetryKey) -> {
+if (config.containsKey(configKey)) {

Review Comment:
   Do we need to log at debug level if the configKey does not exist? This might be useful for debugging purposes.
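The config-to-telemetry-label mapping quoted above, together with the reviewer's suggestion to log absent keys, can be sketched in isolation. `LabelMapper`, its method name, and the stand-in `System.out` debug line are hypothetical illustrations, not Kafka's actual internals (which use an SLF4J logger and build an OpenTelemetry `Resource`).

```java
import java.util.HashMap;
import java.util.Map;

// Minimal sketch of mapping client config keys to telemetry resource labels,
// skipping (and logging) keys that are not present in the config.
class LabelMapper {
    static Map<String, String> resourceLabels(Map<String, String> mapping, Map<String, ?> config) {
        Map<String, String> labels = new HashMap<>();
        mapping.forEach((configKey, telemetryKey) -> {
            Object value = config.get(configKey);
            if (value != null) {
                labels.put(telemetryKey, value.toString());
            } else {
                // Absent keys are simply skipped; a debug log here aids troubleshooting.
                System.out.println("DEBUG: config key not present: " + configKey);
            }
        });
        return labels;
    }
}
```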






Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]

2023-12-01 Thread via GitHub


philipnee commented on code in PR #14724:
URL: https://github.com/apache/kafka/pull/14724#discussion_r1412484071


##
clients/src/main/java/org/apache/kafka/clients/ClientTelemetryReporter.java:
##
+/**
+ * The implementation of the {@link MetricsReporter} for client telemetry which manages the life-cycle
+ * of the client telemetry collection process. The client telemetry reporter is responsible for
+ * collecting the client telemetry data and sending it to the broker.
+ *
+ * The client telemetry reporter is configured with a {@link ClientTelemetrySender} which is
+ * responsible for sending the client telemetry data to the broker. The client telemetry reporter
+ * will attempt to fetch the telemetry subscription information from the broker and send the
+ * telemetry data to the broker based on the subscription information.
+ *
+ * The full life-cycle of the metric collection process is defined by a state machine in
+ * {@link ClientTelemetryState}. Each state is associated with a different set of operations.

Review Comment:
   Sounds good - what I meant is: since you mentioned the state transition, I wonder if we should just document the transition in the code.




Re: [PR] HOTFIX: fix ConsistencyVectorIntegrationTest failure [kafka]

2023-12-01 Thread via GitHub


mjsax commented on PR #14895:
URL: https://github.com/apache/kafka/pull/14895#issuecomment-1836641789

   `:generator:checkstyleTest` failure -- could not reproduce locally. Will 
retrigger Jenkins.





Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]

2023-12-01 Thread via GitHub


philipnee commented on code in PR #14724:
URL: https://github.com/apache/kafka/pull/14724#discussion_r1412482284


##
clients/src/main/java/org/apache/kafka/clients/ClientTelemetryReporter.java:
##

Review Comment:
   Great, thanks.






Re: [PR] KAFKA-15363: Broker log directory failure changes [kafka]

2023-12-01 Thread via GitHub


OmniaGM commented on code in PR #14790:
URL: https://github.com/apache/kafka/pull/14790#discussion_r1412455807


##
core/src/main/scala/kafka/cluster/Partition.scala:
##
@@ -613,7 +613,16 @@ class Partition(val topicPartition: TopicPartition,
   // Only ReplicaAlterDirThread will call this method and 
ReplicaAlterDirThread should remove the partition
   // from its partitionStates if this method returns true
   def maybeReplaceCurrentWithFutureReplica(): Boolean = {
-// lock to prevent the log append by followers while checking if the log 
dir could be replaced with future log.
+maybeFutureReplicaCaughtUp((futurePartitionLog: UnifiedLog) => {
+  logManager.replaceCurrentWithFutureLog(topicPartition)
+  futurePartitionLog.setLogOffsetsListener(logOffsetsListener)
+  log = futureLog
+  removeFutureLocalReplica(false)
+})
+  }
+
+  def maybeFutureReplicaCaughtUp(callback: UnifiedLog => Unit): Boolean = {

Review Comment:
   rename it. 






Re: [PR] HOTFIX: fix ConsistencyVectorIntegrationTest failure [kafka]

2023-12-01 Thread via GitHub


cadonna commented on PR #14895:
URL: https://github.com/apache/kafka/pull/14895#issuecomment-1836560772

   FYI, the following tests also fail consistently on trunk:
   ```
   org.apache.kafka.common.requests.BrokerRegistrationRequestTest.[1] 0
   org.apache.kafka.common.requests.BrokerRegistrationRequestTest.[2] 1
   ```





Re: [PR] KAFKA-15276: Implement event plumbing for ConsumerRebalanceListener callbacks [kafka]

2023-12-01 Thread via GitHub


kirktrue commented on code in PR #14640:
URL: https://github.com/apache/kafka/pull/14640#discussion_r1412428498


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##
@@ -181,11 +183,13 @@ private void process(final ListOffsetsApplicationEvent 
event) {
  * it is already a member.
  */
 private void processSubscriptionChangeEvent() {
-if (!requestManagers.membershipManager.isPresent()) {
-throw new RuntimeException("Group membership manager not present 
when processing a " +
-"subscribe event");
+if (!requestManagers.heartbeatRequestManager.isPresent()) {
+KafkaException error = new KafkaException("Group membership 
manager not present when processing a subscribe event");

Review Comment:
   True, this case _shouldn't_ occur, but if it does, throwing an exception 
kills the background thread dead, which usually causes weird hanging. We 
specifically tend to avoid throwing exceptions from the event processor (or 
anything it calls) for this reason.
   
   Perhaps that's too strict of a policy?
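The alternative described above — capturing failures instead of letting them kill the background thread — can be sketched like this. A minimal illustration with hypothetical names, not the actual consumer internals:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Minimal sketch: the background thread's event processor catches errors and
// queues them instead of throwing, so the thread stays alive; the application
// thread surfaces the error on its next call. All names are hypothetical.
public class BackgroundErrorPropagation {
    static final BlockingQueue<RuntimeException> backgroundErrors = new LinkedBlockingQueue<>();

    // Runs on the background thread: never lets an exception escape.
    static void processEvent(Runnable handler) {
        try {
            handler.run();
        } catch (RuntimeException e) {
            backgroundErrors.offer(e);
        }
    }

    // Runs on the application thread: rethrows any queued background error.
    static void maybeThrowBackgroundError() {
        RuntimeException e = backgroundErrors.poll();
        if (e != null) {
            throw e;
        }
    }

    public static void main(String[] args) {
        processEvent(() -> { throw new IllegalStateException("membership manager not present"); });
        try {
            maybeThrowBackgroundError();
        } catch (IllegalStateException e) {
            System.out.println("surfaced on application thread: " + e.getMessage());
        }
    }
}
```

The trade-off under discussion is exactly this: throwing directly is simpler, but the queue keeps the background thread alive so the consumer can report the failure instead of hanging.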






Re: [PR] HOTFIX: fix ConsistencyVectorIntegrationTest failure [kafka]

2023-12-01 Thread via GitHub


lucasbru commented on PR #14895:
URL: https://github.com/apache/kafka/pull/14895#issuecomment-1836550893

   @mjsax could you merge this later today if CI passes?





Re: [PR] KAFKA-15363: Broker log directory failure changes [kafka]

2023-12-01 Thread via GitHub


OmniaGM commented on code in PR #14790:
URL: https://github.com/apache/kafka/pull/14790#discussion_r1412423207


##
server/src/main/java/org/apache/kafka/server/AssignmentsManager.java:
##
@@ -391,4 +402,4 @@ static AssignReplicasToDirsRequestData buildRequestData(int 
brokerId, long broke
 .setBrokerEpoch(brokerEpoch)
 .setDirectories(new ArrayList<>(directoryMap.values()));
 }
-}
+}

Review Comment:
   revert it



##
server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java:
##
@@ -248,4 +248,4 @@ void testRequeuesFailedAssignmentPropagations() throws 
InterruptedException {
 }}
 ), captor.getAllValues().get(4).build().data());
 }
-}
+}

Review Comment:
   revert it






Re: [PR] KAFKA-15363: Broker log directory failure changes [kafka]

2023-12-01 Thread via GitHub


OmniaGM commented on code in PR #14790:
URL: https://github.com/apache/kafka/pull/14790#discussion_r1412422869


##
server/src/main/java/org/apache/kafka/server/AssignmentsManager.java:
##
@@ -95,7 +96,10 @@ public void close() throws InterruptedException {
 }
 
 public void onAssignment(TopicIdPartition topicPartition, Uuid dirId) {

Review Comment:
   I changed this to have a single `onAssignment` method and an `AssignmentEvent` constructor that replaces a null callback with a runnable that does nothing.
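The normalization described above — one constructor that substitutes a no-op for a null callback — might look roughly like this (class and field names are illustrative, not the actual `AssignmentEvent`):

```java
// Sketch of the pattern: normalize a null completion callback to a no-op
// Runnable in the constructor, so downstream code never needs null checks.
public class AssignmentEventSketch {
    private final Runnable onComplete;

    public AssignmentEventSketch(Runnable onComplete) {
        // Replace a null callback with a runnable that does nothing.
        this.onComplete = onComplete != null ? onComplete : () -> { };
    }

    public void complete() {
        onComplete.run(); // safe even when the caller passed no callback
    }

    public static void main(String[] args) {
        new AssignmentEventSketch(null).complete(); // no NullPointerException
        new AssignmentEventSketch(() -> System.out.println("assignment acknowledged")).complete();
    }
}
```

This keeps a single code path for events with and without callbacks, which is why the review settled on one constructor rather than overloads.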






[PR] KAKFA-15629: Update upgrade-guide.html for TimestampedKeyQuery and TimestampedRangeQuery [kafka]

2023-12-01 Thread via GitHub


hanyuzheng7 opened a new pull request, #14898:
URL: https://github.com/apache/kafka/pull/14898

   Update upgrade-guide.html for TimestampedKeyQuery and TimestampedRangeQuery
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-15690: Fix restoring tasks on partition loss, flaky EosIntegrationTest [kafka]

2023-12-01 Thread via GitHub


lucasbru merged PR #14869:
URL: https://github.com/apache/kafka/pull/14869





Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]

2023-12-01 Thread via GitHub


apoorvmittal10 commented on PR #14724:
URL: https://github.com/apache/kafka/pull/14724#issuecomment-1836538541

   Build is yellow with unrelated test failures.





[jira] [Commented] (KAFKA-9545) Flaky Test `RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted`

2023-12-01 Thread Apoorv Mittal (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17792158#comment-17792158
 ] 

Apoorv Mittal commented on KAFKA-9545:
--

The failure reoccurred: 
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14724/19/tests

 
{code:java}
Error: org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. Stream tasks not updated ==> expected: <true> but was: <false>
Stacktrace: org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. Stream tasks not updated ==> expected: <true> but was: <false>
at 
app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
   at 
app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
   at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) 
   at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) at 
app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:210)at 
app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:331)
   at 
app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:379)
 at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:328)   
 at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:312)   
 at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:285)   
 at 
app//org.apache.kafka.streams.integration.RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted(RegexSourceIntegrationTest.java:295)
 at 
java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
 Method) at 
java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   at 
java.base@11.0.16.1/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.base@11.0.16.1/java.lang.reflect.Method.invoke(Method.java:566) at 
app//org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728)
  at 
app//org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
   at 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
 at 
app//org.junit.jupiter.engine.extension.SameThreadTimeoutInvocation.proceed(SameThreadTimeoutInvocation.java:45)
 at 
app//org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
at 
app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
  at 
app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86)
   at 
app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
at 
app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
 at 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
 {code}
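For context, the `TestUtils.waitForCondition` call in the stack trace follows a simple poll-until-deadline pattern; a rough re-implementation (illustrative, not Kafka's actual `TestUtils`) looks like:

```java
import java.util.function.BooleanSupplier;

// Illustrative sketch of the poll-until-deadline pattern behind
// waitForCondition: re-check the condition until it holds or the timeout
// elapses, then fail with the supplied message.
public class WaitForConditionSketch {
    static void waitForCondition(BooleanSupplier condition, long timeoutMs, String message)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() > deadline) {
                throw new AssertionError("Condition not met within timeout " + timeoutMs + ". " + message);
            }
            Thread.sleep(10); // poll interval before re-checking
        }
    }

    public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis();
        waitForCondition(() -> System.currentTimeMillis() - start > 50, 1000, "clock advanced");
        System.out.println("condition met");
    }
}
```

Flakiness such as the failure above typically means the condition genuinely takes longer than the fixed 15000 ms budget on a loaded CI machine, not that the polling logic is wrong.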

> Flaky Test `RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted`
> --
>
> Key: KAFKA-9545
> URL: https://issues.apache.org/jira/browse/KAFKA-9545
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Jason Gustafson
>Assignee: Boyang Chen
>Priority: Major
>
> https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/4678/testReport/org.apache.kafka.streams.integration/RegexSourceIntegrationTest/testRegexMatchesTopicsAWhenDeleted/
> {code}
> java.lang.AssertionError: Condition not met within timeout 15000. Stream 
> tasks not updated
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
>   at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$5(TestUtils.java:367)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:415)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:383)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:366)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:337)
>   at 
> org.apache.kafka.streams.integration.RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted(RegexSourceIntegrationTest.java:224)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> 

[jira] [Reopened] (KAFKA-9545) Flaky Test `RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted`

2023-12-01 Thread Apoorv Mittal (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apoorv Mittal reopened KAFKA-9545:
--
  Assignee: (was: Boyang Chen)

> Flaky Test `RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted`
> --
>
> Key: KAFKA-9545
> URL: https://issues.apache.org/jira/browse/KAFKA-9545
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Jason Gustafson
>Priority: Major
>
> https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/4678/testReport/org.apache.kafka.streams.integration/RegexSourceIntegrationTest/testRegexMatchesTopicsAWhenDeleted/
> {code}
> java.lang.AssertionError: Condition not met within timeout 15000. Stream 
> tasks not updated
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
>   at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$5(TestUtils.java:367)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:415)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:383)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:366)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:337)
>   at 
> org.apache.kafka.streams.integration.RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted(RegexSourceIntegrationTest.java:224)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Fix flaky `DescribeClusterRequestTest.testDescribeClusterRequestIncludingClusterAuthorizedOperations` [kafka]

2023-12-01 Thread via GitHub


hachikuji merged PR #14890:
URL: https://github.com/apache/kafka/pull/14890





Re: [PR] KAFKA-15276: Implement event plumbing for ConsumerRebalanceListener callbacks [kafka]

2023-12-01 Thread via GitHub


lianetm commented on code in PR #14640:
URL: https://github.com/apache/kafka/pull/14640#discussion_r1412391352


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##
@@ -198,11 +202,13 @@ private void processSubscriptionChangeEvent() {
  *  the group is sent out.
  */
 private void processUnsubscribeEvent(UnsubscribeApplicationEvent event) {
-if (!requestManagers.membershipManager.isPresent()) {
-throw new RuntimeException("Group membership manager not present 
when processing an " +
-"unsubscribe event");
+if (!requestManagers.heartbeatRequestManager.isPresent()) {

Review Comment:
   Same as above comment?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##
@@ -198,11 +202,13 @@ private void processSubscriptionChangeEvent() {
  *  the group is sent out.
  */
 private void processUnsubscribeEvent(UnsubscribeApplicationEvent event) {
-if (!requestManagers.membershipManager.isPresent()) {
-throw new RuntimeException("Group membership manager not present 
when processing an " +
-"unsubscribe event");
+if (!requestManagers.heartbeatRequestManager.isPresent()) {

Review Comment:
   Same as the comment above?






Re: [PR] KAFKA-15276: Implement event plumbing for ConsumerRebalanceListener callbacks [kafka]

2023-12-01 Thread via GitHub


lianetm commented on code in PR #14640:
URL: https://github.com/apache/kafka/pull/14640#discussion_r1412390835


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##
@@ -181,11 +183,13 @@ private void process(final ListOffsetsApplicationEvent 
event) {
  * it is already a member.
  */
 private void processSubscriptionChangeEvent() {
-if (!requestManagers.membershipManager.isPresent()) {
-throw new RuntimeException("Group membership manager not present 
when processing a " +
-"subscribe event");
+if (!requestManagers.heartbeatRequestManager.isPresent()) {
+KafkaException error = new KafkaException("Group membership 
manager not present when processing a subscribe event");

Review Comment:
   I expect this would only happen in the case of a bug in our code, so a `RuntimeException` that fails the background thread (and therefore the consumer) seemed simple and sufficient. Is there a reason why we need all the logic to pass this failure to the app thread?






Re: [PR] HOTFIX: fix ConsistencyVectorIntegrationTest failure [kafka]

2023-12-01 Thread via GitHub


hanyuzheng7 commented on PR #14895:
URL: https://github.com/apache/kafka/pull/14895#issuecomment-1836500935

   Thanks @lucasbru !





Re: [PR] KAFKA-15953: Refactor polling delays [kafka]

2023-12-01 Thread via GitHub


lucasbru commented on code in PR #14897:
URL: https://github.com/apache/kafka/pull/14897#discussion_r1412360054


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##
@@ -51,7 +51,7 @@
  */
 public class ConsumerNetworkThread extends KafkaThread implements Closeable {
 
-private static final long MAX_POLL_TIMEOUT_MS = 5000;
+static final long MAX_POLL_TIMEOUT_MS = 5000;

Review Comment:
   nit: // visible for testing






Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]

2023-12-01 Thread via GitHub


cadonna commented on code in PR #14879:
URL: https://github.com/apache/kafka/pull/14879#discussion_r1412360413


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -849,9 +953,22 @@ public OptionalLong currentLag(TopicPartition 
topicPartition) {
 }
 }
 
+public void setGroupMetadata(final ConsumerGroupMetadata groupMetadata) {
+this.groupMetadata = Optional.of(groupMetadata);
+}
+
 @Override
 public ConsumerGroupMetadata groupMetadata() {
-throw new KafkaException("method not implemented");
+acquireAndEnsureOpen();
+try {
+maybeThrowInvalidGroupIdException();
+backgroundEventProcessor.process();
+return groupMetadata.orElseThrow(
+() -> new IllegalStateException("No group metadata found 
although a valid group ID exists. This is a bug!")

Review Comment:
   Actually, `backgroundEventProcessor.process();` sets the group metadata, so returning group metadata from `maybeThrowInvalidGroupIdException` might return outdated group metadata. Since I have now removed `backgroundEventProcessor.process();`, I will reconsider the exception.
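The accessor pattern in the hunk above — metadata kept in an `Optional` until the background thread supplies it, with the getter failing loudly otherwise — can be sketched as follows, using `String` as a stand-in for `ConsumerGroupMetadata`:

```java
import java.util.Optional;

// Sketch: the value is absent until set (by the background thread in the real
// consumer); the getter throws IllegalStateException if the invariant is
// violated. String stands in for the real ConsumerGroupMetadata type.
public class GroupMetadataHolder {
    private Optional<String> groupMetadata = Optional.empty();

    public void setGroupMetadata(String metadata) {
        this.groupMetadata = Optional.of(metadata);
    }

    public String groupMetadata() {
        return groupMetadata.orElseThrow(
            () -> new IllegalStateException("No group metadata found although a valid group ID exists. This is a bug!"));
    }

    public static void main(String[] args) {
        GroupMetadataHolder holder = new GroupMetadataHolder();
        holder.setGroupMetadata("my-group");
        System.out.println(holder.groupMetadata());
    }
}
```

Failing fast with `orElseThrow` turns a silent "metadata never arrived" state into a diagnosable bug, which is the point of the review discussion.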






Re: [PR] MINOR: disable test_transactions with new group coordinator [kafka]

2023-12-01 Thread via GitHub


jolshan merged PR #14884:
URL: https://github.com/apache/kafka/pull/14884





Re: [PR] MINOR: disable test_transactions with new group coordinator [kafka]

2023-12-01 Thread via GitHub


jolshan commented on PR #14884:
URL: https://github.com/apache/kafka/pull/14884#issuecomment-1836445897

   This is a python change so I will merge. These OutOfMemory issues are not 
good.





Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]

2023-12-01 Thread via GitHub


AndrewJSchofield commented on PR #14879:
URL: https://github.com/apache/kafka/pull/14879#issuecomment-1836371437

   > > Can you elaborate on the direction to remove the background queue from 
the 'test builder' instead of using the one it constructed?
   > 
   > I had issues with tests using the spy on the `AsyncKafkaConsumer`. More 
precisely, a test failed with the spy but did not fail with a consumer not 
wrapped into a spy. BTW, IMO, spies (or partial mocks) should be used really 
carefully. Actually, good code does not need spies (with a few exceptions). 
Spies make it easy to avoid structuring the code well. They do not force one to loosely couple 
objects. Even the [Mockito 
documentation](https://javadoc.io/doc/org.mockito/mockito-core/latest/org/mockito/Mockito.html#spy)
 warns against their own spies. Additionally, the code under test, i.e., the 
async Kafka consumer, should not be wrapped into a spy. We should test that 
code directly to avoid possible side effects coming from the wrapping or from 
mistakes in specifying stubs on the spy.
   
   I've had exactly the same problem. Nested mocking makes it all very unhappy.




