Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]

2023-11-29 Thread via GitHub


mjsax commented on code in PR #14724:
URL: https://github.com/apache/kafka/pull/14724#discussion_r1410274394


##
clients/src/main/java/org/apache/kafka/clients/ClientTelemetryProvider.java:
##
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import io.opentelemetry.proto.common.v1.AnyValue;
+import io.opentelemetry.proto.common.v1.KeyValue;
+import io.opentelemetry.proto.resource.v1.Resource;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.metrics.MetricsContext;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ClientTelemetryProvider implements Configurable {
+
+public static final String DOMAIN = "org.apache.kafka";
+// Client metrics tags
+public static final String CLIENT_RACK = "client_rack";
+public static final String GROUP_ID = "group_id";
+public static final String GROUP_INSTANCE_ID = "group_instance_id";
+public static final String TRANSACTIONAL_ID = "transactional_id";

Review Comment:
   Ah. Thanks! -- I mixed it up with the `group.instance.id` config... My bad.






Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]

2023-11-29 Thread via GitHub


mjsax commented on code in PR #14724:
URL: https://github.com/apache/kafka/pull/14724#discussion_r1410273486


##
clients/src/main/java/org/apache/kafka/clients/ClientTelemetryProvider.java:
##
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import io.opentelemetry.proto.common.v1.AnyValue;
+import io.opentelemetry.proto.common.v1.KeyValue;
+import io.opentelemetry.proto.resource.v1.Resource;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.metrics.MetricsContext;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ClientTelemetryProvider implements Configurable {
+
+public static final String DOMAIN = "org.apache.kafka";
+// Client metrics tags
+public static final String CLIENT_RACK = "client_rack";
+public static final String GROUP_ID = "group_id";
+public static final String GROUP_INSTANCE_ID = "group_instance_id";
+public static final String GROUP_MEMBER_ID = "group_member_id";
+public static final String TRANSACTIONAL_ID = "transactional_id";
+
+private static final String PRODUCER_NAMESPACE = "kafka.producer";
+private static final String CONSUMER_NAMESPACE = "kafka.consumer";
+
+private static final Map&lt;String, String&gt; PRODUCER_CONFIG_MAPPING = new HashMap<>();
+private static final Map&lt;String, String&gt; CONSUMER_CONFIG_MAPPING = new HashMap<>();
+
+private volatile Resource resource = null;
+private Map&lt;String, ?&gt; config = null;
+
+static {
+PRODUCER_CONFIG_MAPPING.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, ClientTelemetryProvider.TRANSACTIONAL_ID);
+
+CONSUMER_CONFIG_MAPPING.put(ConsumerConfig.GROUP_ID_CONFIG, ClientTelemetryProvider.GROUP_ID);
+CONSUMER_CONFIG_MAPPING.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, ClientTelemetryProvider.GROUP_INSTANCE_ID);

Review Comment:
   Might be worth adding a comment?
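
For illustration, the requested comment might read along these lines (hypothetical wording, not from the PR):

```java
// Note: the telemetry tag is "group_instance_id" (underscores), intentionally
// distinct from the "group.instance.id" client config key it is derived from.
CONSUMER_CONFIG_MAPPING.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG,
    ClientTelemetryProvider.GROUP_INSTANCE_ID);
```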






Re: [PR] KAFKA-15831: KIP-1000 protocol and admin client [kafka]

2023-11-29 Thread via GitHub


AndrewJSchofield closed pull request #14811: KAFKA-15831: KIP-1000 protocol and 
admin client
URL: https://github.com/apache/kafka/pull/14811





Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-11-29 Thread via GitHub


mjsax commented on code in PR #14864:
URL: https://github.com/apache/kafka/pull/14864#discussion_r1410260721


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java:
##
@@ -454,4 +480,19 @@ public void shutdown() {
 public Map&lt;MetricName, Metric&gt; consumerMetrics() {
 return Collections.unmodifiableMap(globalConsumer.metrics());
 }
+
+public KafkaFuture&lt;Uuid&gt; globalConsumerInstanceId(final Duration timeout) {

Review Comment:
   Frankly, given that `fetchDeadline` might be modified (and pushed into the future) by a second call to `KafkaStreams#clientInstanceIds(...)` while the first has not yet completed, it seems we would need `synchronized` in addition?
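
A minimal sketch of the kind of guard being suggested; field names and semantics below are assumptions for illustration, since the PR's actual `fetchDeadline` handling is not quoted here.

```java
import java.time.Duration;

// Hypothetical: serialize deadline updates so a second clientInstanceIds()
// call cannot shorten the deadline of a fetch that is already in flight.
class InstanceIdFetchState {
    private long fetchDeadlineMs = -1L; // -1 => no fetch in progress

    synchronized void extendDeadline(final Duration timeout) {
        final long candidate = System.currentTimeMillis() + timeout.toMillis();
        // Only push the deadline further out; a concurrent caller with a
        // shorter timeout must not cut off the first caller's request.
        if (candidate > fetchDeadlineMs) {
            fetchDeadlineMs = candidate;
        }
    }

    synchronized boolean expired(final long nowMs) {
        return fetchDeadlineMs >= 0 && nowMs > fetchDeadlineMs;
    }
}
```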






Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-11-29 Thread via GitHub


mjsax commented on code in PR #14864:
URL: https://github.com/apache/kafka/pull/14864#discussion_r1410258175


##
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##
@@ -1791,6 +1794,52 @@ protected int processStreamThread(final Consumer&lt;StreamThread&gt; consumer) {
 return copy.size();
 }
 
+/**
+ * Returns the internal clients' assigned {@code client instance ids}.
+ *
+ * @return the internal clients' assigned instance ids used for metrics collection.
+ *
+ * @throws IllegalStateException If {@code KafkaStreams} is not running.
+ * @throws TimeoutException Indicates that a request timed out.
+ */
+public ClientInstanceIds clientInstanceIds(final Duration timeout) {
+if (state().hasNotStarted()) {
+throw new IllegalStateException("KafkaStreams has not been started, you can retry after calling start().");
+}
+if (state().isShuttingDown() || state.hasCompletedShutdown()) {
+throw new IllegalStateException("KafkaStreams has been stopped (" + state + ").");
+}
+
+final ClientInstanceIdsImpl clientInstanceIds = new ClientInstanceIdsImpl();
+
+KafkaFuture&lt;Uuid&gt; globalThreadFuture = null;
+if (globalStreamThread != null) {
+globalThreadFuture = globalStreamThread.globalConsumerInstanceId(timeout);

Review Comment:
   I like to think so :) 






Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-11-29 Thread via GitHub


mjsax commented on PR #14864:
URL: https://github.com/apache/kafka/pull/14864#issuecomment-1833242672

   @AndrewJSchofield -- updated this PR to cover more cases. Still not 
complete, but more review input is welcome.





Re: [PR] KAFKA-15817: Avoid reconnecting to the same IP address if multiple ad… [kafka]

2023-11-29 Thread via GitHub


ijuma commented on PR #14813:
URL: https://github.com/apache/kafka/pull/14813#issuecomment-1833215345

   @lucasbru Did you cherry-pick to 3.6?





Re: [PR] KAFKA-15901: Client changes for registering telemetry and API calls (KIP-714) [kafka]

2023-11-29 Thread via GitHub


apoorvmittal10 commented on PR #14843:
URL: https://github.com/apache/kafka/pull/14843#issuecomment-1833203010

   Hi @kirktrue, thanks for taking the time to review and leave feedback. I have addressed it; could you please re-review?





Re: [PR] KAFKA-15901: Client changes for registering telemetry and API calls (KIP-714) [kafka]

2023-11-29 Thread via GitHub


apoorvmittal10 commented on code in PR #14843:
URL: https://github.com/apache/kafka/pull/14843#discussion_r1410224871


##
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java:
##
@@ -361,6 +370,9 @@ private void cancelInFlightRequests(String nodeId,
 }
 } else if (request.header.apiKey() == ApiKeys.METADATA) {
 metadataUpdater.handleFailedRequest(now, Optional.empty());
+} else if ((request.header.apiKey() == ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS ||
+request.header.apiKey() == ApiKeys.PUSH_TELEMETRY) && telemetrySender != null) {
+telemetrySender.handleFailedRequest(request.header.apiKey(), null);

Review Comment:
   Done.






Re: [PR] KAFKA-15901: Client changes for registering telemetry and API calls (KIP-714) [kafka]

2023-11-29 Thread via GitHub


apoorvmittal10 commented on code in PR #14843:
URL: https://github.com/apache/kafka/pull/14843#discussion_r1410198452


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -4387,7 +4398,52 @@ public FenceProducersResult fenceProducers(Collection&lt;String&gt; transactionalIds,
 
 @Override
 public Uuid clientInstanceId(Duration timeout) {
-throw new UnsupportedOperationException();
+if (timeout.isNegative()) {
+throw new IllegalArgumentException("The timeout cannot be negative.");
+}
+
+if (!clientTelemetryEnabled) {
+throw new IllegalStateException("Telemetry is not enabled. Set config `enable.metrics.push` to `true`.");
+}
+
+if (clientInstanceId != null) {
+return clientInstanceId;
+}
+
+final long now = time.milliseconds();
+final KafkaFutureImpl&lt;Uuid&gt; future = new KafkaFutureImpl<>();
+runnable.call(new Call("getTelemetrySubscriptions", calcDeadlineMs(now, (int) timeout.toMillis()),
+new LeastLoadedNodeProvider()) {
+
+@Override
+GetTelemetrySubscriptionsRequest.Builder createRequest(int timeoutMs) {
+return new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData(), true);
+}
+
+@Override
+void handleResponse(AbstractResponse abstractResponse) {
+GetTelemetrySubscriptionsResponse response = (GetTelemetrySubscriptionsResponse) abstractResponse;
+if (response.error() != Errors.NONE) {
+future.completeExceptionally(response.error().exception());
+} else {
+future.complete(response.data().clientInstanceId());
+}
+}
+
+@Override
+void handleFailure(Throwable throwable) {
+future.completeExceptionally(throwable);
+}
+}, now);
+
+try {
+clientInstanceId = future.get();
+} catch (Exception e) {
+log.error("Error occurred while fetching client instance id", e);
+throw new KafkaException("Error occurred while fetching client instance id", e);
+}

Review Comment:
   The other two classes load the subscription as part of the GetTelemetrySubscriptions API call, and they also run the continuous poll loop that watches for state changes and re-fetches the subscription, so they can work with signalling of the loaded subscription.
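
As a usage sketch of the API under review: the call below assumes a client with KIP-714 telemetry enabled (`enable.metrics.push` defaults to `true`) and a broker with a client-metrics receiver plugin configured; everything beyond `Admin#clientInstanceId` is illustrative.

```java
import java.time.Duration;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.Uuid;

public class ClientInstanceIdExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // Blocks until the broker assigns a client instance id or the timeout expires.
        try (Admin admin = Admin.create(props)) {
            Uuid instanceId = admin.clientInstanceId(Duration.ofSeconds(10));
            System.out.println("client instance id: " + instanceId);
        }
    }
}
```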






Re: [PR] KAFKA-15830: Add telemetry API handling (KIP-714) [kafka]

2023-11-29 Thread via GitHub


apoorvmittal10 commented on code in PR #14767:
URL: https://github.com/apache/kafka/pull/14767#discussion_r1410158203


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -3697,16 +3698,60 @@ class KafkaApis(val requestChannel: RequestChannel,
 CompletableFuture.completedFuture[Unit](())
   }
 
-  // Just a place holder for now.
   def handleGetTelemetrySubscriptionsRequest(request: RequestChannel.Request): Unit = {
-requestHelper.sendMaybeThrottle(request, request.body[GetTelemetrySubscriptionsRequest].getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
-CompletableFuture.completedFuture[Unit](())
+val subscriptionRequest = request.body[GetTelemetrySubscriptionsRequest]
+
+if (ClientMetricsReceiverPlugin.instance.isEmpty) {
+  info("Received get telemetry client request, no metrics receiver plugin configured or running with ZK")
+  requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
+subscriptionRequest.getErrorResponse(requestThrottleMs, Errors.UNSUPPORTED_VERSION.exception))

Review Comment:
   Thanks for the quick reply @junrao, I am looking into this. I thought the suggestion was to have a flag similar to `zkMigrationEnabled`, but it looks like the intent is to not emit the telemetry ApiKeys if the plugin is missing.






Re: [PR] KAFKA-15830: Add telemetry API handling (KIP-714) [kafka]

2023-11-29 Thread via GitHub


apoorvmittal10 commented on code in PR #14767:
URL: https://github.com/apache/kafka/pull/14767#discussion_r1410157251


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -3747,16 +3747,56 @@ class KafkaApis(val requestChannel: RequestChannel,
 CompletableFuture.completedFuture[Unit](())
   }
 
-  // Just a place holder for now.
   def handleGetTelemetrySubscriptionsRequest(request: RequestChannel.Request): Unit = {
-requestHelper.sendMaybeThrottle(request, request.body[GetTelemetrySubscriptionsRequest].getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
-CompletableFuture.completedFuture[Unit](())
+val subscriptionRequest = request.body[GetTelemetrySubscriptionsRequest]
+
+clientMetricsManager match {
+  case Some(metricsManager) =>
+try {
+  if (metricsManager.isTelemetryReceiverConfigured) {
+requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>

Review Comment:
   Thanks @junrao, done.






Re: [PR] KAFKA-15830: Add telemetry API handling (KIP-714) [kafka]

2023-11-29 Thread via GitHub


junrao commented on code in PR #14767:
URL: https://github.com/apache/kafka/pull/14767#discussion_r1410140939


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -3697,16 +3698,60 @@ class KafkaApis(val requestChannel: RequestChannel,
 CompletableFuture.completedFuture[Unit](())
   }
 
-  // Just a place holder for now.
   def handleGetTelemetrySubscriptionsRequest(request: RequestChannel.Request): Unit = {
-requestHelper.sendMaybeThrottle(request, request.body[GetTelemetrySubscriptionsRequest].getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
-CompletableFuture.completedFuture[Unit](())
+val subscriptionRequest = request.body[GetTelemetrySubscriptionsRequest]
+
+if (ClientMetricsReceiverPlugin.instance.isEmpty) {
+  info("Received get telemetry client request, no metrics receiver plugin configured or running with ZK")
+  requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
+subscriptionRequest.getErrorResponse(requestThrottleMs, Errors.UNSUPPORTED_VERSION.exception))

Review Comment:
   @apoorvmittal10 : ApiVersionResponse is defined to include all new types of requests supported on the server, so there is no need for a separate KIP. There is also no need to add clientTelemetryEnabled to ApiVersionResponse: if clientTelemetryEnabled is false on the server, we can simply exclude the telemetry-related API keys from ApiVersionResponse.
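
A sketch of that exclusion in Java terms; the real broker wiring is in Scala (KafkaApis/ApiVersionManager) and is not shown in this thread, so the helper below is illustrative only.

```java
import java.util.EnumSet;
import java.util.Set;

import org.apache.kafka.common.protocol.ApiKeys;

final class ExposedApis {
    // When client telemetry is disabled, drop the telemetry APIs from what the
    // broker advertises, so clients never attempt these requests at all.
    static Set<ApiKeys> forBroker(boolean clientTelemetryEnabled) {
        Set<ApiKeys> keys = EnumSet.allOf(ApiKeys.class);
        if (!clientTelemetryEnabled) {
            keys.remove(ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS);
            keys.remove(ApiKeys.PUSH_TELEMETRY);
        }
        return keys;
    }
}
```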






Re: [PR] KAFKA-15704: Update clientId and clientHost in MemberMetadata when static member is replaced. [kafka]

2023-11-29 Thread via GitHub


Lucent-Wong commented on PR #14666:
URL: https://github.com/apache/kafka/pull/14666#issuecomment-1833071663

   retest this please





Re: [PR] KAFKA-15901: Client changes for registering telemetry and API calls (KIP-714) [kafka]

2023-11-29 Thread via GitHub


apoorvmittal10 commented on code in PR #14843:
URL: https://github.com/apache/kafka/pull/14843#discussion_r1410123451


##
clients/src/main/java/org/apache/kafka/clients/ClientTelemetryUtils.java:
##
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import org.apache.kafka.common.Uuid;
+
+import java.time.Duration;
+import java.util.Optional;
+
+public class ClientTelemetryUtils {

Review Comment:
   Thanks @kirktrue for the feedback. Done.






Re: [PR] KAFKA-15830: Add telemetry API handling (KIP-714) [kafka]

2023-11-29 Thread via GitHub


apoorvmittal10 commented on code in PR #14767:
URL: https://github.com/apache/kafka/pull/14767#discussion_r1410120896


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -3697,16 +3698,60 @@ class KafkaApis(val requestChannel: RequestChannel,
 CompletableFuture.completedFuture[Unit](())
   }
 
-  // Just a place holder for now.
   def handleGetTelemetrySubscriptionsRequest(request: RequestChannel.Request): 
Unit = {
-requestHelper.sendMaybeThrottle(request, 
request.body[GetTelemetrySubscriptionsRequest].getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
-CompletableFuture.completedFuture[Unit](())
+val subscriptionRequest = request.body[GetTelemetrySubscriptionsRequest]
+
+if (ClientMetricsReceiverPlugin.instance.isEmpty) {
+  info("Received get telemetry client request, no metrics receiver plugin 
configured or running with ZK")
+  requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
+subscriptionRequest.getErrorResponse(requestThrottleMs, 
Errors.UNSUPPORTED_VERSION.exception))

Review Comment:
   @junrao Thanks for the suggestion. Just want to clarify: does adding `clientTelemetryEnabled` to `apiVersionResponse` require another KIP, can we add it under KIP-714, or does it need no documentation at all?






Re: [PR] Source task stop call was added to force stopping execution. [kafka]

2023-11-29 Thread via GitHub


github-actions[bot] commented on PR #14101:
URL: https://github.com/apache/kafka/pull/14101#issuecomment-1833053201

   This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.





Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]

2023-11-29 Thread via GitHub


apoorvmittal10 commented on PR #14724:
URL: https://github.com/apache/kafka/pull/14724#issuecomment-1833016137

   @mjsax I have also moved ClientTelemetryUtils to the `internals` package in this PR, since the class is new and has changes in both PRs.





Re: [PR] KAFKA-15764: Missing Tests for Transactions [kafka]

2023-11-29 Thread via GitHub


jolshan commented on code in PR #14702:
URL: https://github.com/apache/kafka/pull/14702#discussion_r1410046365


##
core/src/test/scala/integration/kafka/api/TransactionsTest.scala:
##
@@ -820,6 +823,37 @@ class TransactionsTest extends IntegrationTestHarness {
 assertEquals((initialProducerEpoch + 1).toShort, producerStateEntry.producerEpoch)
   }
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testTransactionsWithCompression(quorum: String): Unit = {
+val numRecords = 50
+val numProducersWithCompression = 5
+val numTransactions = 40
+val transactionalCompressionProducers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]()
+
+for (i <- 0 until numProducersWithCompression) {
+  transactionalCompressionProducers += createTransactionalProducer("transactional-compression-producer-" + i.toString, compressionType = "snappy")
+}
+
+// KAFKA-15653 is triggered more easily with replication factor 1

Review Comment:
   I set the default settings back and it seems to trigger consistently again anyway. Maybe I just tricked myself into thinking I needed the other configs.






Re: [PR] KAFKA-15764: Missing Tests for Transactions [kafka]

2023-11-29 Thread via GitHub


jolshan commented on code in PR #14702:
URL: https://github.com/apache/kafka/pull/14702#discussion_r1410043469


##
core/src/test/scala/integration/kafka/api/TransactionsTest.scala:
##
@@ -820,6 +823,37 @@ class TransactionsTest extends IntegrationTestHarness {
 assertEquals((initialProducerEpoch + 1).toShort, producerStateEntry.producerEpoch)
   }
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testTransactionsWithCompression(quorum: String): Unit = {
+val numRecords = 50
+val numProducersWithCompression = 5
+val numTransactions = 40
+val transactionalCompressionProducers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]()
+
+for (i <- 0 until numProducersWithCompression) {
+  transactionalCompressionProducers += createTransactionalProducer("transactional-compression-producer-" + i.toString, compressionType = "snappy")
+}
+
+// KAFKA-15653 is triggered more easily with replication factor 1

Review Comment:
   Interestingly, when I put it back to the typical settings it failed consistently 🤦‍♀️
   I guess I tricked myself into thinking I needed the other settings.







Re: [PR] KAFKA-13403 Fix KafkaServer crashes when deleting topics due to the race in log deletion [kafka]

2023-11-29 Thread via GitHub


arunmathew88 commented on PR #11438:
URL: https://github.com/apache/kafka/pull/11438#issuecomment-1832945607

   @divijvaidya A gentle nudge.





Re: [PR] MINOR: Fix flaky BrokerLifecycleManagerTest [kafka]

2023-11-29 Thread via GitHub


soarez commented on code in PR #14836:
URL: https://github.com/apache/kafka/pull/14836#discussion_r1410038816


##
core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala:
##
@@ -201,7 +201,7 @@ class BrokerLifecycleManagerTest {
 while (!future.isDone || context.mockClient.hasInFlightRequests) {
   context.poll()
   manager.eventQueue.wakeup()
-  context.time.sleep(100)
+  context.time.sleep(5)

Review Comment:
   @junrao : That's a good point, you're right. In between `HeartbeatRequest` 
being sent and the response being handled, `propagateDirectoryFailure` could be 
called, immediately scheduling a `HeartbeatRequest`, causing an extra request.
   
   It looks like this was already the case with `setReadyToUnfence()` and 
`beginControlledShutdown()`, which can also cause an extra request in the same 
way.
   
   We can avoid the extra requests by checking, in `OfflineDirEvent.run`, `SetReadyToUnfenceEvent.run`, and `BeginControlledShutdownEvent.run`, whether a request is in flight, and delaying the call to `scheduleNextCommunicationImmediately` until after the response is received.
   
   Please see #14874, which adds an explanatory comment for the test.
   
   I also see you've filed 
[KAFKA-15950](https://issues.apache.org/jira/browse/KAFKA-15950). Thanks, I'll 
have a look.
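
A compact sketch of that guard; class and method names are illustrative, not the actual `BrokerLifecycleManager` code.

```java
// Hypothetical: suppress an immediate heartbeat while one is in flight and
// send the deferred one only when the response for the first comes back.
class HeartbeatGate {
    private boolean inFlight = false;
    private boolean resendWhenDone = false;

    synchronized void communicateImmediately(Runnable send) {
        if (inFlight) {
            resendWhenDone = true; // avoid an extra overlapping request
        } else {
            inFlight = true;
            send.run();
        }
    }

    synchronized void onResponse(Runnable send) {
        inFlight = false;
        if (resendWhenDone) {
            resendWhenDone = false;
            inFlight = true;
            send.run();
        }
    }
}
```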






Re: [PR] KAFKA-15764: Missing Tests for Transactions [kafka]

2023-11-29 Thread via GitHub


hachikuji commented on code in PR #14702:
URL: https://github.com/apache/kafka/pull/14702#discussion_r1410034374


##
core/src/test/scala/integration/kafka/api/TransactionsTest.scala:
##
@@ -820,6 +823,37 @@ class TransactionsTest extends IntegrationTestHarness {
 assertEquals((initialProducerEpoch + 1).toShort, producerStateEntry.producerEpoch)
   }
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testTransactionsWithCompression(quorum: String): Unit = {
+val numRecords = 50
+val numProducersWithCompression = 5
+val numTransactions = 40
+val transactionalCompressionProducers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]()
+
+for (i <- 0 until numProducersWithCompression) {
+  transactionalCompressionProducers += createTransactionalProducer("transactional-compression-producer-" + i.toString, compressionType = "snappy")
+}
+
+// KAFKA-15653 is triggered more easily with replication factor 1

Review Comment:
   Yeah, what I'm saying is that this test seems more useful as a general 
validation of transactional produce with compression. From that perspective, 
the default settings are the most useful to test. For KAFKA-15653, the lower 
level tests seem sufficient.






[PR] MINOR: Fix flaky `MetadataLoaderTest.testNoPublishEmptyImage` [kafka]

2023-11-29 Thread via GitHub


hachikuji opened a new pull request, #14875:
URL: https://github.com/apache/kafka/pull/14875

   There is a race in the assertion on `capturedImages`. Since the future is 
signaled first, it is still possible to see an empty list.
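   
   One way to close such a race, sketched below with Kafka's own test helper; whether this PR fixes it exactly this way is not shown here.
   
```java
// Retry the assertion instead of checking once right after the future
// completes, since the image may be captured slightly later on another thread.
org.apache.kafka.test.TestUtils.waitForCondition(
    () -> !capturedImages.isEmpty(),
    "expected at least one metadata image to be published");
```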
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]

2023-11-29 Thread via GitHub


jeffkbkim commented on code in PR #14849:
URL: https://github.com/apache/kafka/pull/14849#discussion_r1410029648


##
core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala:
##
@@ -148,6 +156,16 @@ class CoordinatorLoaderImpl[T](
 }
   }
 
+  val currentHighWatermark = log.highWatermark
+  if (currentHighWatermark > previousHighWatermark) {
+onHighWatermarkUpdated.accept(currentHighWatermark)
+previousHighWatermark = currentHighWatermark
+  }
+
+  if (currentOffset >= currentHighWatermark) {
+onLoadedBatch.accept(currentOffset)
+  }

Review Comment:
   you're right, we return
   ```
   // Required offset.
   partitionResult.lastOffset + 1
   ```
   in CoordinatorPartitionWriter#append






[PR] MINOR: Comment explaining test approach [kafka]

2023-11-29 Thread via GitHub


soarez opened a new pull request, #14874:
URL: https://github.com/apache/kafka/pull/14874

   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] MINOR: No need for response callback when applying controller mutation throttle [kafka]

2023-11-29 Thread via GitHub


hachikuji merged PR #14861:
URL: https://github.com/apache/kafka/pull/14861





Re: [PR] MINOR: No need for response callback when applying controller mutation throttle [kafka]

2023-11-29 Thread via GitHub


hachikuji commented on PR #14861:
URL: https://github.com/apache/kafka/pull/14861#issuecomment-1832915608

   All failures seem to be flaky. I tested them locally. I will merge to trunk.





Re: [PR] KAFKA-15764: Missing Tests for Transactions [kafka]

2023-11-29 Thread via GitHub


jolshan commented on code in PR #14702:
URL: https://github.com/apache/kafka/pull/14702#discussion_r1410014399


##
core/src/test/scala/integration/kafka/api/TransactionsTest.scala:
##
@@ -820,6 +823,37 @@ class TransactionsTest extends IntegrationTestHarness {
 assertEquals((initialProducerEpoch + 1).toShort, producerStateEntry.producerEpoch)
   }
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testTransactionsWithCompression(quorum: String): Unit = {
+val numRecords = 50
+val numProducersWithCompression = 5
+val numTransactions = 40
+val transactionalCompressionProducers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]()
+
+for (i <- 0 until numProducersWithCompression) {
+  transactionalCompressionProducers += createTransactionalProducer("transactional-compression-producer-" + i.toString, compressionType = "snappy")
+}
+
+// KAFKA-15653 is triggered more easily with replication factor 1

Review Comment:
   I didn't see it fail much at all with the default settings. It was like once in 60 times or less, which was not super useful. There are lower-level tests for the callback using the correct requestLocal.






Re: [PR] KAFKA-15764: Missing Tests for Transactions [kafka]

2023-11-29 Thread via GitHub


hachikuji commented on code in PR #14702:
URL: https://github.com/apache/kafka/pull/14702#discussion_r1410013739


##
core/src/test/scala/integration/kafka/api/TransactionsTest.scala:
##
@@ -820,6 +823,37 @@ class TransactionsTest extends IntegrationTestHarness {
 assertEquals((initialProducerEpoch + 1).toShort, producerStateEntry.producerEpoch)
   }
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testTransactionsWithCompression(quorum: String): Unit = {
+val numRecords = 50
+val numProducersWithCompression = 5
+val numTransactions = 40
+val transactionalCompressionProducers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]()
+
+for (i <- 0 until numProducersWithCompression) {
+  transactionalCompressionProducers += createTransactionalProducer("transactional-compression-producer-" + i.toString, compressionType = "snappy")
+}
+
+// KAFKA-15653 is triggered more easily with replication factor 1

Review Comment:
   My take is that this test should probably just test compression of transactional data with the default settings. I don't think it necessarily needs to hit the case from KAFKA-15653; it seems it doesn't do so reliably anyway. Are there lower-level tests where we can validate callback safety?






[jira] [Commented] (KAFKA-15950) CommunicationEvent should be scheduled with EarliestDeadlineFunction

2023-11-29 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17791367#comment-17791367
 ] 

Jun Rao commented on KAFKA-15950:
-

cc [~soarez] 

> CommunicationEvent should be scheduled with EarliestDeadlineFunction
> 
>
> Key: KAFKA-15950
> URL: https://issues.apache.org/jira/browse/KAFKA-15950
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.7.0
>Reporter: Jun Rao
>Priority: Major
>
> Currently, CommunicationEvent is scheduled with DeadlineFunction, which 
> ignores the schedule time for an existing event. This wasn't an issue when 
> CommunicationEvent is always periodic. However, with KAFKA-15360,  a 
> CommunicationEvent could be scheduled immediately for offline dirs. If a 
> periodic CommunicationEvent is scheduled after the immediate 
> CommunicationEvent in KafkaEventQueue, the former will cancel the latter, but 
> leaves the schedule time to be periodic. This will unnecessarily delay the 
> communication of the failed dir to the controller. 
>  
> Using EarliestDeadlineFunction will fix this issue.





[jira] [Created] (KAFKA-15950) CommunicationEvent should be scheduled with EarliestDeadlineFunction

2023-11-29 Thread Jun Rao (Jira)
Jun Rao created KAFKA-15950:
---

 Summary: CommunicationEvent should be scheduled with 
EarliestDeadlineFunction
 Key: KAFKA-15950
 URL: https://issues.apache.org/jira/browse/KAFKA-15950
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 3.7.0
Reporter: Jun Rao


Currently, CommunicationEvent is scheduled with DeadlineFunction, which ignores 
the schedule time for an existing event. This wasn't an issue when 
CommunicationEvent is always periodic. However, with KAFKA-15360,  a 
CommunicationEvent could be scheduled immediately for offline dirs. If a 
periodic CommunicationEvent is scheduled after the immediate CommunicationEvent 
in KafkaEventQueue, the former will cancel the latter, but leaves the schedule 
time to be periodic. This will unnecessarily delay the communication of the 
failed dir to the controller. 
 
Using EarliestDeadlineFunction will fix this issue.
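
The difference between the two policies, sketched abstractly (this is not the KafkaEventQueue API, just the scheduling rule each one implements):

{code:java}
// With two pending deadlines for the same event, the scheduler should keep the
// earlier one; the current DeadlineFunction keeps only the newly proposed one.
long deadlineFunction(long existingNs, long proposedNs) {
    return proposedNs; // periodic reschedule overwrites the immediate deadline
}

long earliestDeadlineFunction(long existingNs, long proposedNs) {
    return Math.min(existingNs, proposedNs); // never delay an urgent event
}
{code}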





[jira] [Updated] (KAFKA-15941) Flaky test: shouldRestoreNullRecord() – org.apache.kafka.streams.integration.RestoreIntegrationTest

2023-11-29 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15941?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15941:

Labels: flaky-test  (was: )

> Flaky test: shouldRestoreNullRecord() – 
> org.apache.kafka.streams.integration.RestoreIntegrationTest
> ---
>
> Key: KAFKA-15941
> URL: https://issues.apache.org/jira/browse/KAFKA-15941
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Apoorv Mittal
>Priority: Major
>  Labels: flaky-test
>
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14699/24/tests/
> {code:java}
> org.opentest4j.AssertionFailedError: Condition not met within timeout 6. 
> Did not receive all [KeyValue(2, \x00\x00\x00)] records from topic output 
> (got []) ==> expected:  but was: 
> Stacktraceorg.opentest4j.AssertionFailedError: Condition not met 
> within timeout 6. Did not receive all [KeyValue(2, \x00\x00\x00)] records 
> from topic output (got []) ==> expected:  but was:   at 
> org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
> at 
> org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
> at org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)   
>   at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)  at 
> org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:210) at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:331) 
>at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:379)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:328) 
> at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:312) at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:302) at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(IntegrationTestUtils.java:878)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(IntegrationTestUtils.java:827)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(IntegrationTestUtils.java:790)
>  at 
> org.apache.kafka.streams.integration.RestoreIntegrationTest.shouldRestoreNullRecord(RestoreIntegrationTest.java:244)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
> {code}





[jira] [Updated] (KAFKA-15941) Flaky test: shouldRestoreNullRecord() – org.apache.kafka.streams.integration.RestoreIntegrationTest

2023-11-29 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15941?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15941:

Component/s: streams
 unit tests

> Flaky test: shouldRestoreNullRecord() – 
> org.apache.kafka.streams.integration.RestoreIntegrationTest
> ---
>
> Key: KAFKA-15941
> URL: https://issues.apache.org/jira/browse/KAFKA-15941
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Apoorv Mittal
>Priority: Major
>
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14699/24/tests/
> {code:java}
> org.opentest4j.AssertionFailedError: Condition not met within timeout 6. 
> Did not receive all [KeyValue(2, \x00\x00\x00)] records from topic output 
> (got []) ==> expected:  but was: 
> Stacktraceorg.opentest4j.AssertionFailedError: Condition not met 
> within timeout 6. Did not receive all [KeyValue(2, \x00\x00\x00)] records 
> from topic output (got []) ==> expected:  but was:   at 
> org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
> at 
> org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
> at org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)   
>   at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)  at 
> org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:210) at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:331) 
>at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:379)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:328) 
> at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:312) at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:302) at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(IntegrationTestUtils.java:878)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(IntegrationTestUtils.java:827)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(IntegrationTestUtils.java:790)
>  at 
> org.apache.kafka.streams.integration.RestoreIntegrationTest.shouldRestoreNullRecord(RestoreIntegrationTest.java:244)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
> {code}





[jira] [Updated] (KAFKA-15944) Flaky test - verifyStore[cache=false, log=true, supplier=ROCKS_KV, kind=DSL] – org.apache.kafka.streams.integration.PositionRestartIntegrationTest

2023-11-29 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15944:

Component/s: unit tests

> Flaky test - verifyStore[cache=false, log=true, supplier=ROCKS_KV, kind=DSL] 
> – org.apache.kafka.streams.integration.PositionRestartIntegrationTest
> --
>
> Key: KAFKA-15944
> URL: https://issues.apache.org/jira/browse/KAFKA-15944
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 3.7.0
>Reporter: Andrew Schofield
>Priority: Minor
>  Labels: flaky-test
>
> Error
> org.apache.kafka.common.errors.TimeoutException: The query never returned 
> within the bound. Last result: 
> StateQueryResult{partitionResults={0=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueTimestampedIterator@61b360a4,
>  executionInfo=[], position=Position{position={input-topic={0=1, 
> 1=FailedQueryResult{failureReason=NOT_PRESENT, failure='The requested 
> partition was not present at the time of the query.', executionInfo=[], 
> position=null}}, globalResult=null}
> Stacktrace
> org.apache.kafka.common.errors.TimeoutException: The query never returned 
> within the bound. Last result: 
> StateQueryResult{partitionResults={0=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueTimestampedIterator@61b360a4,
>  executionInfo=[], position=Position{position={input-topic={0=1, 
> 1=FailedQueryResult{failureReason=NOT_PRESENT, failure='The requested 
> partition was not present at the time of the query.', executionInfo=[], 
> position=null}}, globalResult=null}
> Standard Output
> [2023-11-28 22:52:47,353] INFO [Producer clientId=producer-129] Instantiated 
> an idempotent producer. (org.apache.kafka.clients.producer.KafkaProducer:587)
> [2023-11-28 22:52:47,466] INFO [Producer clientId=producer-129] ProducerId 
> set to 0 with epoch 0 
> (org.apache.kafka.clients.producer.internals.TransactionManager:502)
> [2023-11-28 22:52:47,473] INFO [Producer clientId=producer-129] Closing the 
> Kafka producer with timeoutMillis = 9223372036854775807 ms. 
> (org.apache.kafka.clients.producer.KafkaProducer:1332)
> [2023-11-28 22:52:47,531] INFO stream-client 
> [app-org.apache.kafka.streams.integration.PositionRestartIntegrationTest-true-true-IN_MEMORY_KV-DSL-456895e1-b230-4a83-9164-8e6d65d1fc57]
>  Kafka Streams version: test-version 
> (org.apache.kafka.streams.KafkaStreams:914)
> [2023-11-28 22:52:47,531] INFO stream-client 
> [app-org.apache.kafka.streams.integration.PositionRestartIntegrationTest-true-true-IN_MEMORY_KV-DSL-456895e1-b230-4a83-9164-8e6d65d1fc57]
>  Kafka Streams commit ID: test-commit-ID 
> (org.apache.kafka.streams.KafkaStreams:915)
> [2023-11-28 22:52:47,532] INFO stream-thread 
> [app-org.apache.kafka.streams.integration.PositionRestartIntegrationTest-true-true-IN_MEMORY_KV-DSL-456895e1-b230-4a83-9164-8e6d65d1fc57-StreamThread-1]
>  Creating restore consumer client 
> (org.apache.kafka.streams.processor.internals.StreamThread:365)
> [2023-11-28 22:52:47,537] INFO stream-thread 
> [app-org.apache.kafka.streams.integration.PositionRestartIntegrationTest-true-true-IN_MEMORY_KV-DSL-456895e1-b230-4a83-9164-8e6d65d1fc57-StreamThread-1]
>  Creating thread producer client 
> (org.apache.kafka.streams.processor.internals.StreamThread:105)
> [2023-11-28 22:52:47,538] INFO [Producer 
> clientId=app-org.apache.kafka.streams.integration.PositionRestartIntegrationTest-true-true-IN_MEMORY_KV-DSL-456895e1-b230-4a83-9164-8e6d65d1fc57-StreamThread-1-producer]
>  Instantiated an idempotent producer. 
> (org.apache.kafka.clients.producer.KafkaProducer:587)
> [2023-11-28 22:52:47,545] INFO stream-thread 
> [app-org.apache.kafka.streams.integration.PositionRestartIntegrationTest-true-true-IN_MEMORY_KV-DSL-456895e1-b230-4a83-9164-8e6d65d1fc57-StreamThread-1]
>  Creating consumer client 
> (org.apache.kafka.streams.processor.internals.StreamThread:432)
> [2023-11-28 22:52:47,545] INFO state-updater 
> [app-org.apache.kafka.streams.integration.PositionRestartIntegrationTest-true-true-IN_MEMORY_KV-DSL-456895e1-b230-4a83-9164-8e6d65d1fc57-StateUpdater-1]
>  State updater thread started 
> (org.apache.kafka.streams.processor.internals.DefaultStateUpdater:135)
> [2023-11-28 22:52:47,547] INFO [Producer 
> clientId=app-org.apache.kafka.streams.integration.PositionRestartIntegrationTest-true-true-IN_MEMORY_KV-DSL-456895e1-b230-4a83-9164-8e6d65d1fc57-StreamThread-1-producer]
>  ProducerId set to 1 with epoch 0 
> (org.apache.kafka.clients.producer.internals.TransactionManager:502)
> [2023-11-28 22:52:47,550] INFO stream-thread 
> 

[jira] [Resolved] (KAFKA-15311) Fix docs about reverting to ZooKeeper mode during KRaft migration

2023-11-29 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe resolved KAFKA-15311.
--
Fix Version/s: 3.7.0
   Resolution: Fixed

> Fix docs about reverting to ZooKeeper mode during KRaft migration
> -
>
> Key: KAFKA-15311
> URL: https://issues.apache.org/jira/browse/KAFKA-15311
> Project: Kafka
>  Issue Type: Bug
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Minor
> Fix For: 3.7.0
>
>
> The docs incorrectly state that reverting to ZooKeeper mode during KRaft 
> migration is not possible.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Zk to KRaft migration is now production ready [kafka]

2023-11-29 Thread via GitHub


cmccabe commented on PR #14546:
URL: https://github.com/apache/kafka/pull/14546#issuecomment-1832839662

   I'm going to close this for now since it is superseded by #14160. If you 
find more stuff that should be fixed, please do open a new one! :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Zk to KRaft migration is now production ready [kafka]

2023-11-29 Thread via GitHub


cmccabe closed pull request #14546: MINOR: Zk to KRaft migration is now 
production ready
URL: https://github.com/apache/kafka/pull/14546


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15886: Always specify directories for new partition registrations [kafka]

2023-11-29 Thread via GitHub


cmccabe commented on code in PR #14820:
URL: https://github.com/apache/kafka/pull/14820#discussion_r1409963570


##
metadata/src/main/java/org/apache/kafka/metadata/placement/DefaultDirProvider.java:
##
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.placement;
+
+import org.apache.kafka.common.Uuid;
+
+/**
+ * Provide the default directory for new partitions in a given broker.
+ */
+@FunctionalInterface
+public interface DefaultDirProvider {
+Uuid defaultDir(int brokerId);

Review Comment:
   > The caller needs to handle it regardless. If there's anything other than 
an empty list set in directories, serializing the record in an MV that does not 
yet support dir assignment triggers an UnsupportedVersionException with 
"Attempted to write a non-default directories at version ...".
   
   In what I was proposing, the caller doesn't need to handle that, though, 
since it's not supposed to happen.
   
   > I agree, I would rather not have to configure PartitionChangeBuilder with 
a DefaultDirProvider. But maybe I'm not quite picturing what you are suggesting.
   
   Well, I was suggesting that the MV stuff be handled in the 
DefaultDirProvider rather than in PartitionChangeBuilder. But I guess the 
latter is already handling MV, so there isn't as much to be gained there. Fair 
enough.
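
   For illustration, here is a minimal sketch of what a metadata-version-aware 
DefaultDirProvider could look like. The isDirectoryAssignmentSupported() check 
and the registration-based lookup are assumptions made for this sketch, not 
code from the PR:

   ```java
   import java.util.function.IntFunction;

   import org.apache.kafka.common.DirectoryId;
   import org.apache.kafka.common.Uuid;
   import org.apache.kafka.metadata.placement.DefaultDirProvider;
   import org.apache.kafka.server.common.MetadataVersion;

   // Sketch only: hand out real directory ids only once the metadata version
   // can serialize them; otherwise fall back to the UNASSIGNED sentinel.
   public final class MetadataVersionAwareDirProvider implements DefaultDirProvider {
       private final MetadataVersion metadataVersion;
       private final IntFunction<Uuid> registrationLookup; // brokerId -> default dir

       public MetadataVersionAwareDirProvider(MetadataVersion metadataVersion,
                                              IntFunction<Uuid> registrationLookup) {
           this.metadataVersion = metadataVersion;
           this.registrationLookup = registrationLookup;
       }

       @Override
       public Uuid defaultDir(int brokerId) {
           if (!metadataVersion.isDirectoryAssignmentSupported()) {
               // Pre-JBOD metadata versions cannot write directories at all.
               return DirectoryId.UNASSIGNED;
           }
           return registrationLookup.apply(brokerId);
       }
   }
   ```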



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15922: Add a MetadataVersion for JBOD [kafka]

2023-11-29 Thread via GitHub


cmccabe commented on code in PR #14860:
URL: https://github.com/apache/kafka/pull/14860#discussion_r1409942673


##
server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java:
##
@@ -386,4 +418,21 @@ public void testOffsetCommitValueVersion(MetadataVersion 
metadataVersion) {
 public void 
testOffsetCommitValueVersionWithExpiredTimestamp(MetadataVersion 
metadataVersion) {
 assertEquals((short) 1, 
metadataVersion.offsetCommitValueVersion(true));
 }
+
+@Test
+public void assertLatestProductionIsLessThanLatest() {
+assertTrue(LATEST_PRODUCTION.ordinal() < 
MetadataVersion.latest().ordinal(),
+"Expected LATEST_PRODUCTION " + LATEST_PRODUCTION +
+" to be less than the latest of " + MetadataVersion.latest());
+}

Review Comment:
   yes. there should always be a new unstable version for the new stuff to go 
in :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15922: Add a MetadataVersion for JBOD [kafka]

2023-11-29 Thread via GitHub


cmccabe commented on code in PR #14860:
URL: https://github.com/apache/kafka/pull/14860#discussion_r1409942437


##
server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java:
##
@@ -329,19 +336,44 @@ public void 
testIsDelegationTokenSupported(MetadataVersion metadataVersion) {
 @ParameterizedTest
 @EnumSource(value = MetadataVersion.class)
 public void testIsElrSupported(MetadataVersion metadataVersion) {
-assertEquals(metadataVersion.equals(IBP_3_7_IV1),
-metadataVersion.isElrSupported());
-short expectPartitionRecordVersion = 
metadataVersion.equals(IBP_3_7_IV1) ? (short) 1 : (short) 0;
-assertEquals(expectPartitionRecordVersion, 
metadataVersion.partitionRecordVersion());
-short expectPartitionChangeRecordVersion = 
metadataVersion.equals(IBP_3_7_IV1) ? (short) 1 : (short) 0;
-assertEquals(expectPartitionChangeRecordVersion, 
metadataVersion.partitionChangeRecordVersion());
+assertEquals(metadataVersion.equals(IBP_3_7_IV3), 
metadataVersion.isElrSupported());

Review Comment:
   yes to both



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15922: Add a MetadataVersion for JBOD [kafka]

2023-11-29 Thread via GitHub


cmccabe commented on code in PR #14860:
URL: https://github.com/apache/kafka/pull/14860#discussion_r1409941745


##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -207,6 +213,22 @@ public enum MetadataVersion {
  */
 public static final MetadataVersion MINIMUM_BOOTSTRAP_VERSION = 
IBP_3_3_IV0;
 
+/**
+ * The latest production-ready MetadataVersion. This is the latest version 
that is stable
+ * and cannot be changed. MetadataVersions later than this can be tested 
via junit, but
+ * not deployed in production.
+ *
+ * Think carefully before you update this value. ONCE A METADATA 
VERSION IS PRODUCTION,
+ * IT CANNOT BE CHANGED.
+ */
+public static final MetadataVersion LATEST_PRODUCTION = IBP_3_7_IV0;

Review Comment:
   yes. once it's production we'll change this (could be with a MINOR or with a 
JIRA). And then we can't change it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15922: Add a MetadataVersion for JBOD [kafka]

2023-11-29 Thread via GitHub


cmccabe commented on code in PR #14860:
URL: https://github.com/apache/kafka/pull/14860#discussion_r1409941250


##
metadata/src/main/resources/common/metadata/PartitionRecord.json:
##
@@ -42,13 +42,13 @@
   "about": "The epoch of the partition leader." },
 { "name": "PartitionEpoch", "type": "int32", "versions": "0+", "default": 
"-1",
   "about": "An epoch that gets incremented each time we change anything in 
the partition." },
+{ "name": "Directories", "type": "[]uuid", "versions": "1+",
+  "about": "The log directory hosting each replica, sorted in the same 
exact order as the Replicas field."},
 { "name": "EligibleLeaderReplicas", "type": "[]int32", "default": "null", 
"entityType": "brokerId",
-  "versions": "1+", "nullableVersions": "1+", "taggedVersions": "1+", 
"tag": 1,
+  "versions": "2+", "nullableVersions": "1+", "taggedVersions": "2+", 
"tag": 1,

Review Comment:
   fixed



##
metadata/src/main/resources/common/metadata/PartitionRecord.json:
##
@@ -42,13 +42,13 @@
   "about": "The epoch of the partition leader." },
 { "name": "PartitionEpoch", "type": "int32", "versions": "0+", "default": 
"-1",
   "about": "An epoch that gets incremented each time we change anything in 
the partition." },
+{ "name": "Directories", "type": "[]uuid", "versions": "1+",
+  "about": "The log directory hosting each replica, sorted in the same 
exact order as the Replicas field."},
 { "name": "EligibleLeaderReplicas", "type": "[]int32", "default": "null", 
"entityType": "brokerId",
-  "versions": "1+", "nullableVersions": "1+", "taggedVersions": "1+", 
"tag": 1,
+  "versions": "2+", "nullableVersions": "1+", "taggedVersions": "2+", 
"tag": 1,
   "about": "The eligible leader replicas of this partition." },
 { "name": "LastKnownELR", "type": "[]int32", "default": "null", 
"entityType": "brokerId",
-  "versions": "1+", "nullableVersions": "1+", "taggedVersions": "1+", 
"tag": 2,
-  "about": "The last known eligible leader replicas of this partition." },
-{ "name": "Directories", "type": "[]uuid", "versions": "2+",
-  "about": "The log directory hosting each replica, sorted in the same 
exact order as the Replicas field."}
+  "versions": "2+", "nullableVersions": "1+", "taggedVersions": "2+", 
"tag": 2,

Review Comment:
   fixed
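
   Presumably the fix aligns nullableVersions with versions and taggedVersions. 
A sketch of what the corrected fields would then look like (illustrative, not 
the actual committed diff):

   ```
   { "name": "EligibleLeaderReplicas", "type": "[]int32", "default": "null", "entityType": "brokerId",
     "versions": "2+", "nullableVersions": "2+", "taggedVersions": "2+", "tag": 1,
     "about": "The eligible leader replicas of this partition." },
   { "name": "LastKnownELR", "type": "[]int32", "default": "null", "entityType": "brokerId",
     "versions": "2+", "nullableVersions": "2+", "taggedVersions": "2+", "tag": 2,
     "about": "The last known eligible leader replicas of this partition." }
   ```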



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15922: Add a MetadataVersion for JBOD [kafka]

2023-11-29 Thread via GitHub


cmccabe commented on code in PR #14860:
URL: https://github.com/apache/kafka/pull/14860#discussion_r1409938934


##
core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:
##
@@ -2191,7 +2191,7 @@ class PartitionTest extends AbstractPartitionTest {
 val partition = new Partition(
   topicPartition,
   replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
-  interBrokerProtocolVersion = MetadataVersion.IBP_3_7_IV1,
+  interBrokerProtocolVersion = MetadataVersion.IBP_3_7_IV2,

Review Comment:
   Wanted a JBOD version here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15949) Improve the KRaft metadata version related messages

2023-11-29 Thread Jakub Scholz (Jira)
Jakub Scholz created KAFKA-15949:


 Summary: Improve the KRaft metadata version related messages
 Key: KAFKA-15949
 URL: https://issues.apache.org/jira/browse/KAFKA-15949
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 3.6.0
Reporter: Jakub Scholz


Various error messages related to KRaft use very different styles and 
formatting. For example, the {{StorageTool}} Scala class alone contains two 
different styles:
 * {{Must specify a valid KRaft metadata version of at least 3.0.}}
 ** Refers to "metadata version"
 ** Refers to the version as 3.0 (although strictly speaking 3.0-IV0 is not 
valid for KRaft)
 * {{SCRAM is only supported in metadataVersion IBP_3_5_IV2 or later.}}
 ** Talks about "metadataVersion"
 ** Refers to "IBP_3_5_IV2" instead of "3.5" or "3.5-IV2"

Other pieces of the Kafka code base also refer to "metadata.version", for 
example.

For users, it would be nice if the style and format were the same everywhere. 
Would it be worth unifying messages like this? If yes, what would be the 
preferred style to use? A sketch of one possible unified style follows.
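
As a sketch of what a unified style could look like, assuming the existing 
{{MetadataVersion#version()}} and {{MetadataVersion#shortVersion()}} accessors 
(the wording below is illustrative only, not an agreed proposal):

{code:java}
import org.apache.kafka.server.common.MetadataVersion;

public class MessageStyleExample {
    public static void main(String[] args) {
        MetadataVersion mv = MetadataVersion.IBP_3_5_IV2;
        // Consistently say "metadata.version" and use the human-readable form.
        System.out.printf("SCRAM is only supported with metadata.version %s or later.%n",
            mv.version()); // e.g. "3.5-IV2"
        System.out.printf("Must specify a metadata.version of at least %s.%n",
            MetadataVersion.MINIMUM_KRAFT_VERSION.shortVersion()); // e.g. "3.0"
    }
}
{code}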



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-9693) Kafka latency spikes caused by log segment flush on roll

2023-11-29 Thread Haruki Okada (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17791354#comment-17791354
 ] 

Haruki Okada commented on KAFKA-9693:
-

[~paolomoriello] [~novosibman] Hi, I believe the latency spike due to flushing 
on log.roll is now resolved by https://issues.apache.org/jira/browse/KAFKA-15046

> Kafka latency spikes caused by log segment flush on roll
> 
>
> Key: KAFKA-9693
> URL: https://issues.apache.org/jira/browse/KAFKA-9693
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
> Environment: OS: Amazon Linux 2
> Kafka version: 2.2.1
>Reporter: Paolo Moriello
>Assignee: Paolo Moriello
>Priority: Major
>  Labels: Performance, latency, performance
> Fix For: 3.7.0
>
> Attachments: image-2020-03-10-13-17-34-618.png, 
> image-2020-03-10-14-36-21-807.png, image-2020-03-10-15-00-23-020.png, 
> image-2020-03-10-15-00-54-204.png, image-2020-06-23-12-24-46-548.png, 
> image-2020-06-23-12-24-58-788.png, image-2020-06-26-13-43-21-723.png, 
> image-2020-06-26-13-46-52-861.png, image-2020-06-26-14-06-01-505.png, 
> latency_plot2.png
>
>
> h1. Summary
> When a log segment fills up, Kafka rolls over onto a new active segment and 
> forces a flush of the old segment to disk. When this happens, log segment 
> _append_ duration increases, causing significant latency spikes on producer(s) 
> and replica(s). This ticket aims to highlight the problem and propose a 
> simple mitigation: add a new configuration to enable/disable rolled segment 
> flush.
> h1. 1. Phenomenon
> Response time of produce request (99th ~ 99.9th %ile) repeatedly spikes to 
> ~50x-200x more than usual. For instance, normally 99th %ile is lower than 
> 5ms, but when this issue occurs, it marks 100ms to 200ms. 99.9th and 99.99th 
> %iles even jump to 500-700ms.
> Latency spikes happen at constant frequency (depending on the input 
> throughput), for small amounts of time. All the producers experience a 
> latency increase at the same time.
> !image-2020-03-10-13-17-34-618.png|width=942,height=314!
> {{Example of response time plot observed on a single producer.}}
> URPs rarely appear in correspondence with the latency spikes too. This is 
> harder to reproduce, but from time to time it is possible to see a few 
> partitions going out of sync in correspondence with a spike.
> h1. 2. Experiment
> h2. 2.1 Setup
> Kafka cluster hosted on AWS EC2 instances.
> h4. Cluster
>  * 15 Kafka brokers: (EC2 m5.4xlarge)
>  ** Disk: 1100Gb EBS volumes (4750Mbps)
>  ** Network: 10 Gbps
>  ** CPU: 16 Intel Xeon Platinum 8000
>  ** Memory: 64Gb
>  * 3 Zookeeper nodes: m5.large
>  * 6 producers on 6 EC2 instances in the same region
>  * 1 topic, 90 partitions - replication factor=3
> h4. Broker config
> Relevant configurations:
> {quote}num.io.threads=8
>  num.replica.fetchers=2
>  offsets.topic.replication.factor=3
>  num.network.threads=5
>  num.recovery.threads.per.data.dir=2
>  min.insync.replicas=2
>  num.partitions=1
> {quote}
> h4. Perf Test
>  * Throughput ~6000-8000 (~40-70Mb/s input + replication = ~120-210Mb/s per 
> broker)
>  * record size = 2
>  * Acks = 1, linger.ms = 1, compression.type = none
>  * Test duration: ~20/30min
> h2. 2.2 Analysis
> Our analysis showed a high +correlation between log segment flush count/rate 
> and the latency spikes+. This indicates that the spikes in max latency are 
> related to Kafka's behavior when rolling over onto new segments.
> The other metrics did not show any relevant impact on any hardware component 
> of the cluster, eg. cpu, memory, network traffic, disk throughput...
>  
>  !latency_plot2.png|width=924,height=308!
>  {{Correlation between latency spikes and log segment flush count. p50, p95, 
> p99, p999 and p latencies (left axis, ns) and the flush #count (right 
> axis, stepping blue line in plot).}}
> Kafka schedules log flushing (this includes flushing the file records 
> containing log entries, the offset index, the timestamp index and the 
> transaction index) during _roll_ operations. A log is rolled over onto a new 
> empty log when:
>  * the log segment is full
>  * the max time has elapsed since the timestamp of the first message in the 
> segment (or, in its absence, since the create time)
>  * the index is full
> In this case, the increase in latency happens on _append_ of a new message 
> set to the active segment of the log. This is a synchronous operation which 
> therefore blocks producer requests, causing the latency increase.
> To confirm this, I instrumented Kafka to measure the duration of the 
> FileRecords.append(MemoryRecords) method, which is responsible for writing 
> memory records to file. As a result, I observed the same spiky pattern as in 
> the producer latency, with a one-to-one 
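
A rough sketch of that kind of instrumentation (an illustration of the idea, 
not the actual patch used in the experiment; the 10 ms threshold is arbitrary):

{code:java}
import java.io.IOException;

import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.MemoryRecords;

// Time each append to the active segment to spot flush-on-roll spikes.
public final class TimedAppend {
    public static int appendTimed(FileRecords fileRecords, MemoryRecords records)
            throws IOException {
        long startNs = System.nanoTime();
        int bytesWritten = fileRecords.append(records);
        long durationUs = (System.nanoTime() - startNs) / 1_000;
        if (durationUs > 10_000) { // 10 ms, arbitrary threshold for the sketch
            System.err.printf("slow append: %d bytes in %d us%n", bytesWritten, durationUs);
        }
        return bytesWritten;
    }
}
{code}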

Re: [PR] KAFKA-15830: Add telemetry API handling (KIP-714) [kafka]

2023-11-29 Thread via GitHub


junrao commented on code in PR #14767:
URL: https://github.com/apache/kafka/pull/14767#discussion_r1409930152


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -3697,16 +3698,60 @@ class KafkaApis(val requestChannel: RequestChannel,
 CompletableFuture.completedFuture[Unit](())
   }
 
-  // Just a place holder for now.
   def handleGetTelemetrySubscriptionsRequest(request: RequestChannel.Request): 
Unit = {
-requestHelper.sendMaybeThrottle(request, 
request.body[GetTelemetrySubscriptionsRequest].getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
-CompletableFuture.completedFuture[Unit](())
+val subscriptionRequest = request.body[GetTelemetrySubscriptionsRequest]
+
+if (ClientMetricsReceiverPlugin.instance.isEmpty) {
+  info("Received get telemetry client request, no metrics receiver plugin 
configured or running with ZK")
+  requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
+subscriptionRequest.getErrorResponse(requestThrottleMs, 
Errors.UNSUPPORTED_VERSION.exception))

Review Comment:
   Yes, I agree that we need to exclude telemetry requests from 
`ApiVersionsResponse` if the client metrics receiver plugin is not configured. We 
could do that by passing the plugin availability flag through 
`ApiVersionManager.apiVersionResponse`.



##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -3747,16 +3747,56 @@ class KafkaApis(val requestChannel: RequestChannel,
 CompletableFuture.completedFuture[Unit](())
   }
 
-  // Just a place holder for now.
   def handleGetTelemetrySubscriptionsRequest(request: RequestChannel.Request): 
Unit = {
-requestHelper.sendMaybeThrottle(request, 
request.body[GetTelemetrySubscriptionsRequest].getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
-CompletableFuture.completedFuture[Unit](())
+val subscriptionRequest = request.body[GetTelemetrySubscriptionsRequest]
+
+clientMetricsManager match {
+  case Some(metricsManager) =>
+try {
+  if (metricsManager.isTelemetryReceiverConfigured) {
+requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs 
=>

Review Comment:
   If we use `sendResponseMaybeThrottle`, we need to set requestThrottleMs in 
the response. A simpler approach is to use `sendMaybeThrottle`, which 
automatically sets `requestThrottleMs` in the response. Ditto in other usage.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Change test logging capture to per-test, reducing jenkins truncation [kafka]

2023-11-29 Thread via GitHub


gharris1727 commented on PR #14795:
URL: https://github.com/apache/kafka/pull/14795#issuecomment-1832781742

   Also worth a follow-up: tackling the outliers that do experience 
truncation and trying to reduce their log volume to a more reasonable level.
   The biggest offenders above 10MB appear to be:
   
   Size | Test
   -- | --
   153.56MB | 
org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldWaitForMissingInputTopicsToBeCreated()
   142.97MB | 
org.apache.kafka.streams.processor.internals.assignment.TaskAssignorConvergenceTest.randomClusterPerturbationsShouldConverge[enableRackAwareTaskAssignor=false]
   103.84MB | 
org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testAlterSourceConnectorOffsetsExactlyOnceSupportEnabled
   83.26MB | 
org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest.testThreadSafety
   82.94MB | 
org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldAddAndRemoveNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology()
   66.77MB | 
kafka.coordinator.group.GroupCoordinatorConcurrencyTest.testConcurrentRandomSequence()
   66.68MB | 
org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldAddNamedTopologyToRunningApplicationWithMultipleInitialNamedTopologies()
   45.17MB | 
org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldAddToEmptyInitialTopologyRemoveResetOffsetsThenAddSameNamedTopologyWithRepartitioning()
   41.76MB | 
org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets()
   39.36MB | 
org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldRemoveAndReplaceTopologicallyIncompatibleNamedTopology()
   34.26MB | 
org.apache.kafka.streams.processor.internals.assignment.TaskAssignorConvergenceTest.randomClusterPerturbationsShouldConverge[enableRackAwareTaskAssignor=true]
   29.29MB | 
org.apache.kafka.streams.processor.internals.StreamsAssignmentScaleTest.testHighAvailabilityTaskAssignorLargeNumConsumers
   28.11MB | 
org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldAllowPatternSubscriptionWithMultipleNamedTopologies()
   24.25MB | 
org.apache.kafka.tools.MetadataQuorumCommandTest.testDescribeQuorumStatusSuccessful()[6]
   22.99MB | 
org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testResetSourceConnectorOffsetsExactlyOnceSupportEnabled
   21.63MB | 
org.apache.kafka.tools.reassign.ReassignPartitionsIntegrationTest.testAlterLogDirReassignmentThrottle(String)[1]
   20.61MB | 
org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldBackOffTaskAndEmitDataWithinSameTopology()
   20.39MB | 
org.apache.kafka.tools.reassign.ReassignPartitionsIntegrationTest.testLogDirReassignment(String)[1]
   19.17MB | 
org.apache.kafka.streams.processor.internals.HandlingSourceTopicDeletionIntegrationTest.shouldThrowErrorAfterSourceTopicDeleted
   17.75MB | 
org.apache.kafka.tools.MetadataQuorumCommandTest.testDescribeQuorumStatusSuccessful()[2]
   17.32MB | 
org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest.testBrokerCoordinator
   16.30MB | 
org.apache.kafka.tools.MetadataQuorumCommandTest.testDescribeQuorumReplicationSuccessful()[2]
   15.47MB | 
org.apache.kafka.tools.reassign.ReassignPartitionsIntegrationTest.testCancellation(String)[1]
   13.77MB | 
org.apache.kafka.tools.reassign.ReassignPartitionsIntegrationTest.testThrottledReassignment(String)[1]
   12.64MB | 
org.apache.kafka.tools.reassign.ReassignPartitionsIntegrationTest.testCancellation(String)[2]
   12.46MB | 
org.apache.kafka.tools.MetadataQuorumCommandTest.testDescribeQuorumReplicationSuccessful()[6]
   10.22MB | 
org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldAddNamedTopologyToRunningApplicationWithEmptyInitialTopology()
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15645 ReplicationQuotasTestRig rewritten in java [kafka]

2023-11-29 Thread via GitHub


nizhikov commented on PR #14588:
URL: https://github.com/apache/kafka/pull/14588#issuecomment-1832765723

   Hello @jolshan, are you ready to merge this? 
   Can I improve this PR somehow?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Change test logging capture to per-test, reducing jenkins truncation [kafka]

2023-11-29 Thread via GitHub


gharris1727 commented on PR #14795:
URL: https://github.com/apache/kafka/pull/14795#issuecomment-1832757011

   I did some statistics on the current state of log truncation in CI. I 
learned that:
   
   * A full run of `./gradlew test` writes 1.73GB of logs
   * 814 of 1532 test suites (53%) produce 0 logs
   
   For the existing suite-level truncation:
   * 1.71GB (98%) of these logs are discarded due to truncation
   * 27MB (1.5%) of these logs are kept after truncation
   * 175 of 1532 test suites (11%) experience truncation
   * 5721 of 26298 tests (22%) are in test suites that experience truncation
   * Test suites which produce logs average 37kb of logs after truncation
   
   With the test-level truncation proposed here:
   * 1.61GB (93%) of these logs are discarded due to test-level truncation
   * 126MB (7%) of these logs are kept after truncation
   * 452 of 26298 tests (1.7%) experience truncation
   * Tests which produce logs average 15kb of logs after truncation
   
   So, assuming a worst-case run with every test failing (as logs are only kept 
for failed tests) and log volume similar to successful runs, this change would 
cost 5 times (126MB/27MB) as much log storage space. However, any particular 
test would be **~12 times (5721/452) less likely to experience truncation**.
   
   We don't regularly see fully-failed test suites, and instead typically see 
small numbers of test failures. If we assume test failures to be uniformly 
distributed among all tests (which they almost certainly aren't, but I don't 
have statistics for that) we can use averages to calculate the expected 
persisted logs per test failure. Since a test failure in suite-truncation keeps 
the truncated logs for the whole suite, each test failure adds on average 37kb 
of logs, or less if multiple tests in the same suite fail. Test failures under 
test-truncation only keep logs for the individual tests, which averages 15kb, 
and receives no discount for multiple test failures. So assuming a small number 
of test failures, which is typical, **the cost of storing these logs is roughly 
2.5 times (37kb/15kb) lower**.
   
   I believe that this change should not be harmful to the Jenkins test 
infrastructure, and will immediately benefit our ability to debug tests via CI, 
especially flaky failures.
   
   @ijuma @mimaison @divijvaidya Could you take a look at this when you have a 
chance?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-15947) Null pointer on LZ4 compression since Kafka 3.6

2023-11-29 Thread Ludo (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ludo resolved KAFKA-15947.
--
Fix Version/s: 3.6.1
   Resolution: Duplicate

> Null pointer on LZ4 compression since Kafka 3.6
> ---
>
> Key: KAFKA-15947
> URL: https://issues.apache.org/jira/browse/KAFKA-15947
> Project: Kafka
>  Issue Type: Bug
>  Components: compression
>Affects Versions: 3.6.0
>Reporter: Ludo
>Priority: Major
> Fix For: 3.6.1
>
>
> I have a Kafka Streams application that has been running well for months on 
> client version {{3.5.1}} (bitnami image: {{bitnami/kafka:3.5.1-debian-11-r44}}) 
> with {{compression.type: "lz4"}}.
> I've recently updated my Kafka server to Kafka 3.6 (bitnami image: 
> {{bitnami/kafka:3.6.0-debian-11-r0}}).
>  
> Startup works fine and runs well for days, then Kafka Streams crashes 
> and Kafka outputs a lot of NullPointerExceptions on the console: 
>  
> {code:java}
> org.apache.kafka.common.KafkaException: java.lang.NullPointerException: 
> Cannot invoke "java.nio.ByteBuffer.hasArray()" because 
> "this.intermediateBufRef" is null
>   at 
> org.apache.kafka.common.record.CompressionType$4.wrapForInput(CompressionType.java:134)
>   at 
> org.apache.kafka.common.record.DefaultRecordBatch.recordInputStream(DefaultRecordBatch.java:273)
>   at 
> org.apache.kafka.common.record.DefaultRecordBatch.compressedIterator(DefaultRecordBatch.java:277)
>   at 
> org.apache.kafka.common.record.DefaultRecordBatch.skipKeyValueIterator(DefaultRecordBatch.java:352)
>   at 
> org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsetsCompressed(LogValidator.java:358)
>   at 
> org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsets(LogValidator.java:165)
>   at kafka.log.UnifiedLog.$anonfun$append$2(UnifiedLog.scala:805)
>   at kafka.log.UnifiedLog.append(UnifiedLog.scala:1845)
>   at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:719)
>   at 
> kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1313)
>   at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1301)
>   at 
> kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:1210)
>   at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
>   at 
> scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149)
>   at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237)
>   at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230)
>   at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44)
>   at scala.collection.mutable.HashMap.foreach(HashMap.scala:149)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:286)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:108)
>   at 
> kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:1198)
>   at 
> kafka.server.ReplicaManager.$anonfun$appendRecords$18$adapted(ReplicaManager.scala:754)
>   at 
> kafka.server.KafkaRequestHandler$.$anonfun$wrap$3(KafkaRequestHandler.scala:73)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:130)
>   at java.base/java.lang.Thread.run(Thread.java:833)
> Caused by: java.lang.NullPointerException: Cannot invoke 
> "java.nio.ByteBuffer.hasArray()" because "this.intermediateBufRef" is null
>   at 
> org.apache.kafka.common.utils.ChunkedBytesStream.<init>(ChunkedBytesStream.java:89)
>   at 
> org.apache.kafka.common.record.CompressionType$4.wrapForInput(CompressionType.java:132)
>   ... 25 more {code}
> At the same time, Kafka Streams raises this error:
>  
> {code:java}
> org.apache.kafka.streams.errors.StreamsException: Error encountered sending 
> record to topic kestra_workertaskresult for task 3_6 due to:
> org.apache.kafka.common.errors.UnknownServerException: The server 
> experienced an unexpected error when processing the request.
> Written offsets would not be recorded and no more records would be sent 
> since this is a fatal error.
>   at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:297)
>   at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$1(RecordCollectorImpl.java:284)
>   at org.apache.kafka.clients.producer.KafkaProducer$AppendCallbacks.onCompletion(KafkaProducer.java:1505)
>   at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:273)
>   at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:234)
>   at 

[jira] [Commented] (KAFKA-15947) Null pointer on LZ4 compression since Kafka 3.6

2023-11-29 Thread Ludo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17791343#comment-17791343
 ] 

Ludo commented on KAFKA-15947:
--

Yes, it seems to be the same, and yes, I use transactions (with Kafka Streams). 

Thanks for highlighting this issue.

> Null pointer on LZ4 compression since Kafka 3.6
> ---
>
> Key: KAFKA-15947
> URL: https://issues.apache.org/jira/browse/KAFKA-15947
> Project: Kafka
>  Issue Type: Bug
>  Components: compression
>Affects Versions: 3.6.0
>Reporter: Ludo
>Priority: Major
>
> I have a Kafka Streams application that has been running well for months on 
> client version {{3.5.1}} (bitnami image: {{bitnami/kafka:3.5.1-debian-11-r44}}) 
> with {{compression.type: "lz4"}}.
> I've recently updated my Kafka server to Kafka 3.6 (bitnami image: 
> {{bitnami/kafka:3.6.0-debian-11-r0}}).
>  
> Startup works fine and runs well for days, then Kafka Streams crashes 
> and Kafka outputs a lot of NullPointerExceptions on the console: 
>  
> {code:java}
> org.apache.kafka.common.KafkaException: java.lang.NullPointerException: 
> Cannot invoke "java.nio.ByteBuffer.hasArray()" because 
> "this.intermediateBufRef" is null
>   at 
> org.apache.kafka.common.record.CompressionType$4.wrapForInput(CompressionType.java:134)
>   at 
> org.apache.kafka.common.record.DefaultRecordBatch.recordInputStream(DefaultRecordBatch.java:273)
>   at 
> org.apache.kafka.common.record.DefaultRecordBatch.compressedIterator(DefaultRecordBatch.java:277)
>   at 
> org.apache.kafka.common.record.DefaultRecordBatch.skipKeyValueIterator(DefaultRecordBatch.java:352)
>   at 
> org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsetsCompressed(LogValidator.java:358)
>   at 
> org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsets(LogValidator.java:165)
>   at kafka.log.UnifiedLog.$anonfun$append$2(UnifiedLog.scala:805)
>   at kafka.log.UnifiedLog.append(UnifiedLog.scala:1845)
>   at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:719)
>   at 
> kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1313)
>   at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1301)
>   at 
> kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:1210)
>   at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
>   at 
> scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149)
>   at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237)
>   at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230)
>   at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44)
>   at scala.collection.mutable.HashMap.foreach(HashMap.scala:149)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:286)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:108)
>   at 
> kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:1198)
>   at 
> kafka.server.ReplicaManager.$anonfun$appendRecords$18$adapted(ReplicaManager.scala:754)
>   at 
> kafka.server.KafkaRequestHandler$.$anonfun$wrap$3(KafkaRequestHandler.scala:73)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:130)
>   at java.base/java.lang.Thread.run(Thread.java:833)
> Caused by: java.lang.NullPointerException: Cannot invoke 
> "java.nio.ByteBuffer.hasArray()" because "this.intermediateBufRef" is null
>   at 
> org.apache.kafka.common.utils.ChunkedBytesStream.<init>(ChunkedBytesStream.java:89)
>   at 
> org.apache.kafka.common.record.CompressionType$4.wrapForInput(CompressionType.java:132)
>   ... 25 more {code}
> At the same time, Kafka Streams raises this error:
>  
> {code:java}
> org.apache.kafka.streams.errors.StreamsException: Error encountered sending 
> record to topic kestra_workertaskresult for task 3_6 due to:
> org.apache.kafka.common.errors.UnknownServerException: The server 
> experienced an unexpected error when processing the request.
> Written offsets would not be recorded and no more records would be sent 
> since this is a fatal error.
>   at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:297)
>   at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$1(RecordCollectorImpl.java:284)
>   at org.apache.kafka.clients.producer.KafkaProducer$AppendCallbacks.onCompletion(KafkaProducer.java:1505)
>   at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:273)
>   at 

[jira] [Comment Edited] (KAFKA-15947) Null pointer on LZ4 compression since Kafka 3.6

2023-11-29 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17791332#comment-17791332
 ] 

Justine Olshan edited comment on KAFKA-15947 at 11/29/23 9:41 PM:
--

Hey – is this the same as https://issues.apache.org/jira/browse/KAFKA-15653? 
Are you running with transactions?

The above ticket will be fixed in 3.6.1


was (Author: jolshan):
Hey – is this the same as https://issues.apache.org/jira/browse/KAFKA-15653? 
Are you running with transactions?

> Null pointer on LZ4 compression since Kafka 3.6
> ---
>
> Key: KAFKA-15947
> URL: https://issues.apache.org/jira/browse/KAFKA-15947
> Project: Kafka
>  Issue Type: Bug
>  Components: compression
>Affects Versions: 3.6.0
>Reporter: Ludo
>Priority: Major
>
> I have a Kafka Streams application that has been running well for months on 
> client version {{3.5.1}} (bitnami image: {{bitnami/kafka:3.5.1-debian-11-r44}}) 
> with {{compression.type: "lz4"}}.
> I've recently updated my Kafka server to Kafka 3.6 (bitnami image: 
> {{bitnami/kafka:3.6.0-debian-11-r0}}).
>  
> Startup works fine and runs well for days, then Kafka Streams crashes 
> and Kafka outputs a lot of NullPointerExceptions on the console: 
>  
> {code:java}
> org.apache.kafka.common.KafkaException: java.lang.NullPointerException: 
> Cannot invoke "java.nio.ByteBuffer.hasArray()" because 
> "this.intermediateBufRef" is null
>   at 
> org.apache.kafka.common.record.CompressionType$4.wrapForInput(CompressionType.java:134)
>   at 
> org.apache.kafka.common.record.DefaultRecordBatch.recordInputStream(DefaultRecordBatch.java:273)
>   at 
> org.apache.kafka.common.record.DefaultRecordBatch.compressedIterator(DefaultRecordBatch.java:277)
>   at 
> org.apache.kafka.common.record.DefaultRecordBatch.skipKeyValueIterator(DefaultRecordBatch.java:352)
>   at 
> org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsetsCompressed(LogValidator.java:358)
>   at 
> org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsets(LogValidator.java:165)
>   at kafka.log.UnifiedLog.$anonfun$append$2(UnifiedLog.scala:805)
>   at kafka.log.UnifiedLog.append(UnifiedLog.scala:1845)
>   at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:719)
>   at 
> kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1313)
>   at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1301)
>   at 
> kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:1210)
>   at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
>   at 
> scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149)
>   at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237)
>   at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230)
>   at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44)
>   at scala.collection.mutable.HashMap.foreach(HashMap.scala:149)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:286)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:108)
>   at 
> kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:1198)
>   at 
> kafka.server.ReplicaManager.$anonfun$appendRecords$18$adapted(ReplicaManager.scala:754)
>   at 
> kafka.server.KafkaRequestHandler$.$anonfun$wrap$3(KafkaRequestHandler.scala:73)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:130)
>   at java.base/java.lang.Thread.run(Thread.java:833)
> Caused by: java.lang.NullPointerException: Cannot invoke 
> "java.nio.ByteBuffer.hasArray()" because "this.intermediateBufRef" is null
>   at 
> org.apache.kafka.common.utils.ChunkedBytesStream.<init>(ChunkedBytesStream.java:89)
>   at 
> org.apache.kafka.common.record.CompressionType$4.wrapForInput(CompressionType.java:132)
>   ... 25 more {code}
> At the same time, Kafka Streams raises this error:
>  
> {code:java}
> org.apache.kafka.streams.errors.StreamsException: Error encountered sending 
> record to topic kestra_workertaskresult for task 3_6 due to:
> org.apache.kafka.common.errors.UnknownServerException: The server 
> experienced an unexpected error when processing the request.
> Written offsets would not be recorded and no more records would be sent 
> since this is a fatal error.
>   at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:297)
>   at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$1(RecordCollectorImpl.java:284)
>   at 

Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-11-29 Thread via GitHub


philipnee commented on PR #14873:
URL: https://github.com/apache/kafka/pull/14873#issuecomment-1832745572

   In the current implementation, when the poll timer expires, the consumer sends a 
LeaveGroup request without revoking the partitions. This is obviously a bit 
different from the current state transitions, so I'm not sure what the right 
thing to do is.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15947) Null pointer on LZ4 compression since Kafka 3.6

2023-11-29 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17791332#comment-17791332
 ] 

Justine Olshan commented on KAFKA-15947:


Hey – is this the same as https://issues.apache.org/jira/browse/KAFKA-15653? 
Are you running with transactions?

> Null pointer on LZ4 compression since Kafka 3.6
> ---
>
> Key: KAFKA-15947
> URL: https://issues.apache.org/jira/browse/KAFKA-15947
> Project: Kafka
>  Issue Type: Bug
>  Components: compression
>Affects Versions: 3.6.0
>Reporter: Ludo
>Priority: Major
>
> I have a Kafka Streams application that has been running well for months on 
> client version {{3.5.1}} (bitnami image: {{bitnami/kafka:3.5.1-debian-11-r44}}) 
> with {{compression.type: "lz4"}}.
> I've recently updated my Kafka server to Kafka 3.6 (bitnami image: 
> {{bitnami/kafka:3.6.0-debian-11-r0}}).
>  
> Startup works fine and runs well for days, then Kafka Streams crashes 
> and Kafka outputs a lot of NullPointerExceptions on the console: 
>  
> {code:java}
> org.apache.kafka.common.KafkaException: java.lang.NullPointerException: 
> Cannot invoke "java.nio.ByteBuffer.hasArray()" because 
> "this.intermediateBufRef" is null
>   at 
> org.apache.kafka.common.record.CompressionType$4.wrapForInput(CompressionType.java:134)
>   at 
> org.apache.kafka.common.record.DefaultRecordBatch.recordInputStream(DefaultRecordBatch.java:273)
>   at 
> org.apache.kafka.common.record.DefaultRecordBatch.compressedIterator(DefaultRecordBatch.java:277)
>   at 
> org.apache.kafka.common.record.DefaultRecordBatch.skipKeyValueIterator(DefaultRecordBatch.java:352)
>   at 
> org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsetsCompressed(LogValidator.java:358)
>   at 
> org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsets(LogValidator.java:165)
>   at kafka.log.UnifiedLog.$anonfun$append$2(UnifiedLog.scala:805)
>   at kafka.log.UnifiedLog.append(UnifiedLog.scala:1845)
>   at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:719)
>   at 
> kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1313)
>   at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1301)
>   at 
> kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:1210)
>   at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
>   at 
> scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149)
>   at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237)
>   at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230)
>   at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44)
>   at scala.collection.mutable.HashMap.foreach(HashMap.scala:149)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:286)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:108)
>   at 
> kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:1198)
>   at 
> kafka.server.ReplicaManager.$anonfun$appendRecords$18$adapted(ReplicaManager.scala:754)
>   at 
> kafka.server.KafkaRequestHandler$.$anonfun$wrap$3(KafkaRequestHandler.scala:73)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:130)
>   at java.base/java.lang.Thread.run(Thread.java:833)
> Caused by: java.lang.NullPointerException: Cannot invoke 
> "java.nio.ByteBuffer.hasArray()" because "this.intermediateBufRef" is null
>   at 
> org.apache.kafka.common.utils.ChunkedBytesStream.<init>(ChunkedBytesStream.java:89)
>   at 
> org.apache.kafka.common.record.CompressionType$4.wrapForInput(CompressionType.java:132)
>   ... 25 more {code}
> At the same time, Kafka Streams raises this error:
>  
> {code:java}
> org.apache.kafka.streams.errors.StreamsException: Error encountered sending 
> record to topic kestra_workertaskresult for task 3_6 due to:
> org.apache.kafka.common.errors.UnknownServerException: The server 
> experienced an unexpected error when processing the request.
> Written offsets would not be recorded and no more records would be sent 
> since this is a fatal error.
>   at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:297)
>   at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$1(RecordCollectorImpl.java:284)
>   at org.apache.kafka.clients.producer.KafkaProducer$AppendCallbacks.onCompletion(KafkaProducer.java:1505)
>   at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:273)
>   at 

Re: [PR] KAFKA-15922: Add a MetadataVersion for JBOD [kafka]

2023-11-29 Thread via GitHub


cmccabe commented on code in PR #14860:
URL: https://github.com/apache/kafka/pull/14860#discussion_r1409900140


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -59,10 +59,14 @@ object StorageTool extends Logging {
 case "format" =>
   val directories = configToLogDirectories(config.get)
   val clusterId = namespace.getString("cluster_id")
-  val metadataVersion = getMetadataVersion(namespace, 
Option(config.get.interBrokerProtocolVersionString))
+  val metadataVersion = getMetadataVersion(namespace,
+
Option(config.get.originals.get(KafkaConfig.InterBrokerProtocolVersionProp)).map(_.toString))
   if (!metadataVersion.isKRaftSupported) {
 throw new TerseFailure(s"Must specify a valid KRaft metadata 
version of at least 3.0.")
   }
+  if (!metadataVersion.isProduction()) {
+throw new TerseFailure(s"Metadata version ${metadataVersion} is 
not ready for production use yet.")
+  }

Review Comment:
   I created an internal configuration `unstable.metadata.versions.enable` to 
bypass this. Since it is internal and undocumented, it doesn't need a KIP.
   
   (Also see the very similar `unstable.api.versions.enable`)
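
   For a test-only cluster, opting in would look something like the following 
server config fragment (a sketch: the flag is internal, so its name and 
semantics may change, and it should never be set in production):

   ```
   # server.properties for a test cluster only -- never in production
   unstable.metadata.versions.enable=true
   ```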



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-11-29 Thread via GitHub


philipnee commented on code in PR #14873:
URL: https://github.com/apache/kafka/pull/14873#discussion_r1409899121


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -169,6 +174,19 @@ public NetworkClientDelegate.PollResult poll(long 
currentTimeMs) {
 return NetworkClientDelegate.PollResult.EMPTY;
 }
 
+pollTimer.update(currentTimeMs);
+if (!isLeaveGroupInProgress() && pollTimer.isExpired()) {
+logger.warn("consumer poll timeout has expired. This means the 
time between subsequent calls to poll() " +
+"was longer than the configured max.poll.interval.ms, which 
typically implies that " +
+"the poll loop is spending too much time processing messages. 
You can address this " +
+"either by increasing max.poll.interval.ms or by reducing the 
maximum size of batches " +
+"returned in poll() with max.poll.records.");
+membershipManager.leaveGroup().whenComplete((ignored, exception) 
-> {

Review Comment:
   What exactly are we supposed to do in KIP-848 if the poll timer expires? The current 
implementation sends a LeaveGroup request directly without revoking partitions. 
 If we want to revoke first, we need to modify the state transitions a bit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15764: Missing Tests for Transactions [kafka]

2023-11-29 Thread via GitHub


jolshan commented on code in PR #14702:
URL: https://github.com/apache/kafka/pull/14702#discussion_r1409884417


##
core/src/test/scala/integration/kafka/api/TransactionsTest.scala:
##
@@ -820,6 +823,37 @@ class TransactionsTest extends IntegrationTestHarness {
 assertEquals((initialProducerEpoch + 1).toShort, 
producerStateEntry.producerEpoch)
   }
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testTransactionsWithCompression(quorum: String): Unit = {
+val numRecords = 50
+val numProducersWithCompression = 5
+val numTransactions = 40
+val transactionalCompressionProducers = Buffer[KafkaProducer[Array[Byte], 
Array[Byte]]]()
+
+for (i <- 0 until numProducersWithCompression) {
+  transactionalCompressionProducers += 
createTransactionalProducer("transactional-compression-producer-" + i.toString, 
 compressionType = "snappy")
+}
+
+// KAFKA-15653 is triggered more easily with replication factor 1

Review Comment:
   Sorry, I think I got confused about when the response is being sent. I will 
say that adding more request threads didn't work and I had to lower them to get 
it to reproduce frequently.
   
   I can try it the other way around, but IIRC there were a lot of 
combinations that were not as consistent as this one is. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-11-29 Thread via GitHub


philipnee commented on code in PR #14873:
URL: https://github.com/apache/kafka/pull/14873#discussion_r1409862992


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -169,6 +174,19 @@ public NetworkClientDelegate.PollResult poll(long 
currentTimeMs) {
 return NetworkClientDelegate.PollResult.EMPTY;
 }
 
+pollTimer.update(currentTimeMs);
+if (!isLeaveGroupInProgress() && pollTimer.isExpired()) {
+logger.warn("consumer poll timeout has expired. This means the 
time between subsequent calls to poll() " +
+"was longer than the configured max.poll.interval.ms, which 
typically implies that " +
+"the poll loop is spending too much time processing messages. 
You can address this " +
+"either by increasing max.poll.interval.ms or by reducing the 
maximum size of batches " +
+"returned in poll() with max.poll.records.");
+membershipManager.leaveGroup().whenComplete((ignored, exception) 
-> {

Review Comment:
   Not sure if kip-848 changes this but
   ```
   // Starting from 2.3, only dynamic members will send LeaveGroupRequest to the broker,
   // consumer with valid group.instance.id is viewed as static member that never sends LeaveGroup,
   // and the membership expiration is only controlled by session timeout.
   ```
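
   For context, static membership is opted into purely via client 
configuration; a minimal sketch (broker address, group id, and instance id are 
placeholders):

   ```java
   import java.util.Properties;

   import org.apache.kafka.clients.consumer.ConsumerConfig;
   import org.apache.kafka.clients.consumer.KafkaConsumer;
   import org.apache.kafka.common.serialization.StringDeserializer;

   public class StaticMemberExample {
       public static void main(String[] args) {
           Properties props = new Properties();
           props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
           props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
           // group.instance.id makes this a static member, which (per the comment
           // above) does not send LeaveGroup; expiry is driven by session.timeout.ms.
           props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "member-1");
           props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
           props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
           try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
               // subscribe and poll as usual...
           }
       }
   }
   ```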
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: DirectoryId.MIGRATING should be all zeros [kafka]

2023-11-29 Thread via GitHub


cmccabe merged PR #14858:
URL: https://github.com/apache/kafka/pull/14858


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15886: Always specify directories for new partition registrations [kafka]

2023-11-29 Thread via GitHub


soarez commented on code in PR #14820:
URL: https://github.com/apache/kafka/pull/14820#discussion_r1409866239


##
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##
@@ -405,7 +415,7 @@ private void completeReassignmentIfNeeded() {
 targetAdding = Collections.emptyList();
 }
 
-public Optional build() {
+public Optional build(DefaultDirProvider 
defaultDirProvider) {

Review Comment:
   Updated this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15922: Add a MetadataVersion for JBOD [kafka]

2023-11-29 Thread via GitHub


cmccabe commented on PR #14860:
URL: https://github.com/apache/kafka/pull/14860#issuecomment-1832698639

   > Left a few comments, nothing major directly on the changes.
   > 
   > I do have a high-level thought to put out there. This currently prevents 
the storage tool from formatting with a non production MetadataVersion and 
prevents the feature tool from updating a running cluster to a non-production 
MetadataVersion. I wonder if this will break anything, and I wonder if perhaps 
we should add a break-glass mechanism to circumvent these things in case 
something does break. While this may not require a KIP per-se, I would not be 
surprised if changing this causes issues for people somewhere.
   
   I don't think this will break any existing use-cases.
   
   I agree that having a break-glass mechanism would be useful for some testing 
cases, although we need to be very sure to document that it shouldn't be used 
in production. Maybe we can follow up with a KIP later, but that would be too 
late for 3.7.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: DirectoryId.MIGRATING should be all zeros [kafka]

2023-11-29 Thread via GitHub


cmccabe commented on PR #14858:
URL: https://github.com/apache/kafka/pull/14858#issuecomment-1832693965

   > The directory fields in BrokerRegistrationRequest, RegisterBrokerRecord, 
BrokerRegistrationChangeRecord, PartitionRecord and PartitionChangeRecord are 
all of type Uuid[], which defaults to an empty list. We don't have any 
directory related fields of type Uuid in requests or metadata records. But I 
still think it makes sense to change this, because we might need such fields in 
the future.
   
   Yeah. Good points.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15886: Always specify directories for new partition registrations [kafka]

2023-11-29 Thread via GitHub


soarez commented on code in PR #14820:
URL: https://github.com/apache/kafka/pull/14820#discussion_r1409860115


##
metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java:
##
@@ -377,7 +386,7 @@ public ApiMessageAndVersion toRecord(Uuid topicId, int 
partitionId, ImageWriterO
 record.setDirectories(Uuid.toList(directories));
 } else {
 for (Uuid directory : directories) {
-if (!DirectoryId.UNASSIGNED.equals(directory)) {
+if (!DirectoryId.UNASSIGNED.equals(directory) && 
!DirectoryId.MIGRATING.equals(directory)) {

Review Comment:
   Updated



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-11-29 Thread via GitHub


philipnee opened a new pull request, #14873:
URL: https://github.com/apache/kafka/pull/14873

   Currently, the poll interval is not respected during consumer#poll.  We 
need to make sure the consumer can actively leave the group when the user 
doesn't poll frequently enough.  This PR adds (1) a poll timer and (2) a 
boolean flag that triggers rejoining on a subsequent poll, as sketched below.
   - The poll timer is always configured with rebalanceTimeout
   - The boolean flag, rejoinOnPoll, is set when the leaveGroup completes, so 
that a subsequent poll triggers a rejoin.
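   
   A rough sketch of that flow, using `org.apache.kafka.common.utils.Timer` but 
otherwise illustrative names (an assumption about the shape of the change, not 
the PR's exact code):
   ```java
   import org.apache.kafka.common.utils.Time;
   import org.apache.kafka.common.utils.Timer;
   
   final class PollIntervalSketch {
       private final long rebalanceTimeoutMs;
       private final Timer pollTimer;    // expires when poll() is not called in time
       private boolean rejoinOnPoll;     // set once leaveGroup() completes
   
       PollIntervalSketch(final Time time, final long rebalanceTimeoutMs) {
           this.rebalanceTimeoutMs = rebalanceTimeoutMs;
           this.pollTimer = time.timer(rebalanceTimeoutMs);
       }
   
       // From consumer.poll(): restart the timer and rejoin if we previously left.
       boolean onApplicationPoll(final long nowMs) {
           pollTimer.update(nowMs);
           pollTimer.reset(rebalanceTimeoutMs);
           final boolean shouldRejoin = rejoinOnPoll;
           rejoinOnPoll = false;
           return shouldRejoin;
       }
   
       // From the heartbeat path: leave the group once the timer expires.
       boolean shouldLeaveGroup(final long nowMs) {
           pollTimer.update(nowMs);
           return pollTimer.isExpired();
       }
   
       void onLeaveGroupComplete() {
           rejoinOnPoll = true;
       }
   }
   ```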


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]

2023-11-29 Thread via GitHub


jeffkbkim commented on code in PR #14849:
URL: https://github.com/apache/kafka/pull/14849#discussion_r1409851471


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorShard.java:
##
@@ -47,4 +47,11 @@ default void onNewMetadataImage(MetadataImage newImage, 
MetadataDelta delta) {}
  * any post unloading operations.
  */
 default void onUnloaded() {}
+
+/**
+ * Replay a record to update the state machine.
+ *
+ * @param record The record to replay.
+ */
+default void replay(U record) throws RuntimeException {}

Review Comment:
   this doesn't work because CoordinatorShard no longer extends 
CoordinatorPlayback, so it doesn't have a replay() method anymore



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14438: Throw if async consumer configured with invalid group ID [kafka]

2023-11-29 Thread via GitHub


cadonna commented on code in PR #14872:
URL: https://github.com/apache/kafka/pull/14872#discussion_r1409850375


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -752,6 +757,70 @@ public void testSubscriptionOnEmptyTopic() {
 assertThrows(IllegalArgumentException.class, () -> 
consumer.subscribe(singletonList(emptyTopic)));
 }
 
+@Test

Review Comment:
   I put the tests here because they only apply to the async consumer for the 
moment. As I wrote in a different comment, moving forward we should move these 
tests to `ConsumerConfig`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14438: Throw if async consumer configured with invalid group ID [kafka]

2023-11-29 Thread via GitHub


cadonna commented on code in PR #14872:
URL: https://github.com/apache/kafka/pull/14872#discussion_r1409848052


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -398,6 +387,38 @@ public class AsyncKafkaConsumer implements 
ConsumerDelegate {
 requestManagersSupplier);
 }
 
+private Optional initializeGroupMetadata(final 
ConsumerConfig config,
+final 
GroupRebalanceConfig groupRebalanceConfig) {
+final Optional groupMetadata = 
initializeGroupMetadata(
+groupRebalanceConfig.groupId,
+groupRebalanceConfig.groupInstanceId
+);
+if (!groupMetadata.isPresent()) {
+config.ignore(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
+config.ignore(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED);
+}
+return groupMetadata;
+}
+
+private Optional initializeGroupMetadata(final 
String groupId,
+final 
Optional groupInstanceId) {
+if (groupId != null) {
+if (groupId.isEmpty()) {

Review Comment:
   I think moving forward, when we also change the validation of the group ID 
for the legacy consumer in 4.0, we should write a validator for the group ID 
config in `ConsumerConfig` and throw there. 
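   
   For illustration, a minimal sketch of what such a validator could look like 
(hypothetical, not part of this PR):
   ```java
   import org.apache.kafka.common.config.ConfigDef;
   import org.apache.kafka.common.config.ConfigException;
   
   // Hypothetical validator: a non-null group.id must be neither empty nor
   // whitespace-only; null stays allowed and means "no group".
   final class GroupIdValidator implements ConfigDef.Validator {
       @Override
       public void ensureValid(final String name, final Object value) {
           if (value == null) {
               return;
           }
           final String groupId = (String) value;
           if (groupId.trim().isEmpty()) {
               throw new ConfigException(name, value,
                   "The group id must not be empty or consist only of whitespace");
           }
       }
   }
   ```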



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14438: Throw if async consumer configured with invalid group ID [kafka]

2023-11-29 Thread via GitHub


cadonna commented on PR #14872:
URL: https://github.com/apache/kafka/pull/14872#issuecomment-1832675480

   @kirktrue please review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14438: Throw error when consumer configured with empty/whitespace-only group.id for AsyncKafkaConsumer [kafka]

2023-11-29 Thread via GitHub


cadonna commented on PR #14768:
URL: https://github.com/apache/kafka/pull/14768#issuecomment-1832672849

   @kirktrue My PR regarding 
[KAFKA-15281](https://issues.apache.org/jira/browse/KAFKA-15281) overlaps a lot 
with this. So I broke out the relevant parts for the group ID verification from 
my PR and opened https://github.com/apache/kafka/pull/14872.
   
   That will avoid a lot of merge conflicts.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-14438: Throw if async consumer configured with invalid group ID [kafka]

2023-11-29 Thread via GitHub


cadonna opened a new pull request, #14872:
URL: https://github.com/apache/kafka/pull/14872

   Verifies that the group ID passed into the async consumer is valid. That is, 
if the group ID is not null, it must be neither empty nor consist only of 
whitespace.
   
   This change stores the group ID in the group metadata because KAFKA-15281 
about the group metadata API will build on that.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15922: Add a MetadataVersion for JBOD [kafka]

2023-11-29 Thread via GitHub


soarez commented on code in PR #14860:
URL: https://github.com/apache/kafka/pull/14860#discussion_r1409826262


##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -207,6 +213,22 @@ public enum MetadataVersion {
  */
 public static final MetadataVersion MINIMUM_BOOTSTRAP_VERSION = 
IBP_3_3_IV0;
 
+/**
+ * The latest production-ready MetadataVersion. This is the latest version 
that is stable
+ * and cannot be changed. MetadataVersions later than this can be tested 
via junit, but
+ * not deployed in production.
+ *
+ * Think carefully before you update this value. ONCE A METADATA 
VERSION IS PRODUCTION,
+ * IT CANNOT BE CHANGED.
+ */
+public static final MetadataVersion LATEST_PRODUCTION = IBP_3_7_IV0;

Review Comment:
   So I guess we need another JIRA to change this to `IBP_3_7_IV2` before the 
3.7 release?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-11-29 Thread via GitHub


AndrewJSchofield commented on code in PR #14864:
URL: https://github.com/apache/kafka/pull/14864#discussion_r1409822776


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java:
##
@@ -454,4 +480,19 @@ public void shutdown() {
 public Map consumerMetrics() {
 return Collections.unmodifiableMap(globalConsumer.metrics());
 }
+
+public KafkaFuture globalConsumerInstanceId(final Duration timeout) {

Review Comment:
   Yes, I saw the `volatile`. I agree that the ordering is sufficient to make 
it safe.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-11-29 Thread via GitHub


AndrewJSchofield commented on code in PR #14864:
URL: https://github.com/apache/kafka/pull/14864#discussion_r1409821677


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java:
##
@@ -310,6 +317,25 @@ public void run() {
 cache.resize(size);
 }
 stateConsumer.pollAndUpdate();
+
+if (fetchDeadline != -1) {
+if (fetchDeadline > time.milliseconds()) {
+try {
+globalConsumerClientInstanceId = 
globalConsumer.clientInstanceId(Duration.ZERO);
+
clientInstanceIdFuture.complete(globalConsumerClientInstanceId);
+fetchDeadline = -1;
+} catch (final TimeoutException swallow) {
+// swallow

Review Comment:
   Got it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-11-29 Thread via GitHub


AndrewJSchofield commented on code in PR #14864:
URL: https://github.com/apache/kafka/pull/14864#discussion_r1409821009


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java:
##
@@ -310,6 +317,25 @@ public void run() {
 cache.resize(size);
 }
 stateConsumer.pollAndUpdate();
+
+if (fetchDeadline != -1) {
+if (fetchDeadline > time.milliseconds()) {
+try {
+globalConsumerClientInstanceId = 
globalConsumer.clientInstanceId(Duration.ZERO);

Review Comment:
   Comments :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-11-29 Thread via GitHub


AndrewJSchofield commented on code in PR #14864:
URL: https://github.com/apache/kafka/pull/14864#discussion_r1409819732


##
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##
@@ -1791,6 +1794,52 @@ protected int processStreamThread(final 
Consumer consumer) {
 return copy.size();
 }
 
+/**
+ * Returns the internal clients' assigned {@code client instance ids}.
+ *
+ * @return the internal clients' assigned instance ids used for metrics 
collection.
+ *
+ * @throws IllegalStateException If {@code KafkaStreams} is not running.
+ * @throws TimeoutException Indicates that a request timed out.
+ */
+public ClientInstanceIds clientInstanceIds(final Duration timeout) {
+if (state().hasNotStarted()) {
+throw new IllegalStateException("KafkaStreams has not been 
started, you can retry after calling start().");
+}
+if (state().isShuttingDown() || state.hasCompletedShutdown()) {
+throw new IllegalStateException("KafkaStreams has been stopped (" 
+ state + ").");
+}
+
+final ClientInstanceIdsImpl clientInstanceIds = new 
ClientInstanceIdsImpl();
+
+KafkaFuture globalThreadFuture = null;
+if (globalStreamThread != null) {
+globalThreadFuture = 
globalStreamThread.globalConsumerInstanceId(timeout);

Review Comment:
   OK, it sounds like you have a plan.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: DirectoryId.MIGRATING should be all zeros [kafka]

2023-11-29 Thread via GitHub


soarez commented on PR #14858:
URL: https://github.com/apache/kafka/pull/14858#issuecomment-1832632146

   The directory fields in `BrokerRegistrationRequest`, `RegisterBrokerRecord`, 
`BrokerRegistrationChangeRecord`, `PartitionRecord` and `PartitionChangeRecord` 
are all of type `Uuid[]`, which defaults to an empty list. We don't have any 
directory related fields of type `Uuid` in requests or metadata records. But I 
still think it makes sense to change this, because we might need such fields in 
the future.
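   
   For illustration, if `DirectoryId.MIGRATING` is defined as the all-zero 
UUID, it coincides with the zero-filled value a plain `uuid` field would 
otherwise carry (a sketch based on this PR's title, not its actual code):
   ```java
   import org.apache.kafka.common.Uuid;
   
   final class DirectoryIdSketch {
       // All-zero UUID, i.e. the same value as a zero-initialized uuid field.
       static final Uuid MIGRATING = new Uuid(0L, 0L);
   
       public static void main(final String[] args) {
           // Prints "AAAAAAAAAAAAAAAAAAAAAA": base64url of sixteen zero bytes.
           System.out.println(MIGRATING);
       }
   }
   ```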


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15804: Close SocketServer channels when calling shutdown before enableRequestProcessing [kafka]

2023-11-29 Thread via GitHub


gharris1727 commented on PR #14729:
URL: https://github.com/apache/kafka/pull/14729#issuecomment-1832624782

   Hey @cmccabe @hachikuji Could you take a look at this?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15764: Missing Tests for Transactions [kafka]

2023-11-29 Thread via GitHub


hachikuji commented on code in PR #14702:
URL: https://github.com/apache/kafka/pull/14702#discussion_r1409800961


##
core/src/test/scala/integration/kafka/api/TransactionsTest.scala:
##
@@ -820,6 +823,37 @@ class TransactionsTest extends IntegrationTestHarness {
 assertEquals((initialProducerEpoch + 1).toShort, 
producerStateEntry.producerEpoch)
   }
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testTransactionsWithCompression(quorum: String): Unit = {
+val numRecords = 50
+val numProducersWithCompression = 5
+val numTransactions = 40
+val transactionalCompressionProducers = Buffer[KafkaProducer[Array[Byte], 
Array[Byte]]]()
+
+for (i <- 0 until numProducersWithCompression) {
+  transactionalCompressionProducers += 
createTransactionalProducer("transactional-compression-producer-" + i.toString, 
 compressionType = "snappy")
+}
+
+// KAFKA-15653 is triggered more easily with replication factor 1

Review Comment:
   Not sure I follow that. Whether there are any replicas or not, wouldn't the 
request thread be freed after sending out the verification? One way I could 
see the case being more likely is if we added additional request threads. I 
don't think there's any affinity in the callback to the original request 
thread, is there? So more available request threads probably means a greater 
chance that a separate thread would handle the callback. Does that make sense?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Kafka-14748: Relax non-null FK left-join requirement [kafka]

2023-11-29 Thread via GitHub


wcarlson5 commented on PR #14107:
URL: https://github.com/apache/kafka/pull/14107#issuecomment-1832612612

   I'm going to start my review of this today. Hopefully can be actionable 
sometime tomorrow.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-11-29 Thread via GitHub


mjsax commented on code in PR #14864:
URL: https://github.com/apache/kafka/pull/14864#discussion_r1409794850


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java:
##
@@ -310,6 +317,25 @@ public void run() {
 cache.resize(size);
 }
 stateConsumer.pollAndUpdate();
+
+if (fetchDeadline != -1) {
+if (fetchDeadline > time.milliseconds()) {
+try {
+globalConsumerClientInstanceId = 
globalConsumer.clientInstanceId(Duration.ZERO);
+
clientInstanceIdFuture.complete(globalConsumerClientInstanceId);
+fetchDeadline = -1;
+} catch (final TimeoutException swallow) {
+// swallow
+} catch (final Exception error) {
+
clientInstanceIdFuture.completeExceptionally(error);
+fetchDeadline = -1;

Review Comment:
   Yes. If the user calls `KafkaStreams#clientInstanceIds()` again, we would 
set a new fetch deadline -- if `fetchDeadline == -1` it means "nothing to be 
done", i.e., no call to `KafkaStreams#clientInstanceIds()` was made / is 
in-flight waiting for completion.
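   
   For illustration, a simplified sketch of that protocol (a `String` id stands 
in for the real `Uuid`; names are illustrative, not the PR's exact code):
   ```java
   import java.util.concurrent.CompletableFuture;
   
   import org.apache.kafka.common.errors.TimeoutException;
   
   final class FetchDeadlineSketch {
       private volatile long fetchDeadlineMs = -1L;   // -1 => no call in flight
       private final CompletableFuture<String> future = new CompletableFuture<>();
   
       // User thread: KafkaStreams#clientInstanceIds() arms a new deadline.
       void request(final long nowMs, final long timeoutMs) {
           fetchDeadlineMs = nowMs + timeoutMs;
       }
   
       // Global thread: one non-blocking attempt per loop iteration.
       void maybeFetch(final long nowMs) {
           if (fetchDeadlineMs == -1L) {
               return;                                // nothing to be done
           }
           if (nowMs < fetchDeadlineMs) {
               try {
                   future.complete(clientInstanceId());   // Duration.ZERO attempt
                   fetchDeadlineMs = -1L;
               } catch (final TimeoutException swallow) {
                   // not ready yet; retry on the next loop pass
               } catch (final RuntimeException error) {
                   future.completeExceptionally(error);
                   fetchDeadlineMs = -1L;
               }
           } else {
               future.completeExceptionally(new TimeoutException("deadline exceeded"));
               fetchDeadlineMs = -1L;
           }
       }
   
       private String clientInstanceId() {
           return "client-instance-id";               // stand-in for the real call
       }
   }
   ```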



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java:
##
@@ -310,6 +317,25 @@ public void run() {
 cache.resize(size);
 }
 stateConsumer.pollAndUpdate();
+
+if (fetchDeadline != -1) {
+if (fetchDeadline > time.milliseconds()) {
+try {
+globalConsumerClientInstanceId = 
globalConsumer.clientInstanceId(Duration.ZERO);
+
clientInstanceIdFuture.complete(globalConsumerClientInstanceId);
+fetchDeadline = -1;
+} catch (final TimeoutException swallow) {
+// swallow

Review Comment:
   Yes, this happens further below, in the `else` branch of the 
`if (fetchDeadline > time.milliseconds())` shown above.



##
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##
@@ -1791,6 +1794,52 @@ protected int processStreamThread(final 
Consumer consumer) {
 return copy.size();
 }
 
+/**
+ * Returns the internal clients' assigned {@code client instance ids}.
+ *
+ * @return the internal clients' assigned instance ids used for metrics 
collection.
+ *
+ * @throws IllegalStateException If {@code KafkaStreams} is not running.
+ * @throws TimeoutException Indicates that a request timed out.
+ */
+public ClientInstanceIds clientInstanceIds(final Duration timeout) {
+if (state().hasNotStarted()) {
+throw new IllegalStateException("KafkaStreams has not been 
started, you can retry after calling start().");
+}
+if (state().isShuttingDown() || state.hasCompletedShutdown()) {
+throw new IllegalStateException("KafkaStreams has been stopped (" 
+ state + ").");
+}
+
+final ClientInstanceIdsImpl clientInstanceIds = new 
ClientInstanceIdsImpl();
+
+KafkaFuture globalThreadFuture = null;
+if (globalStreamThread != null) {
+globalThreadFuture = 
globalStreamThread.globalConsumerInstanceId(timeout);

Review Comment:
   How strictly we can obey the given `timeout` is somewhat tricky, given that 
we need to call `clientInstanceId()` for each client we have. -- The idea was 
basically to "fan out" all these calls and do them in parallel (note that 
`globalConsumerInstanceId` will return immediately and not block, but hand the 
execution from the user thread to the `GlobalStreamThread`; that's why we 
return a Future) -- thus it should be ok to provide the same timeout to each 
call (as all of them are done in parallel)?
   
   If you have any good suggestions for how it could be done better, let me know.
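   
   For what it's worth, a minimal sketch of why one shared timeout works once 
all the futures are already in flight (illustrative only):
   ```java
   import java.time.Duration;
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.TimeoutException;
   
   final class FanOutSketch {
       // Every future was armed before we start waiting, so the calls proceed
       // concurrently; waiting against one shared deadline therefore gives each
       // client (nearly) the full timeout rather than consuming it in series.
       static List<String> awaitAll(final List<CompletableFuture<String>> inFlight,
                                    final Duration timeout)
               throws InterruptedException, ExecutionException, TimeoutException {
           final List<String> ids = new ArrayList<>();
           final long deadlineNs = System.nanoTime() + timeout.toNanos();
           for (final CompletableFuture<String> future : inFlight) {
               final long remainingNs = Math.max(0L, deadlineNs - System.nanoTime());
               ids.add(future.get(remainingNs, TimeUnit.NANOSECONDS));
           }
           return ids;
       }
   }
   ```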



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java:
##
@@ -310,6 +317,25 @@ public void run() {
 cache.resize(size);
 }
 stateConsumer.pollAndUpdate();
+
+if (fetchDeadline != -1) {
+if (fetchDeadline > time.milliseconds()) {
+try {
+globalConsumerClientInstanceId = 
globalConsumer.clientInstanceId(Duration.ZERO);

Review Comment:
   Yes, but this was intentional. The `GlobalStreamThread` does this call "on 
the side", and thus the idea is to call it with no timeout to just trigger the 
background RPC without blocking the thread from doing its actual work at all. 
-- There won't be a busy wait, because the global thread will do other useful 
work in the meantime. 

[jira] [Updated] (KAFKA-15816) Typos in tests leak network sockets

2023-11-29 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-15816:

Description: 
There are a few tests which leak network sockets due to small typos in the 
tests themselves.

Clients: [https://github.com/apache/kafka/pull/14750]
 * NioEchoServer
 * KafkaConsumerTest
 * KafkaProducerTest
 * SelectorTest
 * SslTransportLayerTest
 * SslTransportTls12Tls13Test
 * SslVersionsTransportLayerTest
 * SaslAuthenticatorTest

Core: [https://github.com/apache/kafka/pull/14754] 
 * MiniKdc
 * GssapiAuthenticationTest
 * MirrorMakerIntegrationTest
 * SocketServerTest
 * EpochDrivenReplicationProtocolAcceptanceTest
 * LeaderEpochIntegrationTest

Trogdor: [https://github.com/apache/kafka/pull/14771] 
 * AgentTest

Mirror: [https://github.com/apache/kafka/pull/14761] 
 * DedicatedMirrorIntegrationTest
 * MirrorConnectorsIntegrationTest
 * MirrorConnectorsWithCustomForwardingAdminIntegrationTest

Runtime: [https://github.com/apache/kafka/pull/14764] 
 * ConnectorTopicsIntegrationTest
 * ExactlyOnceSourceIntegrationTest
 * WorkerTest
 * WorkerGroupMemberTest

Streams: [https://github.com/apache/kafka/pull/14769] (DONE)
 * IQv2IntegrationTest
 * MetricsReporterIntegrationTest
 * NamedTopologyIntegrationTest
 * PurgeRepartitionTopicIntegrationTest

These can be addressed by just fixing the tests.

  was:
There are a few tests which leak network sockets due to small typos in the 
tests themselves.

Clients: [https://github.com/apache/kafka/pull/14750]
 * NioEchoServer
 * KafkaConsumerTest
 * KafkaProducerTest
 * SelectorTest
 * SslTransportLayerTest
 * SslTransportTls12Tls13Test
 * SslVersionsTransportLayerTest
 * SaslAuthenticatorTest

Core: [https://github.com/apache/kafka/pull/14754] 
 * MiniKdc
 * GssapiAuthenticationTest
 * MirrorMakerIntegrationTest
 * SocketServerTest
 * EpochDrivenReplicationProtocolAcceptanceTest
 * LeaderEpochIntegrationTest

Trogdor: [https://github.com/apache/kafka/pull/14771] 
 * AgentTest

Mirror: [https://github.com/apache/kafka/pull/14761] 
 * DedicatedMirrorIntegrationTest
 * MirrorConnectorsIntegrationTest
 * MirrorConnectorsWithCustomForwardingAdminIntegrationTest

Runtime: [https://github.com/apache/kafka/pull/14764] 
 * ConnectorTopicsIntegrationTest
 * ExactlyOnceSourceIntegrationTest
 * WorkerTest
 * WorkerGroupMemberTest

Streams: [https://github.com/apache/kafka/pull/14769]
 * IQv2IntegrationTest
 * MetricsReporterIntegrationTest
 * NamedTopologyIntegrationTest
 * PurgeRepartitionTopicIntegrationTest

These can be addressed by just fixing the tests.


> Typos in tests leak network sockets
> ---
>
> Key: KAFKA-15816
> URL: https://issues.apache.org/jira/browse/KAFKA-15816
> Project: Kafka
>  Issue Type: Bug
>  Components: unit tests
>Affects Versions: 3.6.0
>Reporter: Greg Harris
>Assignee: Greg Harris
>Priority: Minor
>
> There are a few tests which leak network sockets due to small typos in the 
> tests themselves.
> Clients: [https://github.com/apache/kafka/pull/14750]
>  * NioEchoServer
>  * KafkaConsumerTest
>  * KafkaProducerTest
>  * SelectorTest
>  * SslTransportLayerTest
>  * SslTransportTls12Tls13Test
>  * SslVersionsTransportLayerTest
>  * SaslAuthenticatorTest
> Core: [https://github.com/apache/kafka/pull/14754] 
>  * MiniKdc
>  * GssapiAuthenticationTest
>  * MirrorMakerIntegrationTest
>  * SocketServerTest
>  * EpochDrivenReplicationProtocolAcceptanceTest
>  * LeaderEpochIntegrationTest
> Trogdor: [https://github.com/apache/kafka/pull/14771] 
>  * AgentTest
> Mirror: [https://github.com/apache/kafka/pull/14761] 
>  * DedicatedMirrorIntegrationTest
>  * MirrorConnectorsIntegrationTest
>  * MirrorConnectorsWithCustomForwardingAdminIntegrationTest
> Runtime: [https://github.com/apache/kafka/pull/14764] 
>  * ConnectorTopicsIntegrationTest
>  * ExactlyOnceSourceIntegrationTest
>  * WorkerTest
>  * WorkerGroupMemberTest
> Streams: [https://github.com/apache/kafka/pull/14769] (DONE)
>  * IQv2IntegrationTest
>  * MetricsReporterIntegrationTest
>  * NamedTopologyIntegrationTest
>  * PurgeRepartitionTopicIntegrationTest
> These can be addressed by just fixing the tests.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15816: Fix leaked sockets in streams tests [kafka]

2023-11-29 Thread via GitHub


gharris1727 merged PR #14769:
URL: https://github.com/apache/kafka/pull/14769


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15816: Fix leaked sockets in streams tests [kafka]

2023-11-29 Thread via GitHub


gharris1727 commented on PR #14769:
URL: https://github.com/apache/kafka/pull/14769#issuecomment-1832605577

   The streams test failures appear unrelated, as they are not subclasses of 
the changed tests, and all streams tests pass locally.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15830: Add telemetry API handling (KIP-714) [kafka]

2023-11-29 Thread via GitHub


AndrewJSchofield commented on code in PR #14767:
URL: https://github.com/apache/kafka/pull/14767#discussion_r1409788411


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -3697,16 +3698,60 @@ class KafkaApis(val requestChannel: RequestChannel,
 CompletableFuture.completedFuture[Unit](())
   }
 
-  // Just a place holder for now.
   def handleGetTelemetrySubscriptionsRequest(request: RequestChannel.Request): 
Unit = {
-requestHelper.sendMaybeThrottle(request, 
request.body[GetTelemetrySubscriptionsRequest].getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
-CompletableFuture.completedFuture[Unit](())
+val subscriptionRequest = request.body[GetTelemetrySubscriptionsRequest]
+
+if (ClientMetricsReceiverPlugin.instance.isEmpty) {
+  info("Received get telemetry client request, no metrics receiver plugin 
configured or running with ZK")
+  requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
+subscriptionRequest.getErrorResponse(requestThrottleMs, 
Errors.UNSUPPORTED_VERSION.exception))

Review Comment:
   We can wait for @junrao to weigh in, but I think the handling of 
`APIVersions` should be as written in the KIP. The whole point is that a client 
can tell from the `APIVersionsResponse` that there is no need to get a 
telemetry subscription. For a cluster without client telemetry enabled, there 
are no additional RPCs when the client connects. With this PR, a 3.7 client 
will make extra RPCs even when client telemetry is not enabled. This should be 
avoided if at all possible.
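   
   As an illustration of that gating (a hypothetical helper, not code from this 
PR):
   ```java
   import java.util.Set;
   
   import org.apache.kafka.common.protocol.ApiKeys;
   
   final class TelemetryGateSketch {
       // Only issue GetTelemetrySubscriptions when the broker advertised the
       // API in its ApiVersionsResponse; otherwise skip the extra RPC entirely.
       static boolean shouldRequestSubscription(final Set<ApiKeys> advertisedApis) {
           return advertisedApis.contains(ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS);
       }
   }
   ```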



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-11-29 Thread via GitHub


mjsax commented on code in PR #14864:
URL: https://github.com/apache/kafka/pull/14864#discussion_r1409030872


##
streams/src/main/java/org/apache/kafka/streams/ClientInstanceIds.java:
##
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.apache.kafka.common.Uuid;
+
+import java.util.Map;
+
+/**
+ * Encapsulates the {@code client instance id} used for metrics collection by
+ * producers, consumers, and the admin client used by Kafka Streams.
+ */
+public interface ClientInstanceIds {

Review Comment:
   I updated the KIP and changed this from `class` to `interface`.



##
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##
@@ -1791,6 +1794,56 @@ protected int processStreamThread(final 
Consumer consumer) {
 return copy.size();
 }
 
+/**
+ * Returns the internal clients' assigned {@code client instance ids}.
+ *
+ * @return the internal clients' assigned instance ids used for metrics 
collection.
+ *
+ * @throws IllegalStateException If {@code KafkaStreams} is not running.

Review Comment:
   I updated the KIP and change this to `IllegalStateException` (it does not 
make sense to throw an sub-class of `InvalidStateStoreException` and other 
methods on `KafkaStreams` also use `IllegalStateException`).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15901: Client changes for registering telemetry and API calls (KIP-714) [kafka]

2023-11-29 Thread via GitHub


kirktrue commented on code in PR #14843:
URL: https://github.com/apache/kafka/pull/14843#discussion_r1409772670


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -4387,7 +4398,52 @@ public FenceProducersResult 
fenceProducers(Collection transactionalIds,
 
 @Override
 public Uuid clientInstanceId(Duration timeout) {
-throw new UnsupportedOperationException();
+if (timeout.isNegative()) {
+throw new IllegalArgumentException("The timeout cannot be 
negative.");
+}
+
+if (!clientTelemetryEnabled) {
+throw new IllegalStateException("Telemetry is not enabled. Set 
config `enable.metrics.push` to `true`.");
+}
+
+if (clientInstanceId != null) {
+return clientInstanceId;
+}
+
+final long now = time.milliseconds();
+final KafkaFutureImpl future = new KafkaFutureImpl<>();
+runnable.call(new Call("getTelemetrySubscriptions", 
calcDeadlineMs(now, (int) timeout.toMillis()),
+new LeastLoadedNodeProvider()) {
+
+@Override
+GetTelemetrySubscriptionsRequest.Builder createRequest(int 
timeoutMs) {
+return new GetTelemetrySubscriptionsRequest.Builder(new 
GetTelemetrySubscriptionsRequestData(), true);
+}
+
+@Override
+void handleResponse(AbstractResponse abstractResponse) {
+GetTelemetrySubscriptionsResponse response = 
(GetTelemetrySubscriptionsResponse) abstractResponse;
+if (response.error() != Errors.NONE) {
+future.completeExceptionally(response.error().exception());
+} else {
+future.complete(response.data().clientInstanceId());
+}
+}
+
+@Override
+void handleFailure(Throwable throwable) {
+future.completeExceptionally(throwable);
+}
+}, now);
+
+try {
+clientInstanceId = future.get();
+} catch (Exception e) {
+log.error("Error occurred while fetching client instance id", e);
+throw new KafkaException("Error occurred while fetching client 
instance id", e);
+}

Review Comment:
   Why do we make an explicit RPC call for the admin client, but the other two 
clients use `ClientTelemetryUtils.fetchClientInstanceId`?



##
clients/src/main/java/org/apache/kafka/clients/ClientTelemetryUtils.java:
##
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import org.apache.kafka.common.Uuid;
+
+import java.time.Duration;
+import java.util.Optional;
+
+public class ClientTelemetryUtils {

Review Comment:
   I think moving it to `internals` makes sense.



##
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java:
##
@@ -361,6 +370,9 @@ private void cancelInFlightRequests(String nodeId,
 }
 } else if (request.header.apiKey() == ApiKeys.METADATA) {
 metadataUpdater.handleFailedRequest(now, Optional.empty());
+} else if ((request.header.apiKey() == 
ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS ||
+request.header.apiKey() == ApiKeys.PUSH_TELEMETRY) && 
telemetrySender != null) {
+telemetrySender.handleFailedRequest(request.header.apiKey(), 
null);

Review Comment:
   nit: maybe creating an `ApiKeys` from `request.header.apiKey()` would make 
this visually cleaner?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15022: add config for balance subtopology in rack aware task assignment [kafka]

2023-11-29 Thread via GitHub


mjsax merged PR #14711:
URL: https://github.com/apache/kafka/pull/14711


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15948) Refactor AsyncKafkaConsumer shutdown

2023-11-29 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15948?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee updated KAFKA-15948:
---
Parent: KAFKA-14246
Issue Type: Sub-task  (was: Bug)

> Refactor AsyncKafkaConsumer shutdown
> 
>
> Key: KAFKA-15948
> URL: https://issues.apache.org/jira/browse/KAFKA-15948
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Philip Nee
>Priority: Major
>
> Upon closing we need a round trip from the network thread to the application 
> thread and then back to the network thread to complete the callback 
> invocation.  Currently, we don't have any of that.  I think we need to 
> refactor our closing mechanism.  There are a few points to the refactor:
>  # The network thread should know if there's a custom user callback to 
> trigger or not.  If there is, it should wait for the callback completion to 
> send a leave group.  If not, it should proceed with the shutdown.
>  # The application thread sends a closing signal to the network thread and 
> continuously polls the background event handler until time runs out.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15948) Refactor AsyncKafkaConsumer shutdown

2023-11-29 Thread Philip Nee (Jira)
Philip Nee created KAFKA-15948:
--

 Summary: Refactor AsyncKafkaConsumer shutdown
 Key: KAFKA-15948
 URL: https://issues.apache.org/jira/browse/KAFKA-15948
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: Philip Nee


Upon closing we need a round trip from the network thread to the application 
thread and then back to the network thread to complete the callback invocation. 
 Currently, we don't have any of that.  I think we need to refactor our closing 
mechanism.  There are a few points to the refactor:
 # The network thread should know if there's a custom user callback to trigger 
or not.  If there is, it should wait for the callback completion to send a 
leave group.  If not, it should proceed with the shutdown.
 # The application thread sends a closing signal to the network thread and 
continuously polls the background event handler until time runs out.
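
A rough sketch of point 2, with hypothetical interfaces standing in for the 
real components:
{code:java}
import java.time.Duration;

final class CloseSketch {
    interface NetworkThread { void signalClose(); }
    interface BackgroundEventHandler {
        void processAvailableEvents();   // may invoke user callbacks
        boolean closeCompleted();
    }

    // Application thread: signal the close, then keep draining background
    // events (so user callbacks can run) until the network thread finishes
    // or the timeout elapses.
    static void close(final NetworkThread networkThread,
                      final BackgroundEventHandler handler,
                      final Duration timeout) {
        networkThread.signalClose();
        final long deadlineMs = System.currentTimeMillis() + timeout.toMillis();
        while (!handler.closeCompleted() && System.currentTimeMillis() < deadlineMs) {
            handler.processAvailableEvents();
        }
    }
}
{code}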

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15764: Missing Tests for Transactions [kafka]

2023-11-29 Thread via GitHub


jolshan commented on code in PR #14702:
URL: https://github.com/apache/kafka/pull/14702#discussion_r1409765577


##
core/src/test/scala/integration/kafka/api/TransactionsTest.scala:
##
@@ -820,6 +823,37 @@ class TransactionsTest extends IntegrationTestHarness {
 assertEquals((initialProducerEpoch + 1).toShort, 
producerStateEntry.producerEpoch)
   }
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testTransactionsWithCompression(quorum: String): Unit = {
+val numRecords = 50
+val numProducersWithCompression = 5
+val numTransactions = 40
+val transactionalCompressionProducers = Buffer[KafkaProducer[Array[Byte], 
Array[Byte]]]()
+
+for (i <- 0 until numProducersWithCompression) {
+  transactionalCompressionProducers += 
createTransactionalProducer("transactional-compression-producer-" + i.toString, 
 compressionType = "snappy")
+}
+
+// KAFKA-15653 is triggered more easily with replication factor 1

Review Comment:
   This made sense to me before, but I'm trying to remember why.  
   
   Is it possible that when we are building and sending the response we could 
receive the callback request, which means it will be handled on a different 
thread? Versus when acks=all, we free the thread after sending out the 
verification, so it can be available to receive the callback?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15947) Null pointer on LZ4 compression since Kafka 3.6

2023-11-29 Thread Ludo (Jira)
Ludo created KAFKA-15947:


 Summary: Null pointer on LZ4 compression since Kafka 3.6
 Key: KAFKA-15947
 URL: https://issues.apache.org/jira/browse/KAFKA-15947
 Project: Kafka
  Issue Type: Bug
  Components: compression
Affects Versions: 3.6.0
Reporter: Ludo


I have a Kafka Streams application that has been running well for months using 
client version {{3.5.1}} against a 3.5.1 broker (bitnami image: 
{{bitnami/kafka:3.5.1-debian-11-r44}}), with {{compression.type: "lz4"}}.

I've recently updated my Kafka server to Kafka 3.6 (bitnami image: 
{{bitnami/kafka:3.6.0-debian-11-r0}}).
 
Everything runs well for days after startup, then at some point Kafka Streams 
crashes and Kafka outputs a lot of NullPointerExceptions on the console: 
 
{code:java}
org.apache.kafka.common.KafkaException: java.lang.NullPointerException: Cannot 
invoke "java.nio.ByteBuffer.hasArray()" because "this.intermediateBufRef" is 
null
at 
org.apache.kafka.common.record.CompressionType$4.wrapForInput(CompressionType.java:134)
at 
org.apache.kafka.common.record.DefaultRecordBatch.recordInputStream(DefaultRecordBatch.java:273)
at 
org.apache.kafka.common.record.DefaultRecordBatch.compressedIterator(DefaultRecordBatch.java:277)
at 
org.apache.kafka.common.record.DefaultRecordBatch.skipKeyValueIterator(DefaultRecordBatch.java:352)
at 
org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsetsCompressed(LogValidator.java:358)
at 
org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsets(LogValidator.java:165)
at kafka.log.UnifiedLog.$anonfun$append$2(UnifiedLog.scala:805)
at kafka.log.UnifiedLog.append(UnifiedLog.scala:1845)
at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:719)
at 
kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1313)
at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1301)
at 
kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:1210)
at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at 
scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149)
at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237)
at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:149)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at 
kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:1198)
at 
kafka.server.ReplicaManager.$anonfun$appendRecords$18$adapted(ReplicaManager.scala:754)
at 
kafka.server.KafkaRequestHandler$.$anonfun$wrap$3(KafkaRequestHandler.scala:73)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:130)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.NullPointerException: Cannot invoke 
"java.nio.ByteBuffer.hasArray()" because "this.intermediateBufRef" is null
at 
org.apache.kafka.common.utils.ChunkedBytesStream.<init>(ChunkedBytesStream.java:89)
at 
org.apache.kafka.common.record.CompressionType$4.wrapForInput(CompressionType.java:132)
... 25 more {code}
At the same time the Kafka Stream raise this error:

 
{code:java}
org.apache.kafka.streams.errors.StreamsException: Error encountered sending 
record to topic kestra_workertaskresult for task 3_6 due to:
org.apache.kafka.common.errors.UnknownServerException: The server experienced 
an unexpected error when processing the request.
Written offsets would not be recorded and no more records would be sent since 
this is a fatal error.
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:297)
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$1(RecordCollectorImpl.java:284)
        at org.apache.kafka.clients.producer.KafkaProducer$AppendCallbacks.onCompletion(KafkaProducer.java:1505)
        at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:273)
        at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:234)
        at org.apache.kafka.clients.producer.internals.ProducerBatch.completeExceptionally(ProducerBatch.java:198)
        at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:772)
        at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:757)
        at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:709)
        at 

Re: [PR] KAFKA-15922: Add a MetadataVersion for JBOD [kafka]

2023-11-29 Thread via GitHub


rondagostino commented on code in PR #14860:
URL: https://github.com/apache/kafka/pull/14860#discussion_r1409696104


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -59,10 +59,14 @@ object StorageTool extends Logging {
 case "format" =>
   val directories = configToLogDirectories(config.get)
   val clusterId = namespace.getString("cluster_id")
-  val metadataVersion = getMetadataVersion(namespace, 
Option(config.get.interBrokerProtocolVersionString))
+  val metadataVersion = getMetadataVersion(namespace,
+
Option(config.get.originals.get(KafkaConfig.InterBrokerProtocolVersionProp)).map(_.toString))
   if (!metadataVersion.isKRaftSupported) {
 throw new TerseFailure(s"Must specify a valid KRaft metadata 
version of at least 3.0.")
   }
+  if (!metadataVersion.isProduction()) {
+throw new TerseFailure(s"Metadata version ${metadataVersion} is 
not ready for production use yet.")
+  }

Review Comment:
   Do we need an override flag/option, do you think?



##
server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java:
##
@@ -386,4 +418,21 @@ public void testOffsetCommitValueVersion(MetadataVersion 
metadataVersion) {
 public void 
testOffsetCommitValueVersionWithExpiredTimestamp(MetadataVersion 
metadataVersion) {
 assertEquals((short) 1, 
metadataVersion.offsetCommitValueVersion(true));
 }
+
+@Test
+public void assertLatestProductionIsLessThanLatest() {
+assertTrue(LATEST_PRODUCTION.ordinal() < 
MetadataVersion.latest().ordinal(),
+"Expected LATEST_PRODUCTION " + LATEST_PRODUCTION +
+" to be less than the latest of " + MetadataVersion.latest());
+}

Review Comment:
   Will this always be true?  It seems that if we want to make the latest one 
production-ready then we will have to add a new MetadataVersion after it that 
isn't.  Is this what we want?



##
metadata/src/main/resources/common/metadata/PartitionRecord.json:
##
@@ -42,13 +42,13 @@
   "about": "The epoch of the partition leader." },
 { "name": "PartitionEpoch", "type": "int32", "versions": "0+", "default": 
"-1",
   "about": "An epoch that gets incremented each time we change anything in 
the partition." },
+{ "name": "Directories", "type": "[]uuid", "versions": "1+",
+  "about": "The log directory hosting each replica, sorted in the same 
exact order as the Replicas field."},
 { "name": "EligibleLeaderReplicas", "type": "[]int32", "default": "null", 
"entityType": "brokerId",
-  "versions": "1+", "nullableVersions": "1+", "taggedVersions": "1+", 
"tag": 1,
+  "versions": "2+", "nullableVersions": "1+", "taggedVersions": "2+", 
"tag": 1,

Review Comment:
   `s/nullableVersions": "1+",/nullableVersions": "2+",/` (unless this is done 
this way on purpose, in which case could you explain why to help me understand?)



##
server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java:
##
@@ -329,19 +336,44 @@ public void 
testIsDelegationTokenSupported(MetadataVersion metadataVersion) {
 @ParameterizedTest
 @EnumSource(value = MetadataVersion.class)
 public void testIsElrSupported(MetadataVersion metadataVersion) {
-assertEquals(metadataVersion.equals(IBP_3_7_IV1),
-metadataVersion.isElrSupported());
-short expectPartitionRecordVersion = 
metadataVersion.equals(IBP_3_7_IV1) ? (short) 1 : (short) 0;
-assertEquals(expectPartitionRecordVersion, 
metadataVersion.partitionRecordVersion());
-short expectPartitionChangeRecordVersion = 
metadataVersion.equals(IBP_3_7_IV1) ? (short) 1 : (short) 0;
-assertEquals(expectPartitionChangeRecordVersion, 
metadataVersion.partitionChangeRecordVersion());
+assertEquals(metadataVersion.equals(IBP_3_7_IV3), 
metadataVersion.isElrSupported());

Review Comment:
   `s/metadataVersion.equals/metadataVersion.isAtLeast`
   
   We should probably add a method 
`testIsDirectoryAssignmentSupported(MetadataVersion metadataVersion)`



Re: [PR] KAFKA-15901: Client changes for registering telemetry and API calls (KIP-714) [kafka]

2023-11-29 Thread via GitHub


apoorvmittal10 commented on PR #14843:
URL: https://github.com/apache/kafka/pull/14843#issuecomment-1832520113

   Thanks @mjsax for reviewing; yes, the code is mostly wiring. @AndrewJSchofield 
@philipnee @kirktrue, could you please share your feedback on defining the 
class/method in the KIP? A review would be helpful too.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15901: Client changes for registering telemetry and API calls (KIP-714) [kafka]

2023-11-29 Thread via GitHub


apoorvmittal10 commented on code in PR #14843:
URL: https://github.com/apache/kafka/pull/14843#discussion_r1409739909


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##
@@ -160,6 +164,16 @@ public AbstractCoordinator(GroupRebalanceConfig rebalanceConfig,
Metrics metrics,
String metricGrpPrefix,
Time time) {
+this(rebalanceConfig, logContext, client, metrics, metricGrpPrefix, time, Optional.empty());
+}
+
+public AbstractCoordinator(GroupRebalanceConfig rebalanceConfig,
+LogContext logContext,

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Fix flaky BrokerLifecycleManagerTest [kafka]

2023-11-29 Thread via GitHub


junrao commented on code in PR #14836:
URL: https://github.com/apache/kafka/pull/14836#discussion_r1409739363


##
core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala:
##
@@ -201,7 +201,7 @@ class BrokerLifecycleManagerTest {
 while (!future.isDone || context.mockClient.hasInFlightRequests) {
   context.poll()
   manager.eventQueue.wakeup()
-  context.time.sleep(100)
+  context.time.sleep(5)

Review Comment:
   @soarez : Thanks for the explanation. You are right that `KafkaEventQueue` 
does de-duping and only allows one outstanding `CommunicationEvent` in the 
queue. But it seems that duplicated `HeartbeatRequest`s could still be 
generated, which is causing the original transient failure. 
`CommunicationEvent` calls `sendBrokerHeartbeat` that calls the following.
   
   `_channelManager.sendRequest(new BrokerHeartbeatRequest.Builder(data), handler)`
   
   The problem is that we have another queue in 
`NodeToControllerChannelManagerImpl` that doesn't do the de-duping. Once a 
`CommunicationEvent` is dequeued from `KafkaEventQueue`, a `HeartbeatRequest` 
will be queued in `NodeToControllerChannelManagerImpl`. At this point, another 
`CommunicationEvent` could be enqueued in `KafkaEventQueue`. When it's 
processed, another `HeartbeatRequest` will be queued in 
`NodeToControllerChannelManagerImpl`. 
   
   This probably won't introduce long-lasting duplicated `HeartbeatRequest`s in 
practice, since a `CommunicationEvent` typically sits in `KafkaEventQueue` for 
the heartbeat interval; by that time, other pending `HeartbeatRequest`s will 
have been processed, and new `CommunicationEvent`s are de-duped on enqueue. But 
maybe we could file a jira to track it.
   
   For the test, could we add a comment to explain why we need to wait for 10 
`HeartbeatRequest`s?
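   
   To make the two-queue interaction concrete, a self-contained sketch (not the
real `KafkaEventQueue` or `NodeToControllerChannelManagerImpl` APIs) of why
tag-based de-duping upstream doesn't prevent duplicates in a plain FIFO
downstream:
   
   import java.util.ArrayDeque;
   import java.util.LinkedHashMap;
   import java.util.Map;
   import java.util.Queue;
   
   public class TwoQueueDedupDemo {
       // Upstream: de-dupes by tag, as KafkaEventQueue does for CommunicationEvent.
       static final Map<String, Runnable> eventQueue = new LinkedHashMap<>();
       // Downstream: plain FIFO with no de-duping, like the channel manager's queue.
       static final Queue<String> requestQueue = new ArrayDeque<>();
   
       public static void main(String[] args) {
           // A second enqueue with the same tag replaces the pending event.
           eventQueue.put("heartbeat", () -> requestQueue.add("HeartbeatRequest"));
           eventQueue.put("heartbeat", () -> requestQueue.add("HeartbeatRequest"));
           System.out.println("pending events: " + eventQueue.size()); // 1 -- de-duped
   
           // Running the event moves a HeartbeatRequest into the FIFO...
           eventQueue.remove("heartbeat").run();
           // ...and nothing stops another event from being enqueued and run
           // before the first request is sent, duplicating it downstream.
           eventQueue.put("heartbeat", () -> requestQueue.add("HeartbeatRequest"));
           eventQueue.remove("heartbeat").run();
           System.out.println("queued requests: " + requestQueue.size()); // 2 -- duplicated
       }
   }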
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15946) AsyncKafkaConsumer should retry commits on the application thread instead of autoretry

2023-11-29 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee updated KAFKA-15946:
---
Labels: kip-848-client-support  (was: )

> AsyncKafkaConsumer should retry commits on the application thread instead of 
> autoretry
> --
>
> Key: KAFKA-15946
> URL: https://issues.apache.org/jira/browse/KAFKA-15946
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Philip Nee
>Assignee: Kirk True
>Priority: Major
>  Labels: kip-848-client-support
>
> The original design was that the network thread always completes the future,
> whether it succeeds or fails.  However, in the current patch, I mistakenly added
> auto-retry functionality because commitSync wasn't retrying.  What we should
> do instead is have the commitSync API catch RetriableExceptions and
> resend the commit until it times out.
>  
> {code:java}
> if (error.exception() instanceof RetriableException) {
>     log.warn("OffsetCommit failed on partition {} at offset {}: {}", tp, offset, error.message());
>     handleRetriableError(error, response);
>     retry(responseTime);  // <--- We probably shouldn't do this.
>     return;
> }
> {code}
>  
> {code:java}
> @Override
> public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration timeout) {
>     acquireAndEnsureOpen();
>     long commitStart = time.nanoseconds();
>     try {
>         CompletableFuture<Void> commitFuture = commit(offsets, true);  // <-- we probably should retry here
>         ConsumerUtils.getResult(commitFuture, time.timer(timeout));
>     } finally {
>         wakeupTrigger.clearTask();
>         kafkaConsumerMetrics.recordCommitSync(time.nanoseconds() - commitStart);
>         release();
>     }
> }
> {code}
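
A hedged sketch of the suggested fix (assuming a `Timer` with `update()` and
`notExpired()` as in `org.apache.kafka.common.utils`, and that `commit()` returns a
fresh future per call), retrying on the application thread until the timeout expires:

{code:java}
// Sketch only, not the actual patch: catch RetriableException and resend
// the commit on the application thread until the timer expires.
public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration timeout) {
    acquireAndEnsureOpen();
    long commitStart = time.nanoseconds();
    try {
        Timer timer = time.timer(timeout);
        do {
            CompletableFuture<Void> commitFuture = commit(offsets, true);
            try {
                ConsumerUtils.getResult(commitFuture, timer);
                return; // committed successfully
            } catch (RetriableException e) {
                timer.update(); // refresh elapsed time before deciding to retry
            }
        } while (timer.notExpired());
        throw new TimeoutException("Commit failed before the timeout expired");
    } finally {
        wakeupTrigger.clearTask();
        kafkaConsumerMetrics.recordCommitSync(time.nanoseconds() - commitStart);
        release();
    }
}
{code}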



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

