Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]
mjsax commented on code in PR #14724:
URL: https://github.com/apache/kafka/pull/14724#discussion_r1410274394


## clients/src/main/java/org/apache/kafka/clients/ClientTelemetryProvider.java:
## @@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import io.opentelemetry.proto.common.v1.AnyValue;
+import io.opentelemetry.proto.common.v1.KeyValue;
+import io.opentelemetry.proto.resource.v1.Resource;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.metrics.MetricsContext;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ClientTelemetryProvider implements Configurable {
+
+    public static final String DOMAIN = "org.apache.kafka";
+    // Client metrics tags
+    public static final String CLIENT_RACK = "client_rack";
+    public static final String GROUP_ID = "group_id";
+    public static final String GROUP_INSTANCE_ID = "group_instance_id";
+    public static final String TRANSACTIONAL_ID = "transactional_id";

Review Comment: Ah. Thanks! -- I mixed it up with the `group.instance.id` config... My bad.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]
mjsax commented on code in PR #14724:
URL: https://github.com/apache/kafka/pull/14724#discussion_r1410273486


## clients/src/main/java/org/apache/kafka/clients/ClientTelemetryProvider.java:
## @@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import io.opentelemetry.proto.common.v1.AnyValue;
+import io.opentelemetry.proto.common.v1.KeyValue;
+import io.opentelemetry.proto.resource.v1.Resource;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.metrics.MetricsContext;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ClientTelemetryProvider implements Configurable {
+
+    public static final String DOMAIN = "org.apache.kafka";
+    // Client metrics tags
+    public static final String CLIENT_RACK = "client_rack";
+    public static final String GROUP_ID = "group_id";
+    public static final String GROUP_INSTANCE_ID = "group_instance_id";
+    public static final String GROUP_MEMBER_ID = "group_member_id";
+    public static final String TRANSACTIONAL_ID = "transactional_id";
+
+    private static final String PRODUCER_NAMESPACE = "kafka.producer";
+    private static final String CONSUMER_NAMESPACE = "kafka.consumer";
+
+    private static final Map PRODUCER_CONFIG_MAPPING = new HashMap<>();
+    private static final Map CONSUMER_CONFIG_MAPPING = new HashMap<>();
+
+    private volatile Resource resource = null;
+    private Map config = null;
+
+    static {
+        PRODUCER_CONFIG_MAPPING.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, ClientTelemetryProvider.TRANSACTIONAL_ID);
+
+        CONSUMER_CONFIG_MAPPING.put(ConsumerConfig.GROUP_ID_CONFIG, ClientTelemetryProvider.GROUP_ID);
+        CONSUMER_CONFIG_MAPPING.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, ClientTelemetryProvider.GROUP_INSTANCE_ID);

Review Comment: Might be worth adding a comment?
Re: [PR] KAFKA-15831: KIP-1000 protocol and admin client [kafka]
AndrewJSchofield closed pull request #14811: KAFKA-15831: KIP-1000 protocol and admin client
URL: https://github.com/apache/kafka/pull/14811
Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
mjsax commented on code in PR #14864:
URL: https://github.com/apache/kafka/pull/14864#discussion_r1410260721


## streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java:
## @@ -454,4 +480,19 @@ public void shutdown() {
     public Map consumerMetrics() {
         return Collections.unmodifiableMap(globalConsumer.metrics());
     }
+
+    public KafkaFuture globalConsumerInstanceId(final Duration timeout) {

Review Comment: Frankly, given that `fetchDeadline` might be modified (and pushed into the future) by a second call to `KafkaStreams#clientInstanceIds(...)` while the first has not completed yet, it seems we would need `synchronized` in addition?
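For illustration, the race described in the comment above can be sketched as follows. This is a hypothetical, simplified holder class (`DeadlineHolder` and its methods are not the actual `GlobalStreamThread` code); it shows why an unguarded read-modify-write on a shared deadline needs `synchronized` when two callers may push it forward concurrently.

```java
// Hypothetical sketch of the fetchDeadline concern: two threads calling
// clientInstanceIds() may both try to extend the deadline. Without
// `synchronized`, the read and the write could interleave and one
// extension could be lost.
public class DeadlineHolder {
    private long fetchDeadline = -1L;

    // Guarded read-modify-write: only extend the deadline forward.
    public synchronized void maybeExtendDeadline(final long candidateDeadline) {
        if (candidateDeadline > fetchDeadline) {
            fetchDeadline = candidateDeadline;
        }
    }

    public synchronized long fetchDeadline() {
        return fetchDeadline;
    }
}
```

A later call with an earlier deadline leaves the stored value untouched, so the longest requested wait always wins.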
Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
mjsax commented on code in PR #14864:
URL: https://github.com/apache/kafka/pull/14864#discussion_r1410258175


## streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
## @@ -1791,6 +1794,52 @@ protected int processStreamThread(final Consumer consumer) {
         return copy.size();
     }
+
+    /**
+     * Returns the internal clients' assigned {@code client instance ids}.
+     *
+     * @return the internal clients' assigned instance ids used for metrics collection.
+     *
+     * @throws IllegalStateException If {@code KafkaStreams} is not running.
+     * @throws TimeoutException Indicates that a request timed out.
+     */
+    public ClientInstanceIds clientInstanceIds(final Duration timeout) {
+        if (state().hasNotStarted()) {
+            throw new IllegalStateException("KafkaStreams has not been started, you can retry after calling start().");
+        }
+        if (state().isShuttingDown() || state.hasCompletedShutdown()) {
+            throw new IllegalStateException("KafkaStreams has been stopped (" + state + ").");
+        }
+
+        final ClientInstanceIdsImpl clientInstanceIds = new ClientInstanceIdsImpl();
+
+        KafkaFuture globalThreadFuture = null;
+        if (globalStreamThread != null) {
+            globalThreadFuture = globalStreamThread.globalConsumerInstanceId(timeout);

Review Comment: I like to think so :)
Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
mjsax commented on PR #14864:
URL: https://github.com/apache/kafka/pull/14864#issuecomment-1833242672

@AndrewJSchofield -- updated this PR to cover more cases. Still not complete, but more review input is welcome.
Re: [PR] KAFKA-15817: Avoid reconnecting to the same IP address if multiple ad… [kafka]
ijuma commented on PR #14813:
URL: https://github.com/apache/kafka/pull/14813#issuecomment-1833215345

@lucasbru Did you cherry-pick to 3.6?
Re: [PR] KAFKA-15901: Client changes for registering telemetry and API calls (KIP-714) [kafka]
apoorvmittal10 commented on PR #14843:
URL: https://github.com/apache/kafka/pull/14843#issuecomment-1833203010

Hi @kirktrue, thanks for taking the time to review and leave feedback. I have addressed the feedback; could you please re-review?
Re: [PR] KAFKA-15901: Client changes for registering telemetry and API calls (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14843:
URL: https://github.com/apache/kafka/pull/14843#discussion_r1410224871


## clients/src/main/java/org/apache/kafka/clients/NetworkClient.java:
## @@ -361,6 +370,9 @@ private void cancelInFlightRequests(String nodeId,
             }
         } else if (request.header.apiKey() == ApiKeys.METADATA) {
             metadataUpdater.handleFailedRequest(now, Optional.empty());
+        } else if ((request.header.apiKey() == ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS ||
+            request.header.apiKey() == ApiKeys.PUSH_TELEMETRY) && telemetrySender != null) {
+            telemetrySender.handleFailedRequest(request.header.apiKey(), null);

Review Comment: Done.
Re: [PR] KAFKA-15901: Client changes for registering telemetry and API calls (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14843:
URL: https://github.com/apache/kafka/pull/14843#discussion_r1410198452


## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
## @@ -4387,7 +4398,52 @@ public FenceProducersResult fenceProducers(Collection transactionalIds,
     @Override
     public Uuid clientInstanceId(Duration timeout) {
-        throw new UnsupportedOperationException();
+        if (timeout.isNegative()) {
+            throw new IllegalArgumentException("The timeout cannot be negative.");
+        }
+
+        if (!clientTelemetryEnabled) {
+            throw new IllegalStateException("Telemetry is not enabled. Set config `enable.metrics.push` to `true`.");
+        }
+
+        if (clientInstanceId != null) {
+            return clientInstanceId;
+        }
+
+        final long now = time.milliseconds();
+        final KafkaFutureImpl future = new KafkaFutureImpl<>();
+        runnable.call(new Call("getTelemetrySubscriptions", calcDeadlineMs(now, (int) timeout.toMillis()),
+            new LeastLoadedNodeProvider()) {
+
+            @Override
+            GetTelemetrySubscriptionsRequest.Builder createRequest(int timeoutMs) {
+                return new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData(), true);
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                GetTelemetrySubscriptionsResponse response = (GetTelemetrySubscriptionsResponse) abstractResponse;
+                if (response.error() != Errors.NONE) {
+                    future.completeExceptionally(response.error().exception());
+                } else {
+                    future.complete(response.data().clientInstanceId());
+                }
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                future.completeExceptionally(throwable);
+            }
+        }, now);
+
+        try {
+            clientInstanceId = future.get();
+        } catch (Exception e) {
+            log.error("Error occurred while fetching client instance id", e);
+            throw new KafkaException("Error occurred while fetching client instance id", e);
+        }

Review Comment: The other 2 classes load the subscription as part of the GetTelemetrySubscriptions API call, and also run the continuous poll loop to see any change in state and re-fetch the subscription, which can work with signalling of the loaded subscription.
Re: [PR] KAFKA-15830: Add telemetry API handling (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14767:
URL: https://github.com/apache/kafka/pull/14767#discussion_r1410158203


## core/src/main/scala/kafka/server/KafkaApis.scala:
## @@ -3697,16 +3698,60 @@ class KafkaApis(val requestChannel: RequestChannel,
     CompletableFuture.completedFuture[Unit](())
   }

-  // Just a place holder for now.
   def handleGetTelemetrySubscriptionsRequest(request: RequestChannel.Request): Unit = {
-    requestHelper.sendMaybeThrottle(request, request.body[GetTelemetrySubscriptionsRequest].getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
-    CompletableFuture.completedFuture[Unit](())
+    val subscriptionRequest = request.body[GetTelemetrySubscriptionsRequest]
+
+    if (ClientMetricsReceiverPlugin.instance.isEmpty) {
+      info("Received get telemetry client request, no metrics receiver plugin configured or running with ZK")
+      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
+        subscriptionRequest.getErrorResponse(requestThrottleMs, Errors.UNSUPPORTED_VERSION.exception))

Review Comment: Thanks for the quick reply @junrao, I am looking into this. I thought the suggestion was to have a flag similar to `zkMigrationEnabled`, but it looks like the idea is to not emit the telemetry ApiKeys if the plugin is missing.
Re: [PR] KAFKA-15830: Add telemetry API handling (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14767:
URL: https://github.com/apache/kafka/pull/14767#discussion_r1410157251


## core/src/main/scala/kafka/server/KafkaApis.scala:
## @@ -3747,16 +3747,56 @@ class KafkaApis(val requestChannel: RequestChannel,
     CompletableFuture.completedFuture[Unit](())
   }

-  // Just a place holder for now.
   def handleGetTelemetrySubscriptionsRequest(request: RequestChannel.Request): Unit = {
-    requestHelper.sendMaybeThrottle(request, request.body[GetTelemetrySubscriptionsRequest].getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
-    CompletableFuture.completedFuture[Unit](())
+    val subscriptionRequest = request.body[GetTelemetrySubscriptionsRequest]
+
+    clientMetricsManager match {
+      case Some(metricsManager) =>
+        try {
+          if (metricsManager.isTelemetryReceiverConfigured) {
+            requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>

Review Comment: Thanks @junrao, done.
Re: [PR] KAFKA-15830: Add telemetry API handling (KIP-714) [kafka]
junrao commented on code in PR #14767:
URL: https://github.com/apache/kafka/pull/14767#discussion_r1410140939


## core/src/main/scala/kafka/server/KafkaApis.scala:
## @@ -3697,16 +3698,60 @@ class KafkaApis(val requestChannel: RequestChannel,
     CompletableFuture.completedFuture[Unit](())
   }

-  // Just a place holder for now.
   def handleGetTelemetrySubscriptionsRequest(request: RequestChannel.Request): Unit = {
-    requestHelper.sendMaybeThrottle(request, request.body[GetTelemetrySubscriptionsRequest].getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
-    CompletableFuture.completedFuture[Unit](())
+    val subscriptionRequest = request.body[GetTelemetrySubscriptionsRequest]
+
+    if (ClientMetricsReceiverPlugin.instance.isEmpty) {
+      info("Received get telemetry client request, no metrics receiver plugin configured or running with ZK")
+      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
+        subscriptionRequest.getErrorResponse(requestThrottleMs, Errors.UNSUPPORTED_VERSION.exception))

Review Comment: @apoorvmittal10: ApiVersionResponse is defined to include all new types of requests supported on the server. So, there is no need to have a separate KIP. There is no need to add clientTelemetryEnabled in ApiVersionResponse. If clientTelemetryEnabled is false on the server, we can just exclude telemetry related API keys from ApiVersionResponse.
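The filtering idea in the comment above can be sketched in a few lines. This is an illustrative standalone class, not the broker's actual `ApiVersionsResponse` builder; the enum here holds only a handful of hypothetical keys to show the shape of the check.

```java
import java.util.EnumSet;
import java.util.Set;

// Sketch: when client telemetry is disabled, simply leave the telemetry
// API keys out of the set of keys advertised to clients, instead of
// adding a separate flag to the response.
public class ApiKeyFilter {
    public enum ApiKeys { PRODUCE, FETCH, GET_TELEMETRY_SUBSCRIPTIONS, PUSH_TELEMETRY }

    public static Set<ApiKeys> advertisedKeys(final boolean clientTelemetryEnabled) {
        final Set<ApiKeys> keys = EnumSet.allOf(ApiKeys.class);
        if (!clientTelemetryEnabled) {
            keys.remove(ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS);
            keys.remove(ApiKeys.PUSH_TELEMETRY);
        }
        return keys;
    }
}
```

A client that never sees `GET_TELEMETRY_SUBSCRIPTIONS` in the ApiVersions exchange simply does not start the telemetry path, so no extra flag is needed.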
Re: [PR] KAFKA-15704: Update clientId and clientHost in MemberMetadata when static member is replaced. [kafka]
Lucent-Wong commented on PR #14666:
URL: https://github.com/apache/kafka/pull/14666#issuecomment-1833071663

retest this please
Re: [PR] KAFKA-15901: Client changes for registering telemetry and API calls (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14843:
URL: https://github.com/apache/kafka/pull/14843#discussion_r1410123451


## clients/src/main/java/org/apache/kafka/clients/ClientTelemetryUtils.java:
## @@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import org.apache.kafka.common.Uuid;
+
+import java.time.Duration;
+import java.util.Optional;
+
+public class ClientTelemetryUtils {

Review Comment: Thanks @kirktrue for the feedback. Done.
Re: [PR] KAFKA-15830: Add telemetry API handling (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14767:
URL: https://github.com/apache/kafka/pull/14767#discussion_r1410120896


## core/src/main/scala/kafka/server/KafkaApis.scala:
## @@ -3697,16 +3698,60 @@ class KafkaApis(val requestChannel: RequestChannel,
     CompletableFuture.completedFuture[Unit](())
   }

-  // Just a place holder for now.
   def handleGetTelemetrySubscriptionsRequest(request: RequestChannel.Request): Unit = {
-    requestHelper.sendMaybeThrottle(request, request.body[GetTelemetrySubscriptionsRequest].getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
-    CompletableFuture.completedFuture[Unit](())
+    val subscriptionRequest = request.body[GetTelemetrySubscriptionsRequest]
+
+    if (ClientMetricsReceiverPlugin.instance.isEmpty) {
+      info("Received get telemetry client request, no metrics receiver plugin configured or running with ZK")
+      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
+        subscriptionRequest.getErrorResponse(requestThrottleMs, Errors.UNSUPPORTED_VERSION.exception))

Review Comment: @junrao Thanks for the suggestion. Just to clarify: does adding `clientTelemetryEnabled` to `apiVersionResponse` require another KIP, or can we add that in KIP-714, or does nothing need to be documented?
Re: [PR] Source task stop call was added to force stopping execution. [kafka]
github-actions[bot] commented on PR #14101:
URL: https://github.com/apache/kafka/pull/14101#issuecomment-1833053201

This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]
apoorvmittal10 commented on PR #14724:
URL: https://github.com/apache/kafka/pull/14724#issuecomment-1833016137

@mjsax I have also moved ClientTelemetryUtils to the `internals` package in this PR; the class is new and has changes in both PRs.
Re: [PR] KAFKA-15764: Missing Tests for Transactions [kafka]
jolshan commented on code in PR #14702:
URL: https://github.com/apache/kafka/pull/14702#discussion_r1410046365


## core/src/test/scala/integration/kafka/api/TransactionsTest.scala:
## @@ -820,6 +823,37 @@ class TransactionsTest extends IntegrationTestHarness {
     assertEquals((initialProducerEpoch + 1).toShort, producerStateEntry.producerEpoch)
   }

+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testTransactionsWithCompression(quorum: String): Unit = {
+    val numRecords = 50
+    val numProducersWithCompression = 5
+    val numTransactions = 40
+    val transactionalCompressionProducers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]()
+
+    for (i <- 0 until numProducersWithCompression) {
+      transactionalCompressionProducers += createTransactionalProducer("transactional-compression-producer-" + i.toString, compressionType = "snappy")
+    }
+
+    // KAFKA-15653 is triggered more easily with replication factor 1

Review Comment: I set back the default settings and it seems to trigger consistently again anyway. Maybe I just tricked myself into thinking I needed the other configs.
Re: [PR] KAFKA-15764: Missing Tests for Transactions [kafka]
jolshan commented on code in PR #14702:
URL: https://github.com/apache/kafka/pull/14702#discussion_r1410043469


## core/src/test/scala/integration/kafka/api/TransactionsTest.scala:
## @@ -820,6 +823,37 @@ class TransactionsTest extends IntegrationTestHarness {
     assertEquals((initialProducerEpoch + 1).toShort, producerStateEntry.producerEpoch)
   }

+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testTransactionsWithCompression(quorum: String): Unit = {
+    val numRecords = 50
+    val numProducersWithCompression = 5
+    val numTransactions = 40
+    val transactionalCompressionProducers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]()
+
+    for (i <- 0 until numProducersWithCompression) {
+      transactionalCompressionProducers += createTransactionalProducer("transactional-compression-producer-" + i.toString, compressionType = "snappy")
+    }
+
+    // KAFKA-15653 is triggered more easily with replication factor 1

Review Comment: Interestingly when I put it back to the typical settings it failed consistently 🤦‍♀️ I guess I tricked myself into thinking I needed the other settings.
Re: [PR] KAFKA-13403 Fix KafkaServer crashes when deleting topics due to the race in log deletion [kafka]
arunmathew88 commented on PR #11438: URL: https://github.com/apache/kafka/pull/11438#issuecomment-1832945607 @divijvaidya A gentle nudge.
Re: [PR] MINOR: Fix flaky BrokerLifecycleManagerTest [kafka]
soarez commented on code in PR #14836: URL: https://github.com/apache/kafka/pull/14836#discussion_r1410038816 ## core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala: ## @@ -201,7 +201,7 @@ class BrokerLifecycleManagerTest { while (!future.isDone || context.mockClient.hasInFlightRequests) { context.poll() manager.eventQueue.wakeup() - context.time.sleep(100) + context.time.sleep(5) Review Comment: @junrao : That's a good point, you're right. In between `HeartbeatRequest` being sent and the response being handled, `propagateDirectoryFailure` could be called, immediately scheduling a `HeartbeatRequest` and causing an extra request. It looks like this was already the case with `setReadyToUnfence()` and `beginControlledShutdown()`, which can also cause an extra request in the same way. We can avoid the extra requests by checking - in `OfflineDirEvent.run`, `SetReadyToUnfenceEvent.run` and `BeginControlledShutdownEvent.run` - whether a request is in flight, and delaying the call to `scheduleNextCommunicationImmediately` until after the response is received. Please see #14874 for a comment explaining the test approach. I also see you've filed [KAFKA-15950](https://issues.apache.org/jira/browse/KAFKA-15950). Thanks, I'll have a look.
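The guard described in the comment above — deferring the immediate reschedule while a heartbeat is still in flight — can be sketched as a toy model. This is not the actual `BrokerLifecycleManager` code; the class name and methods below are invented for illustration only.

```java
// Hypothetical sketch of "don't schedule an immediate heartbeat while one is
// already in flight". Names are illustrative, not the real Kafka API.
class HeartbeatScheduler {
    private boolean requestInFlight = false;
    private boolean immediatePending = false;
    private int requestsSent = 0;

    void sendHeartbeat() {
        requestInFlight = true;
        requestsSent++;
    }

    // Called from events such as a directory-failure event: instead of always
    // scheduling a heartbeat immediately, defer if a request is in flight.
    void maybeScheduleImmediately() {
        if (requestInFlight) {
            immediatePending = true; // wait for the in-flight response first
        } else {
            sendHeartbeat();
        }
    }

    // Called when the response to the in-flight heartbeat is handled.
    void handleResponse() {
        requestInFlight = false;
        if (immediatePending) {
            immediatePending = false;
            sendHeartbeat();
        }
    }

    int requestsSent() {
        return requestsSent;
    }
}
```

With this shape, an event arriving between request and response produces one follow-up heartbeat after the response is handled, rather than an extra overlapping request.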
Re: [PR] KAFKA-15764: Missing Tests for Transactions [kafka]
hachikuji commented on code in PR #14702: URL: https://github.com/apache/kafka/pull/14702#discussion_r1410034374 ## core/src/test/scala/integration/kafka/api/TransactionsTest.scala: ## @@ -820,6 +823,37 @@ class TransactionsTest extends IntegrationTestHarness { assertEquals((initialProducerEpoch + 1).toShort, producerStateEntry.producerEpoch) } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testTransactionsWithCompression(quorum: String): Unit = { +val numRecords = 50 +val numProducersWithCompression = 5 +val numTransactions = 40 +val transactionalCompressionProducers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]() + +for (i <- 0 until numProducersWithCompression) { + transactionalCompressionProducers += createTransactionalProducer("transactional-compression-producer-" + i.toString, compressionType = "snappy") +} + +// KAFKA-15653 is triggered more easily with replication factor 1 Review Comment: Yeah, what I'm saying is that this test seems more useful as a general validation of transactional produce with compression. From that perspective, the default settings are the most useful to test. For KAFKA-15653, the lower level tests seem sufficient.
[PR] MINOR: Fix flaky `MetadataLoaderTest.testNoPublishEmptyImage` [kafka]
hachikuji opened a new pull request, #14875: URL: https://github.com/apache/kafka/pull/14875 There is a race in the assertion on `capturedImages`. Since the future is signaled first, it is still possible to see an empty list. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
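The race described above has a common shape: the assertion runs as soon as the future completes, before the listener thread has appended to the captured list. The usual remedy is to retry the condition until a timeout rather than asserting once, as Kafka's own `TestUtils.waitForCondition` does. Below is a minimal standalone sketch of that pattern — it is not the actual fix in the PR.

```java
import java.util.function.BooleanSupplier;

// Minimal sketch of the retry-until-timeout pattern used to avoid asserting
// on state that another thread may not have published yet.
final class WaitUtil {
    static void waitForCondition(BooleanSupplier condition, long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() > deadline) {
                throw new AssertionError("Condition not met within " + timeoutMs + " ms");
            }
            try {
                Thread.sleep(10); // back off briefly before re-checking
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
    }
}
```

A test would then wait for the list to become non-empty before asserting on its contents, instead of checking it immediately after the future is signaled.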
Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]
jeffkbkim commented on code in PR #14849: URL: https://github.com/apache/kafka/pull/14849#discussion_r1410029648 ## core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala: ## @@ -148,6 +156,16 @@ class CoordinatorLoaderImpl[T]( } } + val currentHighWatermark = log.highWatermark + if (currentHighWatermark > previousHighWatermark) { +onHighWatermarkUpdated.accept(currentHighWatermark) +previousHighWatermark = currentHighWatermark + } + + if (currentOffset >= currentHighWatermark) { +onLoadedBatch.accept(currentOffset) + } Review Comment: you're right, we return ``` // Required offset. partitionResult.lastOffset + 1 ``` in CoordinatorPartitionWriter#append
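The high-watermark bookkeeping in the diff above can be modeled in a few lines. This is a simplified stand-in, not the real `CoordinatorLoaderImpl` — the actual loader reads record batches from a log, while here the offsets are passed in directly and the callbacks are recorded as strings.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the loading loop in the diff above: notify when the
// high watermark advances, and report the batch offset once caught up.
class LoaderSketch {
    private long previousHighWatermark = -1L;
    final List<String> events = new ArrayList<>();

    void processBatch(long currentOffset, long logHighWatermark) {
        // Propagate the high watermark only when it has advanced.
        if (logHighWatermark > previousHighWatermark) {
            events.add("hwm:" + logHighWatermark);
            previousHighWatermark = logHighWatermark;
        }
        // Once the read position reaches the high watermark, report it.
        if (currentOffset >= logHighWatermark) {
            events.add("loaded:" + currentOffset);
        }
    }
}
```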
[PR] MINOR: Comment explaining test approach [kafka]
soarez opened a new pull request, #14874: URL: https://github.com/apache/kafka/pull/14874 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] MINOR: No need for response callback when applying controller mutation throttle [kafka]
hachikuji merged PR #14861: URL: https://github.com/apache/kafka/pull/14861
Re: [PR] MINOR: No need for response callback when applying controller mutation throttle [kafka]
hachikuji commented on PR #14861: URL: https://github.com/apache/kafka/pull/14861#issuecomment-1832915608 All failures seem to be flaky. I tested them locally. I will merge to trunk.
Re: [PR] KAFKA-15764: Missing Tests for Transactions [kafka]
jolshan commented on code in PR #14702: URL: https://github.com/apache/kafka/pull/14702#discussion_r1410014399 ## core/src/test/scala/integration/kafka/api/TransactionsTest.scala: ## @@ -820,6 +823,37 @@ class TransactionsTest extends IntegrationTestHarness { assertEquals((initialProducerEpoch + 1).toShort, producerStateEntry.producerEpoch) } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testTransactionsWithCompression(quorum: String): Unit = { +val numRecords = 50 +val numProducersWithCompression = 5 +val numTransactions = 40 +val transactionalCompressionProducers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]() + +for (i <- 0 until numProducersWithCompression) { + transactionalCompressionProducers += createTransactionalProducer("transactional-compression-producer-" + i.toString, compressionType = "snappy") +} + +// KAFKA-15653 is triggered more easily with replication factor 1 Review Comment: I didn't see it fail much at all with the default settings. It was like once in 60 times or less, which was not super useful. There are lower level tests for the callback using the correct requestLocal.
Re: [PR] KAFKA-15764: Missing Tests for Transactions [kafka]
hachikuji commented on code in PR #14702: URL: https://github.com/apache/kafka/pull/14702#discussion_r1410013739 ## core/src/test/scala/integration/kafka/api/TransactionsTest.scala: ## @@ -820,6 +823,37 @@ class TransactionsTest extends IntegrationTestHarness { assertEquals((initialProducerEpoch + 1).toShort, producerStateEntry.producerEpoch) } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testTransactionsWithCompression(quorum: String): Unit = { +val numRecords = 50 +val numProducersWithCompression = 5 +val numTransactions = 40 +val transactionalCompressionProducers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]() + +for (i <- 0 until numProducersWithCompression) { + transactionalCompressionProducers += createTransactionalProducer("transactional-compression-producer-" + i.toString, compressionType = "snappy") +} + +// KAFKA-15653 is triggered more easily with replication factor 1 Review Comment: My take is that this test should probably just test compression of transactional data with the default settings. I don't think it necessarily needs to hit the case from KAFKA-15653. It seems like it doesn't do so reliably anyway. Are there lower level tests where we can validate callback safety?
[jira] [Commented] (KAFKA-15950) CommunicationEvent should be scheduled with EarliestDeadlineFunction
[ https://issues.apache.org/jira/browse/KAFKA-15950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17791367#comment-17791367 ] Jun Rao commented on KAFKA-15950: - cc [~soarez] > CommunicationEvent should be scheduled with EarliestDeadlineFunction > > > Key: KAFKA-15950 > URL: https://issues.apache.org/jira/browse/KAFKA-15950 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 3.7.0 >Reporter: Jun Rao >Priority: Major > > Currently, CommunicationEvent is scheduled with DeadlineFunction, which > ignores the schedule time for an existing event. This wasn't an issue when > CommunicationEvent is always periodic. However, with KAFKA-15360, a > CommunicationEvent could be scheduled immediately for offline dirs. If a > periodic CommunicationEvent is scheduled after the immediate > CommunicationEvent in KafkaEventQueue, the former will cancel the latter, but > leaves the schedule time to be periodic. This will unnecessarily delay the > communication of the failed dir to the controller. > > Using EarliestDeadlineFunction will fix this issue. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15950) CommunicationEvent should be scheduled with EarliestDeadlineFunction
Jun Rao created KAFKA-15950: --- Summary: CommunicationEvent should be scheduled with EarliestDeadlineFunction Key: KAFKA-15950 URL: https://issues.apache.org/jira/browse/KAFKA-15950 Project: Kafka Issue Type: Bug Components: core Affects Versions: 3.7.0 Reporter: Jun Rao Currently, CommunicationEvent is scheduled with DeadlineFunction, which ignores the schedule time for an existing event. This wasn't an issue when CommunicationEvent was always periodic. However, with KAFKA-15360, a CommunicationEvent could be scheduled immediately for offline dirs. If a periodic CommunicationEvent is scheduled after the immediate CommunicationEvent in KafkaEventQueue, the former will cancel the latter, but leaves the schedule time to be periodic. This will unnecessarily delay the communication of the failed dir to the controller. Using EarliestDeadlineFunction will fix this issue.
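The difference between the two deadline functions described in the issue reduces to a one-line merge policy: replace-with-new versus earliest-wins. The sketch below is illustrative only and does not use the real `KafkaEventQueue` types; the names are invented for the example.

```java
import java.util.function.BinaryOperator;

// Illustrative model of the two rescheduling policies. With replace-with-new,
// rescheduling a periodic event over an already-queued immediate event pushes
// the deadline back out to the periodic time; with earliest-wins, the sooner
// (immediate) deadline is preserved.
class Deadlines {
    // DeadlineFunction-style: the existing deadline is ignored.
    static final BinaryOperator<Long> REPLACE = (existing, proposed) -> proposed;
    // EarliestDeadlineFunction-style: the sooner deadline wins.
    static final BinaryOperator<Long> EARLIEST =
        (existing, proposed) -> Math.min(existing, proposed);
}
```

With an immediate deadline already queued, merging in a later periodic deadline under earliest-wins keeps the immediate one, which is exactly the behavior the issue asks for.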
[jira] [Updated] (KAFKA-15941) Flaky test: shouldRestoreNullRecord() – org.apache.kafka.streams.integration.RestoreIntegrationTest
[ https://issues.apache.org/jira/browse/KAFKA-15941?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-15941: Labels: flaky-test (was: ) > Flaky test: shouldRestoreNullRecord() – > org.apache.kafka.streams.integration.RestoreIntegrationTest > --- > > Key: KAFKA-15941 > URL: https://issues.apache.org/jira/browse/KAFKA-15941 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Reporter: Apoorv Mittal >Priority: Major > Labels: flaky-test > > https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14699/24/tests/ > {code:java} > org.opentest4j.AssertionFailedError: Condition not met within timeout 6. > Did not receive all [KeyValue(2, \x00\x00\x00)] records from topic output > (got []) ==> expected: but was: > Stacktraceorg.opentest4j.AssertionFailedError: Condition not met > within timeout 6. Did not receive all [KeyValue(2, \x00\x00\x00)] records > from topic output (got []) ==> expected: but was: at > org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) > at > org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) > at org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) > at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) at > org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:210) at > org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:331) >at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:379) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:328) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:312) at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:302) at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(IntegrationTestUtils.java:878) > at > 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(IntegrationTestUtils.java:827) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(IntegrationTestUtils.java:790) > at > org.apache.kafka.streams.integration.RestoreIntegrationTest.shouldRestoreNullRecord(RestoreIntegrationTest.java:244) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > {code}
[jira] [Updated] (KAFKA-15941) Flaky test: shouldRestoreNullRecord() – org.apache.kafka.streams.integration.RestoreIntegrationTest
[ https://issues.apache.org/jira/browse/KAFKA-15941?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-15941: Component/s: streams unit tests > Flaky test: shouldRestoreNullRecord() – > org.apache.kafka.streams.integration.RestoreIntegrationTest > --- > > Key: KAFKA-15941 > URL: https://issues.apache.org/jira/browse/KAFKA-15941 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Reporter: Apoorv Mittal >Priority: Major > > https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14699/24/tests/ > {code:java} > org.opentest4j.AssertionFailedError: Condition not met within timeout 6. > Did not receive all [KeyValue(2, \x00\x00\x00)] records from topic output > (got []) ==> expected: but was: > Stacktraceorg.opentest4j.AssertionFailedError: Condition not met > within timeout 6. Did not receive all [KeyValue(2, \x00\x00\x00)] records > from topic output (got []) ==> expected: but was: at > org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) > at > org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) > at org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) > at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) at > org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:210) at > org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:331) >at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:379) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:328) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:312) at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:302) at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(IntegrationTestUtils.java:878) > at > 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(IntegrationTestUtils.java:827) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(IntegrationTestUtils.java:790) > at > org.apache.kafka.streams.integration.RestoreIntegrationTest.shouldRestoreNullRecord(RestoreIntegrationTest.java:244) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > {code}
[jira] [Updated] (KAFKA-15944) Flaky test - verifyStore[cache=false, log=true, supplier=ROCKS_KV, kind=DSL] – org.apache.kafka.streams.integration.PositionRestartIntegrationTest
[ https://issues.apache.org/jira/browse/KAFKA-15944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-15944: Component/s: unit tests > Flaky test - verifyStore[cache=false, log=true, supplier=ROCKS_KV, kind=DSL] > – org.apache.kafka.streams.integration.PositionRestartIntegrationTest > -- > > Key: KAFKA-15944 > URL: https://issues.apache.org/jira/browse/KAFKA-15944 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Affects Versions: 3.7.0 >Reporter: Andrew Schofield >Priority: Minor > Labels: flaky-test > > Error > org.apache.kafka.common.errors.TimeoutException: The query never returned > within the bound. Last result: > StateQueryResult{partitionResults={0=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueTimestampedIterator@61b360a4, > executionInfo=[], position=Position{position={input-topic={0=1, > 1=FailedQueryResult{failureReason=NOT_PRESENT, failure='The requested > partition was not present at the time of the query.', executionInfo=[], > position=null}}, globalResult=null} > Stacktrace > org.apache.kafka.common.errors.TimeoutException: The query never returned > within the bound. Last result: > StateQueryResult{partitionResults={0=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueTimestampedIterator@61b360a4, > executionInfo=[], position=Position{position={input-topic={0=1, > 1=FailedQueryResult{failureReason=NOT_PRESENT, failure='The requested > partition was not present at the time of the query.', executionInfo=[], > position=null}}, globalResult=null} > Standard Output > [2023-11-28 22:52:47,353] INFO [Producer clientId=producer-129] Instantiated > an idempotent producer. 
(org.apache.kafka.clients.producer.KafkaProducer:587) > [2023-11-28 22:52:47,466] INFO [Producer clientId=producer-129] ProducerId > set to 0 with epoch 0 > (org.apache.kafka.clients.producer.internals.TransactionManager:502) > [2023-11-28 22:52:47,473] INFO [Producer clientId=producer-129] Closing the > Kafka producer with timeoutMillis = 9223372036854775807 ms. > (org.apache.kafka.clients.producer.KafkaProducer:1332) > [2023-11-28 22:52:47,531] INFO stream-client > [app-org.apache.kafka.streams.integration.PositionRestartIntegrationTest-true-true-IN_MEMORY_KV-DSL-456895e1-b230-4a83-9164-8e6d65d1fc57] > Kafka Streams version: test-version > (org.apache.kafka.streams.KafkaStreams:914) > [2023-11-28 22:52:47,531] INFO stream-client > [app-org.apache.kafka.streams.integration.PositionRestartIntegrationTest-true-true-IN_MEMORY_KV-DSL-456895e1-b230-4a83-9164-8e6d65d1fc57] > Kafka Streams commit ID: test-commit-ID > (org.apache.kafka.streams.KafkaStreams:915) > [2023-11-28 22:52:47,532] INFO stream-thread > [app-org.apache.kafka.streams.integration.PositionRestartIntegrationTest-true-true-IN_MEMORY_KV-DSL-456895e1-b230-4a83-9164-8e6d65d1fc57-StreamThread-1] > Creating restore consumer client > (org.apache.kafka.streams.processor.internals.StreamThread:365) > [2023-11-28 22:52:47,537] INFO stream-thread > [app-org.apache.kafka.streams.integration.PositionRestartIntegrationTest-true-true-IN_MEMORY_KV-DSL-456895e1-b230-4a83-9164-8e6d65d1fc57-StreamThread-1] > Creating thread producer client > (org.apache.kafka.streams.processor.internals.StreamThread:105) > [2023-11-28 22:52:47,538] INFO [Producer > clientId=app-org.apache.kafka.streams.integration.PositionRestartIntegrationTest-true-true-IN_MEMORY_KV-DSL-456895e1-b230-4a83-9164-8e6d65d1fc57-StreamThread-1-producer] > Instantiated an idempotent producer. 
> (org.apache.kafka.clients.producer.KafkaProducer:587) > [2023-11-28 22:52:47,545] INFO stream-thread > [app-org.apache.kafka.streams.integration.PositionRestartIntegrationTest-true-true-IN_MEMORY_KV-DSL-456895e1-b230-4a83-9164-8e6d65d1fc57-StreamThread-1] > Creating consumer client > (org.apache.kafka.streams.processor.internals.StreamThread:432) > [2023-11-28 22:52:47,545] INFO state-updater > [app-org.apache.kafka.streams.integration.PositionRestartIntegrationTest-true-true-IN_MEMORY_KV-DSL-456895e1-b230-4a83-9164-8e6d65d1fc57-StateUpdater-1] > State updater thread started > (org.apache.kafka.streams.processor.internals.DefaultStateUpdater:135) > [2023-11-28 22:52:47,547] INFO [Producer > clientId=app-org.apache.kafka.streams.integration.PositionRestartIntegrationTest-true-true-IN_MEMORY_KV-DSL-456895e1-b230-4a83-9164-8e6d65d1fc57-StreamThread-1-producer] > ProducerId set to 1 with epoch 0 > (org.apache.kafka.clients.producer.internals.TransactionManager:502) > [2023-11-28 22:52:47,550] INFO stream-thread >
[jira] [Resolved] (KAFKA-15311) Fix docs about reverting to ZooKeeper mode during KRaft migration
[ https://issues.apache.org/jira/browse/KAFKA-15311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe resolved KAFKA-15311. -- Fix Version/s: 3.7.0 Resolution: Fixed > Fix docs about reverting to ZooKeeper mode during KRaft migration > - > > Key: KAFKA-15311 > URL: https://issues.apache.org/jira/browse/KAFKA-15311 > Project: Kafka > Issue Type: Bug >Reporter: Colin McCabe >Assignee: Colin McCabe >Priority: Minor > Fix For: 3.7.0 > > > The docs incorrectly state that reverting to ZooKeeper mode during KRaft > migration is not possible
Re: [PR] MINOR: Zk to KRaft migration is now production ready [kafka]
cmccabe commented on PR #14546: URL: https://github.com/apache/kafka/pull/14546#issuecomment-1832839662 I'm going to close this for now since it is superseded by #14160. If you find more stuff that should be fixed, please do open a new one! :)
Re: [PR] MINOR: Zk to KRaft migration is now production ready [kafka]
cmccabe closed pull request #14546: MINOR: Zk to KRaft migration is now production ready URL: https://github.com/apache/kafka/pull/14546
Re: [PR] KAFKA-15886: Always specify directories for new partition registrations [kafka]
cmccabe commented on code in PR #14820: URL: https://github.com/apache/kafka/pull/14820#discussion_r1409963570 ## metadata/src/main/java/org/apache/kafka/metadata/placement/DefaultDirProvider.java: ## @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.metadata.placement; + +import org.apache.kafka.common.Uuid; + +/** + * Provide the default directory for new partitions in a given broker. + */ +@FunctionalInterface +public interface DefaultDirProvider { +Uuid defaultDir(int brokerId); Review Comment: > The caller needs to handle it regarldless. If there's anything other than an empty list set in directories, serializing the record in a MV that does not yet support dir assignment triggers a UnsupportedVersionException with "Attempted to write a non-default directories at version ...". In what I was proposing, the caller doesn't need to handle that, though, since it's not supposed to happen. > I agree, I would rather not have to configure PartitionChangeBuilder with a DefaultDirProvider. But maybe I'm not quite picturing what you are suggesting. Well, I was suggesting that the MV stuff be handled in the DefaultDirProvider rather than in PartitionChangeBuilder. 
But I guess the latter is already handling MV, so there isn't as much to be gained there. Fair enough.
Re: [PR] KAFKA-15922: Add a MetadataVersion for JBOD [kafka]
cmccabe commented on code in PR #14860: URL: https://github.com/apache/kafka/pull/14860#discussion_r1409942673 ## server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java: ## @@ -386,4 +418,21 @@ public void testOffsetCommitValueVersion(MetadataVersion metadataVersion) { public void testOffsetCommitValueVersionWithExpiredTimestamp(MetadataVersion metadataVersion) { assertEquals((short) 1, metadataVersion.offsetCommitValueVersion(true)); } + +@Test +public void assertLatestProductionIsLessThanLatest() { +assertTrue(LATEST_PRODUCTION.ordinal() < MetadataVersion.latest().ordinal(), +"Expected LATEST_PRODUCTION " + LATEST_PRODUCTION + +" to be less than the latest of " + MetadataVersion.latest()); +} Review Comment: yes. there should always be a new unstable version for the new stuff to go in :)
Re: [PR] KAFKA-15922: Add a MetadataVersion for JBOD [kafka]
cmccabe commented on code in PR #14860: URL: https://github.com/apache/kafka/pull/14860#discussion_r1409942437 ## server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java: ## @@ -329,19 +336,44 @@ public void testIsDelegationTokenSupported(MetadataVersion metadataVersion) { @ParameterizedTest @EnumSource(value = MetadataVersion.class) public void testIsElrSupported(MetadataVersion metadataVersion) { -assertEquals(metadataVersion.equals(IBP_3_7_IV1), -metadataVersion.isElrSupported()); -short expectPartitionRecordVersion = metadataVersion.equals(IBP_3_7_IV1) ? (short) 1 : (short) 0; -assertEquals(expectPartitionRecordVersion, metadataVersion.partitionRecordVersion()); -short expectPartitionChangeRecordVersion = metadataVersion.equals(IBP_3_7_IV1) ? (short) 1 : (short) 0; -assertEquals(expectPartitionChangeRecordVersion, metadataVersion.partitionChangeRecordVersion()); +assertEquals(metadataVersion.equals(IBP_3_7_IV3), metadataVersion.isElrSupported()); Review Comment: yes to both
Re: [PR] KAFKA-15922: Add a MetadataVersion for JBOD [kafka]
cmccabe commented on code in PR #14860: URL: https://github.com/apache/kafka/pull/14860#discussion_r1409941745 ## server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java: ## @@ -207,6 +213,22 @@ public enum MetadataVersion { */ public static final MetadataVersion MINIMUM_BOOTSTRAP_VERSION = IBP_3_3_IV0; +/** + * The latest production-ready MetadataVersion. This is the latest version that is stable + * and cannot be changed. MetadataVersions later than this can be tested via junit, but + * not deployed in production. + * + * Think carefully before you update this value. ONCE A METADATA VERSION IS PRODUCTION, + * IT CANNOT BE CHANGED. + */ +public static final MetadataVersion LATEST_PRODUCTION = IBP_3_7_IV0; Review Comment: yes. once it's production we'll change this (could be with a MINOR or with a JIRA). And then we can't change it
Re: [PR] KAFKA-15922: Add a MetadataVersion for JBOD [kafka]
cmccabe commented on code in PR #14860: URL: https://github.com/apache/kafka/pull/14860#discussion_r1409941250 ## metadata/src/main/resources/common/metadata/PartitionRecord.json: ## @@ -42,13 +42,13 @@ "about": "The epoch of the partition leader." }, { "name": "PartitionEpoch", "type": "int32", "versions": "0+", "default": "-1", "about": "An epoch that gets incremented each time we change anything in the partition." }, +{ "name": "Directories", "type": "[]uuid", "versions": "1+", + "about": "The log directory hosting each replica, sorted in the same exact order as the Replicas field."}, { "name": "EligibleLeaderReplicas", "type": "[]int32", "default": "null", "entityType": "brokerId", - "versions": "1+", "nullableVersions": "1+", "taggedVersions": "1+", "tag": 1, + "versions": "2+", "nullableVersions": "1+", "taggedVersions": "2+", "tag": 1, Review Comment: fixed ## metadata/src/main/resources/common/metadata/PartitionRecord.json: ## @@ -42,13 +42,13 @@ "about": "The epoch of the partition leader." }, { "name": "PartitionEpoch", "type": "int32", "versions": "0+", "default": "-1", "about": "An epoch that gets incremented each time we change anything in the partition." }, +{ "name": "Directories", "type": "[]uuid", "versions": "1+", + "about": "The log directory hosting each replica, sorted in the same exact order as the Replicas field."}, { "name": "EligibleLeaderReplicas", "type": "[]int32", "default": "null", "entityType": "brokerId", - "versions": "1+", "nullableVersions": "1+", "taggedVersions": "1+", "tag": 1, + "versions": "2+", "nullableVersions": "1+", "taggedVersions": "2+", "tag": 1, "about": "The eligible leader replicas of this partition." }, { "name": "LastKnownELR", "type": "[]int32", "default": "null", "entityType": "brokerId", - "versions": "1+", "nullableVersions": "1+", "taggedVersions": "1+", "tag": 2, - "about": "The last known eligible leader replicas of this partition." 
}, -{ "name": "Directories", "type": "[]uuid", "versions": "2+", - "about": "The log directory hosting each replica, sorted in the same exact order as the Replicas field."} + "versions": "2+", "nullableVersions": "1+", "taggedVersions": "2+", "tag": 2, Review Comment: fixed
Re: [PR] KAFKA-15922: Add a MetadataVersion for JBOD [kafka]
cmccabe commented on code in PR #14860: URL: https://github.com/apache/kafka/pull/14860#discussion_r1409938934 ## core/src/test/scala/unit/kafka/cluster/PartitionTest.scala: ## @@ -2191,7 +2191,7 @@ class PartitionTest extends AbstractPartitionTest { val partition = new Partition( topicPartition, replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs, - interBrokerProtocolVersion = MetadataVersion.IBP_3_7_IV1, + interBrokerProtocolVersion = MetadataVersion.IBP_3_7_IV2, Review Comment: Wanted a JBOD version here
[jira] [Created] (KAFKA-15949) Improve the KRaft metadata version related messages
Jakub Scholz created KAFKA-15949: Summary: Improve the KRaft metadata version related messages Key: KAFKA-15949 URL: https://issues.apache.org/jira/browse/KAFKA-15949 Project: Kafka Issue Type: Improvement Affects Versions: 3.6.0 Reporter: Jakub Scholz Various error messages related to KRaft seem to use very different style and formatting. Just for example in the {{StorageTool}} Scala class, there are two different examples: * {{Must specify a valid KRaft metadata version of at least 3.0.}} ** Refers to "metadata version" ** Refers to the version as 3.0 (although strictly speaking 3.0-IV0 is not valid for KRaft) * {{SCRAM is only supported in metadataVersion IBP_3_5_IV2 or later.}} ** Talks about "metadataVersion" ** Refers to "IBP_3_5_IV2" instead of "3.5" or "3.5-IV2" Other pieces of Kafka code seem to also talk about "metadata.version" for example. For users, it would be nice if the style and formats used were the same everywhere. Would it be worth unifying messages like this? If yes, what would be the preferred style to use? -- This message was sent by Atlassian Jira (v8.20.10#820010)
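One concrete direction for the unification the ticket asks about would be a single formatting helper used by every user-facing message. The sketch below is hypothetical (`MetadataVersionText` and `humanReadable` are not existing Kafka APIs); it renders the enum-constant style the ticket objects to ("IBP_3_5_IV2") in the "3.5-IV2" style:

```java
// Hypothetical helper (not an existing Kafka API) showing one way to unify
// the styles listed in the ticket: always render enum constants such as
// "IBP_3_5_IV2" in the user-facing form "3.5-IV2".
public class MetadataVersionText {
    public static String humanReadable(String enumName) {
        // drop the "IBP_" prefix: "IBP_3_5_IV2" -> "3_5_IV2"
        String s = enumName.replaceFirst("^IBP_", "");
        int iv = s.lastIndexOf("_IV");
        // "3_5" -> "3.5", then re-attach the IV suffix: "3.5-IV2"
        return s.substring(0, iv).replace('_', '.') + "-IV" + s.substring(iv + 3);
    }
}
```

Whatever format is chosen, routing every message through one helper like this would make the style question a single decision rather than a per-message one.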
[jira] [Commented] (KAFKA-9693) Kafka latency spikes caused by log segment flush on roll
[ https://issues.apache.org/jira/browse/KAFKA-9693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17791354#comment-17791354 ] Haruki Okada commented on KAFKA-9693: - [~paolomoriello] [~novosibman] Hi, I believe the latency spike due to flushing on log.roll is now resolved by https://issues.apache.org/jira/browse/KAFKA-15046 > Kafka latency spikes caused by log segment flush on roll > > > Key: KAFKA-9693 > URL: https://issues.apache.org/jira/browse/KAFKA-9693 > Project: Kafka > Issue Type: Improvement > Components: core > Environment: OS: Amazon Linux 2 > Kafka version: 2.2.1 >Reporter: Paolo Moriello >Assignee: Paolo Moriello >Priority: Major > Labels: Performance, latency, performance > Fix For: 3.7.0 > > Attachments: image-2020-03-10-13-17-34-618.png, > image-2020-03-10-14-36-21-807.png, image-2020-03-10-15-00-23-020.png, > image-2020-03-10-15-00-54-204.png, image-2020-06-23-12-24-46-548.png, > image-2020-06-23-12-24-58-788.png, image-2020-06-26-13-43-21-723.png, > image-2020-06-26-13-46-52-861.png, image-2020-06-26-14-06-01-505.png, > latency_plot2.png > > > h1. Summary > When a log segment fills up, Kafka rolls over onto a new active segment and > force the flush of the old segment to disk. When this happens, log segment > _append_ duration increase causing important latency spikes on producer(s) > and replica(s). This ticket aims to highlight the problem and propose a > simple mitigation: add a new configuration to enable/disable rolled segment > flush. > h1. 1. Phenomenon > Response time of produce request (99th ~ 99.9th %ile) repeatedly spikes to > ~50x-200x more than usual. For instance, normally 99th %ile is lower than > 5ms, but when this issue occurs, it marks 100ms to 200ms. 99.9th and 99.99th > %iles even jump to 500-700ms. > Latency spikes happen at constant frequency (depending on the input > throughput), for small amounts of time. All the producers experience a > latency increase at the same time. > h1. 
!image-2020-03-10-13-17-34-618.png|width=942,height=314! > {{Example of response time plot observed during on a single producer.}} > URPs rarely appear in correspondence of the latency spikes too. This is > harder to reproduce, but from time to time it is possible to see a few > partitions going out of sync in correspondence of a spike. > h1. 2. Experiment > h2. 2.1 Setup > Kafka cluster hosted on AWS EC2 instances. > h4. Cluster > * 15 Kafka brokers: (EC2 m5.4xlarge) > ** Disk: 1100Gb EBS volumes (4750Mbps) > ** Network: 10 Gbps > ** CPU: 16 Intel Xeon Platinum 8000 > ** Memory: 64Gb > * 3 Zookeeper nodes: m5.large > * 6 producers on 6 EC2 instances in the same region > * 1 topic, 90 partitions - replication factor=3 > h4. Broker config > Relevant configurations: > {quote}num.io.threads=8 > num.replica.fetchers=2 > offsets.topic.replication.factor=3 > num.network.threads=5 > num.recovery.threads.per.data.dir=2 > min.insync.replicas=2 > num.partitions=1 > {quote} > h4. Perf Test > * Throughput ~6000-8000 (~40-70Mb/s input + replication = ~120-210Mb/s per > broker) > * record size = 2 > * Acks = 1, linger.ms = 1, compression.type = none > * Test duration: ~20/30min > h2. 2.2 Analysis > Our analysis showed an high +correlation between log segment flush count/rate > and the latency spikes+. This indicates that the spikes in max latency are > related to Kafka behavior on rolling over new segments. > The other metrics did not show any relevant impact on any hardware component > of the cluster, eg. cpu, memory, network traffic, disk throughput... > > !latency_plot2.png|width=924,height=308! > {{Correlation between latency spikes and log segment flush count. p50, p95, > p99, p999 and p latencies (left axis, ns) and the flush #count (right > axis, stepping blue line in plot).}} > Kafka schedules logs flushing (this includes flushing the file record > containing log entries, the offset index, the timestamp index and the > transaction index) during _roll_ operations. 
A log is rolled over onto a new > empty log when: > * the log segment is full > * the maxtime has elapsed since the timestamp of first message in the > segment (or, in absence of it, since the create time) > * the index is full > In this case, the increase in latency happens on _append_ of a new message > set to the active segment of the log. This is a synchronous operation which > therefore blocks producers requests, causing the latency increase. > To confirm this, I instrumented Kafka to measure the duration of > FileRecords.append(MemoryRecords) method, which is responsible of writing > memory records to file. As a result, I observed the same spiky pattern as in > the producer latency, with a one-to-one
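The mechanism the ticket describes — and the direction of the KAFKA-15046 fix referenced in the comment above — can be modeled in a few lines. This is a simplified illustration, not Kafka's actual `UnifiedLog`/`LogSegment` code: the old path pays for the flush inside the synchronous append, stalling the producer request, while the fixed path schedules the flush on a background thread so append returns immediately.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

// Simplified model (not Kafka's real classes) of flushing rolled segments.
public class RollModel {
    final ExecutorService background = Executors.newSingleThreadExecutor();
    final BlockingQueue<String> flushed = new LinkedBlockingQueue<>();

    // old behavior: the append that triggers a roll pays for the flush inline
    void appendThenSyncFlush(String segment) {
        flush(segment); // blocks the producer request for the full flush duration
    }

    // new behavior: append only schedules the flush and returns
    Future<?> appendThenAsyncFlush(String segment) {
        return background.submit(() -> flush(segment));
    }

    private void flush(String segment) {
        flushed.add(segment); // stand-in for fsyncing the log and its indexes
    }
}
```

In this model the producer-visible latency of the rolling append collapses from "append + fsync" to "append + submit", which is exactly why the spikes correlate with flush count rather than with any hardware metric.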
Re: [PR] KAFKA-15830: Add telemetry API handling (KIP-714) [kafka]
junrao commented on code in PR #14767: URL: https://github.com/apache/kafka/pull/14767#discussion_r1409930152 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -3697,16 +3698,60 @@ class KafkaApis(val requestChannel: RequestChannel, CompletableFuture.completedFuture[Unit](()) } - // Just a place holder for now. def handleGetTelemetrySubscriptionsRequest(request: RequestChannel.Request): Unit = { -requestHelper.sendMaybeThrottle(request, request.body[GetTelemetrySubscriptionsRequest].getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) -CompletableFuture.completedFuture[Unit](()) +val subscriptionRequest = request.body[GetTelemetrySubscriptionsRequest] + +if (ClientMetricsReceiverPlugin.instance.isEmpty) { + info("Received get telemetry client request, no metrics receiver plugin configured or running with ZK") + requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => +subscriptionRequest.getErrorResponse(requestThrottleMs, Errors.UNSUPPORTED_VERSION.exception)) Review Comment: Yes, I agree that we need to exclude telemetry requests in `APIVersionsResponse` if ClientMetrics Exporter Plugin is not configured. We could do that by passing in the plugin availability flag through `ApiVersionManager.apiVersionResponse`. ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -3747,16 +3747,56 @@ class KafkaApis(val requestChannel: RequestChannel, CompletableFuture.completedFuture[Unit](()) } - // Just a place holder for now. 
def handleGetTelemetrySubscriptionsRequest(request: RequestChannel.Request): Unit = { -requestHelper.sendMaybeThrottle(request, request.body[GetTelemetrySubscriptionsRequest].getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) -CompletableFuture.completedFuture[Unit](()) +val subscriptionRequest = request.body[GetTelemetrySubscriptionsRequest] + +clientMetricsManager match { + case Some(metricsManager) => +try { + if (metricsManager.isTelemetryReceiverConfigured) { +requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => Review Comment: If we use `sendResponseMaybeThrottle`, we need to set requestThrottleMs in the response. A simpler approach is to use `sendMaybeThrottle`, which automatically sets `requestThrottleMs` in the response. Ditto in other usage.
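junrao's point about the two helpers can be illustrated with a stripped-down model. The types below are hypothetical stand-ins, not the real `KafkaApis`/`RequestHelper` signatures: with the builder-style variant the caller is handed the computed throttle time and must remember to thread it into the response it builds, whereas the simpler variant fills the field in itself and so is harder to get wrong.

```java
import java.util.function.IntFunction;

// Illustrative model (hypothetical types) of the two throttling helpers.
public class ThrottleModel {
    public static class Response { public int throttleTimeMs = -1; }

    int computeThrottleMs() { return 7; } // stand-in for the quota calculation

    // caller receives requestThrottleMs and is responsible for using it
    Response sendResponseMaybeThrottle(IntFunction<Response> responseBuilder) {
        return responseBuilder.apply(computeThrottleMs());
    }

    // throttle time is set on the response for the caller
    Response sendMaybeThrottle(Response response) {
        response.throttleTimeMs = computeThrottleMs();
        return response;
    }
}
```

The failure mode the review comment guards against is the first variant: a builder that ignores its `requestThrottleMs` argument compiles fine but silently drops the throttle time from the response.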
Re: [PR] MINOR: Change test logging capture to per-test, reducing jenkins truncation [kafka]
gharris1727 commented on PR #14795: URL: https://github.com/apache/kafka/pull/14795#issuecomment-1832781742 Also worth a follow-up is tackling the outliers that do experience truncation and trying to reduce their log volume to a more reasonable level. The biggest offenders above 10MB appear to be:

Size | Test
-- | --
153.56MB | org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldWaitForMissingInputTopicsToBeCreated()
142.97MB | org.apache.kafka.streams.processor.internals.assignment.TaskAssignorConvergenceTest.randomClusterPerturbationsShouldConverge[enableRackAwareTaskAssignor=false]
103.84MB | org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testAlterSourceConnectorOffsetsExactlyOnceSupportEnabled
83.26MB | org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest.testThreadSafety
82.94MB | org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldAddAndRemoveNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology()
66.77MB | kafka.coordinator.group.GroupCoordinatorConcurrencyTest.testConcurrentRandomSequence()
66.68MB | org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldAddNamedTopologyToRunningApplicationWithMultipleInitialNamedTopologies()
45.17MB | org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldAddToEmptyInitialTopologyRemoveResetOffsetsThenAddSameNamedTopologyWithRepartitioning()
41.76MB | org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets()
39.36MB | org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldRemoveAndReplaceTopologicallyIncompatibleNamedTopology()
34.26MB | org.apache.kafka.streams.processor.internals.assignment.TaskAssignorConvergenceTest.randomClusterPerturbationsShouldConverge[enableRackAwareTaskAssignor=true]
29.29MB | org.apache.kafka.streams.processor.internals.StreamsAssignmentScaleTest.testHighAvailabilityTaskAssignorLargeNumConsumers
28.11MB | org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldAllowPatternSubscriptionWithMultipleNamedTopologies()
24.25MB | org.apache.kafka.tools.MetadataQuorumCommandTest.testDescribeQuorumStatusSuccessful()[6]
22.99MB | org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testResetSourceConnectorOffsetsExactlyOnceSupportEnabled
21.63MB | org.apache.kafka.tools.reassign.ReassignPartitionsIntegrationTest.testAlterLogDirReassignmentThrottle(String)[1]
20.61MB | org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldBackOffTaskAndEmitDataWithinSameTopology()
20.39MB | org.apache.kafka.tools.reassign.ReassignPartitionsIntegrationTest.testLogDirReassignment(String)[1]
19.17MB | org.apache.kafka.streams.processor.internals.HandlingSourceTopicDeletionIntegrationTest.shouldThrowErrorAfterSourceTopicDeleted
17.75MB | org.apache.kafka.tools.MetadataQuorumCommandTest.testDescribeQuorumStatusSuccessful()[2]
17.32MB | org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest.testBrokerCoordinator
16.30MB | org.apache.kafka.tools.MetadataQuorumCommandTest.testDescribeQuorumReplicationSuccessful()[2]
15.47MB | org.apache.kafka.tools.reassign.ReassignPartitionsIntegrationTest.testCancellation(String)[1]
13.77MB | org.apache.kafka.tools.reassign.ReassignPartitionsIntegrationTest.testThrottledReassignment(String)[1]
12.64MB | org.apache.kafka.tools.reassign.ReassignPartitionsIntegrationTest.testCancellation(String)[2]
12.46MB | org.apache.kafka.tools.MetadataQuorumCommandTest.testDescribeQuorumReplicationSuccessful()[6]
10.22MB | org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldAddNamedTopologyToRunningApplicationWithEmptyInitialTopology()
Re: [PR] KAFKA-15645 ReplicationQuotasTestRig rewritten in java [kafka]
nizhikov commented on PR #14588: URL: https://github.com/apache/kafka/pull/14588#issuecomment-1832765723 Hello @jolshan are you ready to merge this? Can I improve this PR somehow?
Re: [PR] MINOR: Change test logging capture to per-test, reducing jenkins truncation [kafka]
gharris1727 commented on PR #14795: URL: https://github.com/apache/kafka/pull/14795#issuecomment-1832757011 I did some statistics on the current state of log truncation in CI. I learned that:

* A full run of `./gradlew test` writes 1.73GB of logs
* 814 of 1532 test suites (53%) produce 0 logs

For the existing suite-level truncation:

* 1.71GB (98%) of these logs are discarded due to truncation
* 27MB (1.5%) of these logs are kept after truncation
* 175 of 1532 test suites (11%) experience truncation
* 5721 of 26298 tests (22%) are in test suites that experience truncation
* Test suites which produce logs average 37kb of logs after truncation

With the test-level truncation proposed here:

* 1.61GB (93%) of these logs are discarded due to test-level truncation
* 126MB (7%) of these logs are kept after truncation
* 452 of 26298 tests (1.7%) experience truncation
* Tests which produce logs average 15kb of logs after truncation

So, assuming a worst-case run with every test failing (as logs are only kept for failed tests) and log volume similar to successful runs, this change would cost 5 times (126MB/27MB) as much log storage space. However, any particular test would be **~12 times (5721/452) less likely to experience truncation**. We don't regularly see fully-failed test suites, and instead typically see small numbers of test failures. If we assume test failures to be uniformly distributed among all tests (which they almost certainly aren't, but I don't have statistics for that), we can use averages to calculate the expected persisted logs per test failure. Since a test failure under suite-level truncation keeps the truncated logs for the whole suite, each test failure adds on average 37kb of logs, or less if multiple tests in the same suite fail. A test failure under test-level truncation only keeps logs for the individual test, which averages 15kb, and receives no discount for multiple test failures.
So, assuming a small number of test failures, which is typical, **the cost of storing these logs is ~2.5 times (37kb/15kb) lower**. I believe that this change should not be harmful to the Jenkins test infrastructure, and will immediately benefit our ability to debug tests via CI, especially flaky failures. @ijuma @mimaison @divijvaidya Could you take a look at this when you have a chance?
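The headline ratios in the comment follow directly from the quoted raw figures; a quick re-derivation (using the comment's numbers, not fresh measurements):

```java
import java.util.Locale;

// Re-deriving the ratios quoted in the CI log-truncation analysis above.
public class TruncationRatios {
    public static void main(String[] args) {
        // kept logs after truncation: test-level (126MB) vs suite-level (27MB)
        double storageCost = 126.0 / 27.0;      // ~4.67 -> the quoted "5 times" storage cost
        // tests affected by truncation: suite-level (5721) vs test-level (452)
        double truncationOdds = 5721.0 / 452.0; // ~12.66 -> "~12 times less likely"
        // average kb persisted per (uniformly distributed) failed test
        double perFailure = 37.0 / 15.0;        // ~2.47 -> roughly 2.5x cheaper per failure
        System.out.println(String.format(Locale.ROOT,
            "storage x%.2f, truncation x%.2f, per-failure x%.2f",
            storageCost, truncationOdds, perFailure));
    }
}
```

All three quoted multipliers check out against the raw counts, with the per-failure saving landing closer to 2.5x than 2x.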
[jira] [Resolved] (KAFKA-15947) Null pointer on LZ4 compression since Kafka 3.6
[ https://issues.apache.org/jira/browse/KAFKA-15947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ludo resolved KAFKA-15947. -- Fix Version/s: 3.6.1 Resolution: Duplicate > Null pointer on LZ4 compression since Kafka 3.6 > --- > > Key: KAFKA-15947 > URL: https://issues.apache.org/jira/browse/KAFKA-15947 > Project: Kafka > Issue Type: Bug > Components: compression >Affects Versions: 3.6.0 >Reporter: Ludo >Priority: Major > Fix For: 3.6.1 > > > I have a Kafka Stream application running well since month using client > version {{3.5.1 }}with 3.5.1 (bitnami image: {{bitnami/3.5.1-debian-11-r44)}} > using{{ compression.type: "lz4"}} > I've recently updated a my kafka server to kafka 3.6 (bitnami image: > {{{}bitnami/kafka:3.6.0-debian-11-r0){}}}. > > The startup is working well for days, and after some time, Kafka Stream crash > and Kafka output a lot of NullPointerException on the console: > > {code:java} > org.apache.kafka.common.KafkaException: java.lang.NullPointerException: > Cannot invoke "java.nio.ByteBuffer.hasArray()" because > "this.intermediateBufRef" is null > at > org.apache.kafka.common.record.CompressionType$4.wrapForInput(CompressionType.java:134) > at > org.apache.kafka.common.record.DefaultRecordBatch.recordInputStream(DefaultRecordBatch.java:273) > at > org.apache.kafka.common.record.DefaultRecordBatch.compressedIterator(DefaultRecordBatch.java:277) > at > org.apache.kafka.common.record.DefaultRecordBatch.skipKeyValueIterator(DefaultRecordBatch.java:352) > at > org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsetsCompressed(LogValidator.java:358) > at > org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsets(LogValidator.java:165) > at kafka.log.UnifiedLog.$anonfun$append$2(UnifiedLog.scala:805) > at kafka.log.UnifiedLog.append(UnifiedLog.scala:1845) > at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:719) > at > 
kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1313) > at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1301) > at > kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:1210) > at > scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286) > at > scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149) > at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237) > at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230) > at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44) > at scala.collection.mutable.HashMap.foreach(HashMap.scala:149) > at scala.collection.TraversableLike.map(TraversableLike.scala:286) > at scala.collection.TraversableLike.map$(TraversableLike.scala:279) > at scala.collection.AbstractTraversable.map(Traversable.scala:108) > at > kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:1198) > at > kafka.server.ReplicaManager.$anonfun$appendRecords$18$adapted(ReplicaManager.scala:754) > at > kafka.server.KafkaRequestHandler$.$anonfun$wrap$3(KafkaRequestHandler.scala:73) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:130) > at java.base/java.lang.Thread.run(Thread.java:833) > Caused by: java.lang.NullPointerException: Cannot invoke > "java.nio.ByteBuffer.hasArray()" because "this.intermediateBufRef" is null > at > org.apache.kafka.common.utils.ChunkedBytesStream.(ChunkedBytesStream.java:89) > at > org.apache.kafka.common.record.CompressionType$4.wrapForInput(CompressionType.java:132) > ... 
25 more {code} > At the same time the Kafka Stream raise this error: > > {code:java} > org.apache.kafka.streams.errors.StreamsException: Error encountered sending > record to topic kestra_workertaskresult for task 3_6 due > to:org.apache.kafka.common.errors.UnknownServerException: The server > experienced an unexpected error when processing the request.Written offsets > would not be recorded and no more records would be sent since this is a fatal > error.at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:297)at > > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$1(RecordCollectorImpl.java:284)at > > org.apache.kafka.clients.producer.KafkaProducer$AppendCallbacks.onCompletion(KafkaProducer.java:1505)at > > org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:273)at > > org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:234)at > >
[jira] [Commented] (KAFKA-15947) Null pointer on LZ4 compression since Kafka 3.6
[ https://issues.apache.org/jira/browse/KAFKA-15947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17791343#comment-17791343 ] Ludo commented on KAFKA-15947: -- Yes, it seems to be the same, and yes, I use transactions (with Kafka Streams). Thanks for highlighting this issue.
[jira] [Comment Edited] (KAFKA-15947) Null pointer on LZ4 compression since Kafka 3.6
[ https://issues.apache.org/jira/browse/KAFKA-15947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17791332#comment-17791332 ] Justine Olshan edited comment on KAFKA-15947 at 11/29/23 9:41 PM: -- Hey – is this the same as https://issues.apache.org/jira/browse/KAFKA-15653? Are you running with transactions? The above ticket will be fixed in 3.6.1 was (Author: jolshan): Hey – is this the same as https://issues.apache.org/jira/browse/KAFKA-15653? Are you running with transactions?
Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
philipnee commented on PR #14873: URL: https://github.com/apache/kafka/pull/14873#issuecomment-1832745572 In the current implementation, when the poll timer expires, the consumer sends a LeaveGroup request without revoking its partitions. This is obviously a bit different from the current state transitions, so I'm not sure what the right thing to do is.
[jira] [Commented] (KAFKA-15947) Null pointer on LZ4 compression since Kafka 3.6
[ https://issues.apache.org/jira/browse/KAFKA-15947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17791332#comment-17791332 ] Justine Olshan commented on KAFKA-15947: Hey – is this the same as https://issues.apache.org/jira/browse/KAFKA-15653? Are you running with transactions? > Null pointer on LZ4 compression since Kafka 3.6 > --- > > Key: KAFKA-15947 > URL: https://issues.apache.org/jira/browse/KAFKA-15947 > Project: Kafka > Issue Type: Bug > Components: compression >Affects Versions: 3.6.0 >Reporter: Ludo >Priority: Major > > I have a Kafka Streams application that has been running well for months using client > version {{3.5.1}} with server 3.5.1 (bitnami image: {{bitnami/3.5.1-debian-11-r44}}) > and {{compression.type: "lz4"}}. > I recently updated my Kafka server to Kafka 3.6 (bitnami image: > {{bitnami/kafka:3.6.0-debian-11-r0}}). > > Everything runs fine for days after startup, but after some time Kafka Streams crashes > and Kafka prints many NullPointerExceptions to the console: > > {code:java} > org.apache.kafka.common.KafkaException: java.lang.NullPointerException: > Cannot invoke "java.nio.ByteBuffer.hasArray()" because > "this.intermediateBufRef" is null > at > org.apache.kafka.common.record.CompressionType$4.wrapForInput(CompressionType.java:134) > at > org.apache.kafka.common.record.DefaultRecordBatch.recordInputStream(DefaultRecordBatch.java:273) > at > org.apache.kafka.common.record.DefaultRecordBatch.compressedIterator(DefaultRecordBatch.java:277) > at > org.apache.kafka.common.record.DefaultRecordBatch.skipKeyValueIterator(DefaultRecordBatch.java:352) > at > org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsetsCompressed(LogValidator.java:358) > at > org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsets(LogValidator.java:165) > at kafka.log.UnifiedLog.$anonfun$append$2(UnifiedLog.scala:805) > at kafka.log.UnifiedLog.append(UnifiedLog.scala:1845) > at
kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:719) > at > kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1313) > at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1301) > at > kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:1210) > at > scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286) > at > scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149) > at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237) > at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230) > at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44) > at scala.collection.mutable.HashMap.foreach(HashMap.scala:149) > at scala.collection.TraversableLike.map(TraversableLike.scala:286) > at scala.collection.TraversableLike.map$(TraversableLike.scala:279) > at scala.collection.AbstractTraversable.map(Traversable.scala:108) > at > kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:1198) > at > kafka.server.ReplicaManager.$anonfun$appendRecords$18$adapted(ReplicaManager.scala:754) > at > kafka.server.KafkaRequestHandler$.$anonfun$wrap$3(KafkaRequestHandler.scala:73) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:130) > at java.base/java.lang.Thread.run(Thread.java:833) > Caused by: java.lang.NullPointerException: Cannot invoke > "java.nio.ByteBuffer.hasArray()" because "this.intermediateBufRef" is null > at > org.apache.kafka.common.utils.ChunkedBytesStream.(ChunkedBytesStream.java:89) > at > org.apache.kafka.common.record.CompressionType$4.wrapForInput(CompressionType.java:132) > ... 
25 more {code} > At the same time, Kafka Streams raises this error: > > {code:java} > org.apache.kafka.streams.errors.StreamsException: Error encountered sending > record to topic kestra_workertaskresult for task 3_6 due > to:org.apache.kafka.common.errors.UnknownServerException: The server > experienced an unexpected error when processing the request.Written offsets > would not be recorded and no more records would be sent since this is a fatal > error.at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:297)at > > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$1(RecordCollectorImpl.java:284)at > > org.apache.kafka.clients.producer.KafkaProducer$AppendCallbacks.onCompletion(KafkaProducer.java:1505)at > > org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:273)at > >
Re: [PR] KAFKA-15922: Add a MetadataVersion for JBOD [kafka]
cmccabe commented on code in PR #14860: URL: https://github.com/apache/kafka/pull/14860#discussion_r1409900140 ## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -59,10 +59,14 @@ object StorageTool extends Logging { case "format" => val directories = configToLogDirectories(config.get) val clusterId = namespace.getString("cluster_id") - val metadataVersion = getMetadataVersion(namespace, Option(config.get.interBrokerProtocolVersionString)) + val metadataVersion = getMetadataVersion(namespace, + Option(config.get.originals.get(KafkaConfig.InterBrokerProtocolVersionProp)).map(_.toString)) if (!metadataVersion.isKRaftSupported) { throw new TerseFailure(s"Must specify a valid KRaft metadata version of at least 3.0.") } + if (!metadataVersion.isProduction()) { +throw new TerseFailure(s"Metadata version ${metadataVersion} is not ready for production use yet.") + } Review Comment: I created an internal configuration `unstable.metadata.versions.enable` to bypass this. Since it is internal and undocumented, it doesn't need a KIP. (Also see the very similar `unstable.api.versions.enable`) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
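If the internal flag mentioned above is used for testing, it would presumably be set in the broker/controller properties file. A sketch (the flag name comes from the comment above; its exact scope and placement are assumptions):

```properties
# Internal, undocumented flag -- allows formatting or upgrading to a
# MetadataVersion that is not yet production-ready.
# For testing only; do not use in production.
unstable.metadata.versions.enable=true
```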
Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
philipnee commented on code in PR #14873: URL: https://github.com/apache/kafka/pull/14873#discussion_r1409899121 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -169,6 +174,19 @@ public NetworkClientDelegate.PollResult poll(long currentTimeMs) { return NetworkClientDelegate.PollResult.EMPTY; } +pollTimer.update(currentTimeMs); +if (!isLeaveGroupInProgress() && pollTimer.isExpired()) { +logger.warn("consumer poll timeout has expired. This means the time between subsequent calls to poll() " + +"was longer than the configured max.poll.interval.ms, which typically implies that " + +"the poll loop is spending too much time processing messages. You can address this " + +"either by increasing max.poll.interval.ms or by reducing the maximum size of batches " + +"returned in poll() with max.poll.records."); +membershipManager.leaveGroup().whenComplete((ignored, exception) -> { Review Comment: What exactly should we do in KIP-848 if the poll timer expires? The current implementation sends a LeaveGroup request directly without revoking partitions. If we want to revoke partitions first, we need to modify the state transition a bit. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15764: Missing Tests for Transactions [kafka]
jolshan commented on code in PR #14702: URL: https://github.com/apache/kafka/pull/14702#discussion_r1409884417 ## core/src/test/scala/integration/kafka/api/TransactionsTest.scala: ## @@ -820,6 +823,37 @@ class TransactionsTest extends IntegrationTestHarness { assertEquals((initialProducerEpoch + 1).toShort, producerStateEntry.producerEpoch) } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testTransactionsWithCompression(quorum: String): Unit = { +val numRecords = 50 +val numProducersWithCompression = 5 +val numTransactions = 40 +val transactionalCompressionProducers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]() + +for (i <- 0 until numProducersWithCompression) { + transactionalCompressionProducers += createTransactionalProducer("transactional-compression-producer-" + i.toString, compressionType = "snappy") +} + +// KAFKA-15653 is triggered more easily with replication factor 1 Review Comment: Sorry I think I got confused about when the response is being sent. I will say that adding more request threads didn't work and I had to lower them to get it to reproduce frequently? I can try it the other way around, but iirc, there were a lot of combinations that were not as consistent as it is now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
philipnee commented on code in PR #14873: URL: https://github.com/apache/kafka/pull/14873#discussion_r1409862992 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -169,6 +174,19 @@ public NetworkClientDelegate.PollResult poll(long currentTimeMs) { return NetworkClientDelegate.PollResult.EMPTY; } +pollTimer.update(currentTimeMs); +if (!isLeaveGroupInProgress() && pollTimer.isExpired()) { +logger.warn("consumer poll timeout has expired. This means the time between subsequent calls to poll() " + +"was longer than the configured max.poll.interval.ms, which typically implies that " + +"the poll loop is spending too much time processing messages. You can address this " + +"either by increasing max.poll.interval.ms or by reducing the maximum size of batches " + +"returned in poll() with max.poll.records."); +membershipManager.leaveGroup().whenComplete((ignored, exception) -> { Review Comment: Not sure if kip-848 changes this but ``` // Starting from 2.3, only dynamic members will send LeaveGroupRequest to the broker, // consumer with valid group.instance.id is viewed as static member that never sends LeaveGroup, // and the membership expiration is only controlled by session timeout. ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: DirectoryId.MIGRATING should be all zeros [kafka]
cmccabe merged PR #14858: URL: https://github.com/apache/kafka/pull/14858 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15886: Always specify directories for new partition registrations [kafka]
soarez commented on code in PR #14820: URL: https://github.com/apache/kafka/pull/14820#discussion_r1409866239 ## metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java: ## @@ -405,7 +415,7 @@ private void completeReassignmentIfNeeded() { targetAdding = Collections.emptyList(); } -public Optional build() { +public Optional build(DefaultDirProvider defaultDirProvider) { Review Comment: Updated this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15922: Add a MetadataVersion for JBOD [kafka]
cmccabe commented on PR #14860: URL: https://github.com/apache/kafka/pull/14860#issuecomment-1832698639 > Left a few comments, nothing major directly on the changes. > > I do have a high-level thought to put out there. This currently prevents the storage tool from formatting with a non production MetadataVersion and prevents the feature tool from updating a running cluster to a non-production MetadataVersion. I wonder if this will break anything, and I wonder if perhaps we should add a break-glass mechanism to circumvent these things in case something does break. While this may not require a KIP per-se, I would not be surprised if changing this causes issues for people somewhere. I don't think this will break any existing use-cases. I agree that having a break-glass mechanism would be useful for some testing cases. Although we need to be very sure to document that it shouldn't be used in production. Maybe we can follow up with a KIP later. But it would be too late for 3.7. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: DirectoryId.MIGRATING should be all zeros [kafka]
cmccabe commented on PR #14858: URL: https://github.com/apache/kafka/pull/14858#issuecomment-1832693965 > The directory fields in BrokerRegistrationRequest, RegisterBrokerRecord, BrokerRegistrationChangeRecord, PartitionRecord and PartitionChangeRecord are all of type Uuid[], which defaults to an empty list. We don't have any directory related fields of type Uuid in requests or metadata records. But I still think it makes sense to change this, because we might need such fields in the future. Yeah. Good points. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15886: Always specify directories for new partition registrations [kafka]
soarez commented on code in PR #14820: URL: https://github.com/apache/kafka/pull/14820#discussion_r1409860115 ## metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java: ## @@ -377,7 +386,7 @@ public ApiMessageAndVersion toRecord(Uuid topicId, int partitionId, ImageWriterO record.setDirectories(Uuid.toList(directories)); } else { for (Uuid directory : directories) { -if (!DirectoryId.UNASSIGNED.equals(directory)) { +if (!DirectoryId.UNASSIGNED.equals(directory) && !DirectoryId.MIGRATING.equals(directory)) { Review Comment: Updated -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
philipnee opened a new pull request, #14873: URL: https://github.com/apache/kafka/pull/14873 Currently, the poll interval is not respected during consumer#poll. We need to make sure the consumer actively leaves the group when the user doesn't poll frequently enough. This PR adds 1. a poll timer and 2. a boolean flag that triggers rejoining on a subsequent poll. - The poll timer is always configured with rebalanceTimeout - The boolean flag, rejoinOnPoll, is set when the leaveGroup completes, so that the consumer rejoins the group on the subsequent poll. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]
jeffkbkim commented on code in PR #14849: URL: https://github.com/apache/kafka/pull/14849#discussion_r1409851471 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorShard.java: ## @@ -47,4 +47,11 @@ default void onNewMetadataImage(MetadataImage newImage, MetadataDelta delta) {} * any post unloading operations. */ default void onUnloaded() {} + +/** + * Replay a record to update the state machine. + * + * @param record The record to replay. + */ +default void replay(U record) throws RuntimeException {} Review Comment: This doesn't work because CoordinatorShard no longer extends CoordinatorPlayback, so it doesn't have a replay() method anymore. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14438: Throw if async consumer configured with invalid group ID [kafka]
cadonna commented on code in PR #14872: URL: https://github.com/apache/kafka/pull/14872#discussion_r1409850375 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ## @@ -752,6 +757,70 @@ public void testSubscriptionOnEmptyTopic() { assertThrows(IllegalArgumentException.class, () -> consumer.subscribe(singletonList(emptyTopic))); } +@Test Review Comment: I put the tests here because they only apply to the async consumer for the moment. As I wrote in a different comment, moving forward we should move these tests to the `ConsumerConfig`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14438: Throw if async consumer configured with invalid group ID [kafka]
cadonna commented on code in PR #14872: URL: https://github.com/apache/kafka/pull/14872#discussion_r1409848052 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -398,6 +387,38 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { requestManagersSupplier); } +private Optional initializeGroupMetadata(final ConsumerConfig config, +final GroupRebalanceConfig groupRebalanceConfig) { +final Optional groupMetadata = initializeGroupMetadata( +groupRebalanceConfig.groupId, +groupRebalanceConfig.groupInstanceId +); +if (!groupMetadata.isPresent()) { +config.ignore(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG); +config.ignore(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED); +} +return groupMetadata; +} + +private Optional initializeGroupMetadata(final String groupId, +final Optional groupInstanceId) { +if (groupId != null) { +if (groupId.isEmpty()) { Review Comment: I think, moving forward, when we also change the validation of the group ID for the legacy consumer in 4.0, we should write a validator for the group ID config in `ConsumerConfig` and throw there. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
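As a sketch of the validation rule being discussed (the class and method names below are hypothetical, not the actual `ConsumerConfig` validator API): a null group ID is allowed, but a non-null one must not be empty or consist solely of whitespace.

```java
// Hypothetical sketch of the group.id validation rule from this PR.
// Names are illustrative; not Kafka's actual ConsumerConfig code.
final class GroupIdValidator {
    static void validate(final String groupId) {
        // A null group ID means "no consumer group" and is allowed.
        if (groupId != null && groupId.trim().isEmpty()) {
            throw new IllegalArgumentException(
                "group.id must be non-empty and not whitespace-only when set");
        }
    }
}
```

In Kafka's config framework, this kind of rule would typically live in a `ConfigDef.Validator` attached to the config key, which is exactly what the comment suggests doing in 4.0.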
Re: [PR] KAFKA-14438: Throw if async consumer configured with invalid group ID [kafka]
cadonna commented on PR #14872: URL: https://github.com/apache/kafka/pull/14872#issuecomment-1832675480 @kirktrue please review. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14438: Throw error when consumer configured with empty/whitespace-only group.id for AsyncKafkaConsumer [kafka]
cadonna commented on PR #14768: URL: https://github.com/apache/kafka/pull/14768#issuecomment-1832672849 @kirktrue My PR regarding [KAFKA-15281](https://issues.apache.org/jira/browse/KAFKA-15281) overlaps a lot with this. So I broke out the relevant parts for the group ID verification from my PR and opened https://github.com/apache/kafka/pull/14872. That will avoid a lot of merge conflicts. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-14438: Throw if async consumer configured with invalid group ID [kafka]
cadonna opened a new pull request, #14872: URL: https://github.com/apache/kafka/pull/14872 Verifies that the group ID passed into the async consumer is valid. That is, if the group ID is not null, it must be neither empty nor consist solely of whitespace. This change stores the group ID in the group metadata because KAFKA-15281 about the group metadata API will build on that. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15922: Add a MetadataVersion for JBOD [kafka]
soarez commented on code in PR #14860: URL: https://github.com/apache/kafka/pull/14860#discussion_r1409826262 ## server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java: ## @@ -207,6 +213,22 @@ public enum MetadataVersion { */ public static final MetadataVersion MINIMUM_BOOTSTRAP_VERSION = IBP_3_3_IV0; +/** + * The latest production-ready MetadataVersion. This is the latest version that is stable + * and cannot be changed. MetadataVersions later than this can be tested via junit, but + * not deployed in production. + * + * Think carefully before you update this value. ONCE A METADATA VERSION IS PRODUCTION, + * IT CANNOT BE CHANGED. + */ +public static final MetadataVersion LATEST_PRODUCTION = IBP_3_7_IV0; Review Comment: So I guess we need another JIRA to change this to `IBP_3_7_IV2` before the 3.7 release? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
AndrewJSchofield commented on code in PR #14864: URL: https://github.com/apache/kafka/pull/14864#discussion_r1409822776 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java: ## @@ -454,4 +480,19 @@ public void shutdown() { public Map consumerMetrics() { return Collections.unmodifiableMap(globalConsumer.metrics()); } + +public KafkaFuture globalConsumerInstanceId(final Duration timeout) { Review Comment: Yes, I saw the `volatile`. I agree that the ordering is sufficient to make it safe. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
AndrewJSchofield commented on code in PR #14864: URL: https://github.com/apache/kafka/pull/14864#discussion_r1409821677 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java: ## @@ -310,6 +317,25 @@ public void run() { cache.resize(size); } stateConsumer.pollAndUpdate(); + +if (fetchDeadline != -1) { +if (fetchDeadline > time.milliseconds()) { +try { +globalConsumerClientInstanceId = globalConsumer.clientInstanceId(Duration.ZERO); + clientInstanceIdFuture.complete(globalConsumerClientInstanceId); +fetchDeadline = -1; +} catch (final TimeoutException swallow) { +// swallow Review Comment: Got it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
AndrewJSchofield commented on code in PR #14864: URL: https://github.com/apache/kafka/pull/14864#discussion_r1409821009 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java: ## @@ -310,6 +317,25 @@ public void run() { cache.resize(size); } stateConsumer.pollAndUpdate(); + +if (fetchDeadline != -1) { +if (fetchDeadline > time.milliseconds()) { +try { +globalConsumerClientInstanceId = globalConsumer.clientInstanceId(Duration.ZERO); Review Comment: Comments :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
AndrewJSchofield commented on code in PR #14864: URL: https://github.com/apache/kafka/pull/14864#discussion_r1409819732 ## streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java: ## @@ -1791,6 +1794,52 @@ protected int processStreamThread(final Consumer consumer) { return copy.size(); } +/** + * Returns the internal clients' assigned {@code client instance ids}. + * + * @return the internal clients' assigned instance ids used for metrics collection. + * + * @throws IllegalStateException If {@code KafkaStreams} is not running. + * @throws TimeoutException Indicates that a request timed out. + */ +public ClientInstanceIds clientInstanceIds(final Duration timeout) { +if (state().hasNotStarted()) { +throw new IllegalStateException("KafkaStreams has not been started, you can retry after calling start()."); +} +if (state().isShuttingDown() || state.hasCompletedShutdown()) { +throw new IllegalStateException("KafkaStreams has been stopped (" + state + ")."); +} + +final ClientInstanceIdsImpl clientInstanceIds = new ClientInstanceIdsImpl(); + +KafkaFuture globalThreadFuture = null; +if (globalStreamThread != null) { +globalThreadFuture = globalStreamThread.globalConsumerInstanceId(timeout); Review Comment: OK, it sounds like you have a plan. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: DirectoryId.MIGRATING should be all zeros [kafka]
soarez commented on PR #14858: URL: https://github.com/apache/kafka/pull/14858#issuecomment-1832632146 The directory fields in `BrokerRegistrationRequest`, `RegisterBrokerRecord`, `BrokerRegistrationChangeRecord`, `PartitionRecord` and `PartitionChangeRecord` are all of type `Uuid[]`, which defaults to an empty list. We don't have any directory related fields of type `Uuid` in requests or metadata records. But I still think it makes sense to change this, because we might need such fields in the future. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15804: Close SocketServer channels when calling shutdown before enableRequestProcessing [kafka]
gharris1727 commented on PR #14729: URL: https://github.com/apache/kafka/pull/14729#issuecomment-1832624782 Hey @cmccabe @hachikuji Could you take a look at this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15764: Missing Tests for Transactions [kafka]
hachikuji commented on code in PR #14702: URL: https://github.com/apache/kafka/pull/14702#discussion_r1409800961 ## core/src/test/scala/integration/kafka/api/TransactionsTest.scala: ## @@ -820,6 +823,37 @@ class TransactionsTest extends IntegrationTestHarness { assertEquals((initialProducerEpoch + 1).toShort, producerStateEntry.producerEpoch) } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testTransactionsWithCompression(quorum: String): Unit = { +val numRecords = 50 +val numProducersWithCompression = 5 +val numTransactions = 40 +val transactionalCompressionProducers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]() + +for (i <- 0 until numProducersWithCompression) { + transactionalCompressionProducers += createTransactionalProducer("transactional-compression-producer-" + i.toString, compressionType = "snappy") +} + +// KAFKA-15653 is triggered more easily with replication factor 1 Review Comment: Not sure I follow that. Whether there are any replicas or not, wouldn't the request thread be freed after sending out the verification? One way I could see the case being more likely is if we added additional request threads. I don't think there's any affinity in the callback to the original request thread, is there? So more available request threads probably means a greater chance a separate thread would handle the callback. Does that make sense? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Kafka-14748: Relax non-null FK left-join requirement [kafka]
wcarlson5 commented on PR #14107: URL: https://github.com/apache/kafka/pull/14107#issuecomment-1832612612 I'm going to start my review of this today. Hopefully it can be actionable sometime tomorrow.
Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
mjsax commented on code in PR #14864: URL: https://github.com/apache/kafka/pull/14864#discussion_r1409794850 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java: ## @@ -310,6 +317,25 @@ public void run() { cache.resize(size); } stateConsumer.pollAndUpdate(); + +if (fetchDeadline != -1) { +if (fetchDeadline > time.milliseconds()) { +try { +globalConsumerClientInstanceId = globalConsumer.clientInstanceId(Duration.ZERO); + clientInstanceIdFuture.complete(globalConsumerClientInstanceId); +fetchDeadline = -1; +} catch (final TimeoutException swallow) { +// swallow +} catch (final Exception error) { + clientInstanceIdFuture.completeExceptionally(error); +fetchDeadline = -1; Review Comment: Yes. If the user calls `KafkaStreams#clientInstanceIds()` again, we would set a new fetch deadline -- if `fetchDeadline == -1` it means "nothing to be done", i.e., no call to `KafkaStreams#clientInstanceIds()` was done / is in-flight waiting for completion. ## streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java: ## @@ -310,6 +317,25 @@ public void run() { cache.resize(size); } stateConsumer.pollAndUpdate(); + +if (fetchDeadline != -1) { +if (fetchDeadline > time.milliseconds()) { +try { +globalConsumerClientInstanceId = globalConsumer.clientInstanceId(Duration.ZERO); + clientInstanceIdFuture.complete(globalConsumerClientInstanceId); +fetchDeadline = -1; +} catch (final TimeoutException swallow) { +// swallow Review Comment: Yes, this happens in the `else` of `if (fetchDeadline > time.milliseconds()) {` (from above) further below. ## streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java: ## @@ -1791,6 +1794,52 @@ protected int processStreamThread(final Consumer consumer) { return copy.size(); } +/** + * Returns the internal clients' assigned {@code client instance ids}. + * + * @return the internal clients' assigned instance ids used for metrics collection. 
+ * + * @throws IllegalStateException If {@code KafkaStreams} is not running. + * @throws TimeoutException Indicates that a request timed out. + */ +public ClientInstanceIds clientInstanceIds(final Duration timeout) { +if (state().hasNotStarted()) { +throw new IllegalStateException("KafkaStreams has not been started, you can retry after calling start()."); +} +if (state().isShuttingDown() || state.hasCompletedShutdown()) { +throw new IllegalStateException("KafkaStreams has been stopped (" + state + ")."); +} + +final ClientInstanceIdsImpl clientInstanceIds = new ClientInstanceIdsImpl(); + +KafkaFuture globalThreadFuture = null; +if (globalStreamThread != null) { +globalThreadFuture = globalStreamThread.globalConsumerInstanceId(timeout); Review Comment: How strictly we can obey the given `timeout` is somewhat tricky, given that we need to call `clientInstanceId()` for each client we have. -- The idea was to basically "fan-out" all these calls and do them in parallel (note that `globalConsumerInstanceId` will return immediately and not block, but hand the execution off from the user thread to the `GlobalStreamThread`; that's why we return a Future) -- thus it should be ok to provide the same timeout to each call (as all of them are done in parallel)? If you have any good suggestion for how it could be done better, let me know. ## streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java: ## @@ -310,6 +317,25 @@ public void run() { cache.resize(size); } stateConsumer.pollAndUpdate(); + +if (fetchDeadline != -1) { +if (fetchDeadline > time.milliseconds()) { +try { +globalConsumerClientInstanceId = globalConsumer.clientInstanceId(Duration.ZERO); Review Comment: Yes, but this was intentional. The `GlobalStreamThread` does this call "on the side", and thus the idea is to just call it with no timeout to just trigger the background RPC and not block the thread from doing its actual work at all. 
-- There won't be a busy wait, because the global thread will do other useful work in the meantime.
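The "fetch deadline" handshake described above can be sketched in isolation. This is an illustrative reconstruction under stated assumptions, not the actual Kafka Streams code: `requestInstanceId` and the `Supplier`-based fetch are hypothetical stand-ins for `globalConsumerInstanceId` and the non-blocking `globalConsumer.clientInstanceId(Duration.ZERO)` call.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Illustrative sketch of the "fetch deadline" pattern -- NOT the real
// GlobalStreamThread. fetchDeadline == -1 means "nothing to be done"; the loop
// thread only opportunistically tries to fetch and never blocks on the RPC.
class DeadlineFetchSketch {
    private long fetchDeadline = -1L;
    private final CompletableFuture<String> instanceIdFuture = new CompletableFuture<>();

    // Called from the user thread: hands the work to the loop thread and
    // immediately returns a future instead of blocking.
    CompletableFuture<String> requestInstanceId(final Duration timeout) {
        fetchDeadline = System.currentTimeMillis() + timeout.toMillis();
        return instanceIdFuture;
    }

    // One iteration of the loop thread. tryFetch plays the role of the
    // non-blocking clientInstanceId(Duration.ZERO) call; a null result stands
    // in for the swallowed TimeoutException ("not ready yet").
    void loopIteration(final Supplier<String> tryFetch) {
        if (fetchDeadline != -1) {
            if (fetchDeadline > System.currentTimeMillis()) {
                final String id = tryFetch.get();
                if (id != null) {
                    instanceIdFuture.complete(id);
                    fetchDeadline = -1; // done; nothing more to do
                }
            } else {
                instanceIdFuture.completeExceptionally(new TimeoutException("deadline exceeded"));
                fetchDeadline = -1;
            }
        }
        // ...the thread then continues its regular work (pollAndUpdate() etc.),
        // which is why there is no busy wait.
    }
}
```

Because every client's future is produced this way and all fetches run concurrently, handing the same timeout to each call keeps the overall wall-clock bound roughly equal to the single timeout.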
[jira] [Updated] (KAFKA-15816) Typos in tests leak network sockets
[ https://issues.apache.org/jira/browse/KAFKA-15816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-15816: Description: There are a few tests which leak network sockets due to small typos in the tests themselves. Clients: [https://github.com/apache/kafka/pull/14750] * NioEchoServer * KafkaConsumerTest * KafkaProducerTest * SelectorTest * SslTransportLayerTest * SslTransportTls12Tls13Test * SslVersionsTransportLayerTest * SaslAuthenticatorTest Core: [https://github.com/apache/kafka/pull/14754] * MiniKdc * GssapiAuthenticationTest * MirrorMakerIntegrationTest * SocketServerTest * EpochDrivenReplicationProtocolAcceptanceTest * LeaderEpochIntegrationTest Trogdor: [https://github.com/apache/kafka/pull/14771] * AgentTest Mirror: [https://github.com/apache/kafka/pull/14761] * DedicatedMirrorIntegrationTest * MirrorConnectorsIntegrationTest * MirrorConnectorsWithCustomForwardingAdminIntegrationTest Runtime: [https://github.com/apache/kafka/pull/14764] * ConnectorTopicsIntegrationTest * ExactlyOnceSourceIntegrationTest * WorkerTest * WorkerGroupMemberTest Streams: [https://github.com/apache/kafka/pull/14769] (DONE) * IQv2IntegrationTest * MetricsReporterIntegrationTest * NamedTopologyIntegrationTest * PurgeRepartitionTopicIntegrationTest These can be addressed by just fixing the tests. was: There are a few tests which leak network sockets due to small typos in the tests themselves. 
Clients: [https://github.com/apache/kafka/pull/14750] * NioEchoServer * KafkaConsumerTest * KafkaProducerTest * SelectorTest * SslTransportLayerTest * SslTransportTls12Tls13Test * SslVersionsTransportLayerTest * SaslAuthenticatorTest Core: [https://github.com/apache/kafka/pull/14754] * MiniKdc * GssapiAuthenticationTest * MirrorMakerIntegrationTest * SocketServerTest * EpochDrivenReplicationProtocolAcceptanceTest * LeaderEpochIntegrationTest Trogdor: [https://github.com/apache/kafka/pull/14771] * AgentTest Mirror: [https://github.com/apache/kafka/pull/14761] * DedicatedMirrorIntegrationTest * MirrorConnectorsIntegrationTest * MirrorConnectorsWithCustomForwardingAdminIntegrationTest Runtime: [https://github.com/apache/kafka/pull/14764] * ConnectorTopicsIntegrationTest * ExactlyOnceSourceIntegrationTest * WorkerTest * WorkerGroupMemberTest Streams: [https://github.com/apache/kafka/pull/14769] * IQv2IntegrationTest * MetricsReporterIntegrationTest * NamedTopologyIntegrationTest * PurgeRepartitionTopicIntegrationTest These can be addressed by just fixing the tests. > Typos in tests leak network sockets > --- > > Key: KAFKA-15816 > URL: https://issues.apache.org/jira/browse/KAFKA-15816 > Project: Kafka > Issue Type: Bug > Components: unit tests >Affects Versions: 3.6.0 >Reporter: Greg Harris >Assignee: Greg Harris >Priority: Minor > > There are a few tests which leak network sockets due to small typos in the > tests themselves. 
> Clients: [https://github.com/apache/kafka/pull/14750] > * NioEchoServer > * KafkaConsumerTest > * KafkaProducerTest > * SelectorTest > * SslTransportLayerTest > * SslTransportTls12Tls13Test > * SslVersionsTransportLayerTest > * SaslAuthenticatorTest > Core: [https://github.com/apache/kafka/pull/14754] > * MiniKdc > * GssapiAuthenticationTest > * MirrorMakerIntegrationTest > * SocketServerTest > * EpochDrivenReplicationProtocolAcceptanceTest > * LeaderEpochIntegrationTest > Trogdor: [https://github.com/apache/kafka/pull/14771] > * AgentTest > Mirror: [https://github.com/apache/kafka/pull/14761] > * DedicatedMirrorIntegrationTest > * MirrorConnectorsIntegrationTest > * MirrorConnectorsWithCustomForwardingAdminIntegrationTest > Runtime: [https://github.com/apache/kafka/pull/14764] > * ConnectorTopicsIntegrationTest > * ExactlyOnceSourceIntegrationTest > * WorkerTest > * WorkerGroupMemberTest > Streams: [https://github.com/apache/kafka/pull/14769] (DONE) > * IQv2IntegrationTest > * MetricsReporterIntegrationTest > * NamedTopologyIntegrationTest > * PurgeRepartitionTopicIntegrationTest > These can be addressed by just fixing the tests. -- This message was sent by Atlassian Jira (v8.20.10#820010)
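For illustration, the kind of small "typo" that leaks a network socket in a test usually looks like the following. This is a generic, hypothetical example — not one of the actual bugs from the linked PRs — showing a reference to an open socket being lost before it is closed, and the try-with-resources fix.

```java
import java.io.IOException;
import java.net.ServerSocket;

// Hypothetical example of a socket-leaking test typo and its fix.
class SocketLeakExample {
    static int leaky() throws IOException {
        ServerSocket server = new ServerSocket(0); // opened...
        server = new ServerSocket(0);              // ...typo reassigns: the first socket is never closed
        int port = server.getLocalPort();
        server.close();                            // only the second socket gets closed
        return port;
    }

    static int fixed() throws IOException {
        // try-with-resources guarantees the socket is closed even if the body throws
        try (ServerSocket server = new ServerSocket(0)) {
            return server.getLocalPort();
        }
    }
}
```

Such leaks are invisible to the test's assertions, which is why they survive until a leak checker or file-descriptor exhaustion surfaces them.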
Re: [PR] KAFKA-15816: Fix leaked sockets in streams tests [kafka]
gharris1727 merged PR #14769: URL: https://github.com/apache/kafka/pull/14769
Re: [PR] KAFKA-15816: Fix leaked sockets in streams tests [kafka]
gharris1727 commented on PR #14769: URL: https://github.com/apache/kafka/pull/14769#issuecomment-1832605577 The streams test failures appear unrelated, as they are not subclasses of the changed tests, and all streams tests pass locally.
Re: [PR] KAFKA-15830: Add telemetry API handling (KIP-714) [kafka]
AndrewJSchofield commented on code in PR #14767: URL: https://github.com/apache/kafka/pull/14767#discussion_r1409788411 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -3697,16 +3698,60 @@ class KafkaApis(val requestChannel: RequestChannel, CompletableFuture.completedFuture[Unit](()) } - // Just a place holder for now. def handleGetTelemetrySubscriptionsRequest(request: RequestChannel.Request): Unit = { -requestHelper.sendMaybeThrottle(request, request.body[GetTelemetrySubscriptionsRequest].getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) -CompletableFuture.completedFuture[Unit](()) +val subscriptionRequest = request.body[GetTelemetrySubscriptionsRequest] + +if (ClientMetricsReceiverPlugin.instance.isEmpty) { + info("Received get telemetry client request, no metrics receiver plugin configured or running with ZK") + requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => +subscriptionRequest.getErrorResponse(requestThrottleMs, Errors.UNSUPPORTED_VERSION.exception)) Review Comment: We can wait for @junrao to weigh in, but I think the handling of `APIVersions` should be as written in the KIP. The whole point is that a client can tell from the `APIVersionsResponse` that there is no need to get a telemetry subscription. For a cluster without client telemetry enabled, there are no additional RPCs when the client connects. With this PR, a 3.7 client will make extra RPCs even when client telemetry is not enabled. This should be avoided if at all possible.
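The behaviour argued for here can be sketched as a simple gate: the client consults the API keys advertised in the broker's `ApiVersionsResponse` and skips the telemetry RPC entirely when the broker does not list it. A minimal, hypothetical sketch follows — the API key number matches KIP-714's `GetTelemetrySubscriptions`, but the class and method names are invented, not real Kafka client code.

```java
import java.util.Set;

// Hypothetical gate: only issue GetTelemetrySubscriptions when the broker's
// ApiVersionsResponse advertised support for it, so a cluster without client
// telemetry enabled sees no additional RPCs at connect time.
class TelemetryGate {
    static final short GET_TELEMETRY_SUBSCRIPTIONS = 71; // API key per KIP-714 (illustrative)

    static boolean shouldRequestTelemetrySubscription(Set<Short> advertisedApiKeys) {
        return advertisedApiKeys.contains(GET_TELEMETRY_SUBSCRIPTIONS);
    }
}
```

This is the design trade-off in the comment: the cost of checking a locally cached `ApiVersionsResponse` is zero, whereas unconditionally probing adds a round trip for every connecting 3.7 client.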
Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
mjsax commented on code in PR #14864: URL: https://github.com/apache/kafka/pull/14864#discussion_r1409030872 ## streams/src/main/java/org/apache/kafka/streams/ClientInstanceIds.java: ## @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams; + +import org.apache.kafka.common.Uuid; + +import java.util.Map; + +/** + * Encapsulates the {@code client instance id} used for metrics collection by + * producers, consumers, and the admin client used by Kafka Streams. + */ +public interface ClientInstanceIds { Review Comment: I updated the KIP and changed this from `class` to `interface`. ## streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java: ## @@ -1791,6 +1794,56 @@ protected int processStreamThread(final Consumer consumer) { return copy.size(); } +/** + * Returns the internal clients' assigned {@code client instance ids}. + * + * @return the internal clients' assigned instance ids used for metrics collection. + * + * @throws IllegalStateException If {@code KafkaStreams} is not running. 
Review Comment: I updated the KIP and changed this to `IllegalStateException` (it does not make sense to throw a sub-class of `InvalidStateStoreException`, and other methods on `KafkaStreams` also use `IllegalStateException`).
Re: [PR] KAFKA-15901: Client changes for registering telemetry and API calls (KIP-714) [kafka]
kirktrue commented on code in PR #14843: URL: https://github.com/apache/kafka/pull/14843#discussion_r1409772670 ## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ## @@ -4387,7 +4398,52 @@ public FenceProducersResult fenceProducers(Collection transactionalIds, @Override public Uuid clientInstanceId(Duration timeout) { -throw new UnsupportedOperationException(); +if (timeout.isNegative()) { +throw new IllegalArgumentException("The timeout cannot be negative."); +} + +if (!clientTelemetryEnabled) { +throw new IllegalStateException("Telemetry is not enabled. Set config `enable.metrics.push` to `true`."); +} + +if (clientInstanceId != null) { +return clientInstanceId; +} + +final long now = time.milliseconds(); +final KafkaFutureImpl future = new KafkaFutureImpl<>(); +runnable.call(new Call("getTelemetrySubscriptions", calcDeadlineMs(now, (int) timeout.toMillis()), +new LeastLoadedNodeProvider()) { + +@Override +GetTelemetrySubscriptionsRequest.Builder createRequest(int timeoutMs) { +return new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData(), true); +} + +@Override +void handleResponse(AbstractResponse abstractResponse) { +GetTelemetrySubscriptionsResponse response = (GetTelemetrySubscriptionsResponse) abstractResponse; +if (response.error() != Errors.NONE) { +future.completeExceptionally(response.error().exception()); +} else { +future.complete(response.data().clientInstanceId()); +} +} + +@Override +void handleFailure(Throwable throwable) { +future.completeExceptionally(throwable); +} +}, now); + +try { +clientInstanceId = future.get(); +} catch (Exception e) { +log.error("Error occurred while fetching client instance id", e); +throw new KafkaException("Error occurred while fetching client instance id", e); +} Review Comment: Why do we make an explicit RPC call for the admin client, but the other two clients use `ClientTelemetryUtils.fetchClientInstanceId`? 
## clients/src/main/java/org/apache/kafka/clients/ClientTelemetryUtils.java: ## @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients; + +import org.apache.kafka.common.Uuid; + +import java.time.Duration; +import java.util.Optional; + +public class ClientTelemetryUtils { Review Comment: I think moving it to `internals` makes sense. ## clients/src/main/java/org/apache/kafka/clients/NetworkClient.java: ## @@ -361,6 +370,9 @@ private void cancelInFlightRequests(String nodeId, } } else if (request.header.apiKey() == ApiKeys.METADATA) { metadataUpdater.handleFailedRequest(now, Optional.empty()); +} else if ((request.header.apiKey() == ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS || +request.header.apiKey() == ApiKeys.PUSH_TELEMETRY) && telemetrySender != null) { +telemetrySender.handleFailedRequest(request.header.apiKey(), null); Review Comment: nit: maybe creating an `ApiKeys` from `request.header.apiKey()` would make this visually cleaner?
Re: [PR] KAFKA-15022: add config for balance subtopology in rack aware task assignment [kafka]
mjsax merged PR #14711: URL: https://github.com/apache/kafka/pull/14711
[jira] [Updated] (KAFKA-15948) Refactor AsyncKafkaConsumer shutdown
[ https://issues.apache.org/jira/browse/KAFKA-15948?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee updated KAFKA-15948: --- Parent: KAFKA-14246 Issue Type: Sub-task (was: Bug) > Refactor AsyncKafkaConsumer shutdown > > > Key: KAFKA-15948 > URL: https://issues.apache.org/jira/browse/KAFKA-15948 > Project: Kafka > Issue Type: Sub-task > Components: consumer >Reporter: Philip Nee >Priority: Major > > Upon closing we need a round trip from the network thread to the application > thread and then back to the network thread to complete the callback > invocation. Currently, we don't have any of that. I think we need to > refactor our closing mechanism. There are a few points to the refactor: > # The network thread should know if there's a custom user callback to > trigger or not. If there is, it should wait for the callback completion to > send a leave group. If not, it should proceed with the shutdown. > # The application thread sends a closing signal to the network thread and > continuously polls the background event handler until time runs out.
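The close flow in point 2 — the application thread signalling the network thread, then draining background events until the deadline — can be sketched roughly as follows. This is illustrative only, not the real `AsyncKafkaConsumer` code; all names and the queue-based signalling are invented for the sketch.

```java
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch of the described close flow: the application thread
// enqueues a CLOSE signal for the network thread and keeps servicing
// background events (e.g. user callbacks) until closed or the deadline passes.
class CloseFlowSketch {
    final ConcurrentLinkedQueue<String> toNetworkThread = new ConcurrentLinkedQueue<>();
    final ConcurrentLinkedQueue<Runnable> backgroundEvents = new ConcurrentLinkedQueue<>();
    volatile boolean closed = false;

    void close(long timeoutMs) {
        toNetworkThread.add("CLOSE");                       // 1. signal the network thread
        long deadline = System.currentTimeMillis() + timeoutMs;
        // 2. keep draining background events so callbacks can run on the
        //    application thread, until shutdown completes or time runs out
        while (!closed && System.currentTimeMillis() < deadline) {
            Runnable event = backgroundEvents.poll();
            if (event != null) event.run();
        }
    }
}
```

The point of the refactor is exactly this round trip: the callback must run on the application thread, so `close()` cannot simply block — it has to keep polling the event queue while the network thread finishes up.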
[jira] [Created] (KAFKA-15948) Refactor AsyncKafkaConsumer shutdown
Philip Nee created KAFKA-15948: -- Summary: Refactor AsyncKafkaConsumer shutdown Key: KAFKA-15948 URL: https://issues.apache.org/jira/browse/KAFKA-15948 Project: Kafka Issue Type: Bug Components: consumer Reporter: Philip Nee Upon closing we need a round trip from the network thread to the application thread and then back to the network thread to complete the callback invocation. Currently, we don't have any of that. I think we need to refactor our closing mechanism. There are a few points to the refactor: # The network thread should know if there's a custom user callback to trigger or not. If there is, it should wait for the callback completion to send a leave group. If not, it should proceed with the shutdown. # The application thread sends a closing signal to the network thread and continuously polls the background event handler until time runs out.
Re: [PR] KAFKA-15764: Missing Tests for Transactions [kafka]
jolshan commented on code in PR #14702: URL: https://github.com/apache/kafka/pull/14702#discussion_r1409765577 ## core/src/test/scala/integration/kafka/api/TransactionsTest.scala: ## @@ -820,6 +823,37 @@ class TransactionsTest extends IntegrationTestHarness { assertEquals((initialProducerEpoch + 1).toShort, producerStateEntry.producerEpoch) } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testTransactionsWithCompression(quorum: String): Unit = { +val numRecords = 50 +val numProducersWithCompression = 5 +val numTransactions = 40 +val transactionalCompressionProducers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]() + +for (i <- 0 until numProducersWithCompression) { + transactionalCompressionProducers += createTransactionalProducer("transactional-compression-producer-" + i.toString, compressionType = "snappy") +} + +// KAFKA-15653 is triggered more easily with replication factor 1 Review Comment: This made sense to me before, but I'm trying to remember why. Is it possible that when we are building and sending the response we could receive the callback request and that means it will be handled on a different thread? Vs when acks=all, we free the thread after sending out the verification, so it can be available to receive the callback? 
[jira] [Created] (KAFKA-15947) Null pointer on LZ4 compression since Kafka 3.6
Ludo created KAFKA-15947: Summary: Null pointer on LZ4 compression since Kafka 3.6 Key: KAFKA-15947 URL: https://issues.apache.org/jira/browse/KAFKA-15947 Project: Kafka Issue Type: Bug Components: compression Affects Versions: 3.6.0 Reporter: Ludo I have a Kafka Streams application that has been running well for months using client version {{3.5.1}} with a 3.5.1 broker (bitnami image: {{bitnami/3.5.1-debian-11-r44}}) and {{compression.type: "lz4"}}. I recently updated my Kafka server to Kafka 3.6 (bitnami image: {{bitnami/kafka:3.6.0-debian-11-r0}}). Startup works fine and the application runs well for days, then after some time Kafka Streams crashes and Kafka outputs a lot of NullPointerExceptions on the console:
{code:java}
org.apache.kafka.common.KafkaException: java.lang.NullPointerException: Cannot invoke "java.nio.ByteBuffer.hasArray()" because "this.intermediateBufRef" is null
    at org.apache.kafka.common.record.CompressionType$4.wrapForInput(CompressionType.java:134)
    at org.apache.kafka.common.record.DefaultRecordBatch.recordInputStream(DefaultRecordBatch.java:273)
    at org.apache.kafka.common.record.DefaultRecordBatch.compressedIterator(DefaultRecordBatch.java:277)
    at org.apache.kafka.common.record.DefaultRecordBatch.skipKeyValueIterator(DefaultRecordBatch.java:352)
    at org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsetsCompressed(LogValidator.java:358)
    at org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsets(LogValidator.java:165)
    at kafka.log.UnifiedLog.$anonfun$append$2(UnifiedLog.scala:805)
    at kafka.log.UnifiedLog.append(UnifiedLog.scala:1845)
    at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:719)
    at kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1313)
    at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1301)
    at kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:1210)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
    at
scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149)
    at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237)
    at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:149)
    at scala.collection.TraversableLike.map(TraversableLike.scala:286)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
    at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:1198)
    at kafka.server.ReplicaManager.$anonfun$appendRecords$18$adapted(ReplicaManager.scala:754)
    at kafka.server.KafkaRequestHandler$.$anonfun$wrap$3(KafkaRequestHandler.scala:73)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:130)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.NullPointerException: Cannot invoke "java.nio.ByteBuffer.hasArray()" because "this.intermediateBufRef" is null
    at org.apache.kafka.common.utils.ChunkedBytesStream.<init>(ChunkedBytesStream.java:89)
    at org.apache.kafka.common.record.CompressionType$4.wrapForInput(CompressionType.java:132)
    ... 25 more {code}
At the same time, Kafka Streams raises this error:
{code:java}
org.apache.kafka.streams.errors.StreamsException: Error encountered sending record to topic kestra_workertaskresult for task 3_6 due to:
org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request.
Written offsets would not be recorded and no more records would be sent since this is a fatal error.
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:297)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$1(RecordCollectorImpl.java:284)
    at org.apache.kafka.clients.producer.KafkaProducer$AppendCallbacks.onCompletion(KafkaProducer.java:1505)
    at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:273)
    at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:234)
    at org.apache.kafka.clients.producer.internals.ProducerBatch.completeExceptionally(ProducerBatch.java:198)
    at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:772)
    at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:757)
    at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:709)
    at
Re: [PR] KAFKA-15922: Add a MetadataVersion for JBOD [kafka]
rondagostino commented on code in PR #14860: URL: https://github.com/apache/kafka/pull/14860#discussion_r1409696104 ## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -59,10 +59,14 @@ object StorageTool extends Logging { case "format" => val directories = configToLogDirectories(config.get) val clusterId = namespace.getString("cluster_id") - val metadataVersion = getMetadataVersion(namespace, Option(config.get.interBrokerProtocolVersionString)) + val metadataVersion = getMetadataVersion(namespace, + Option(config.get.originals.get(KafkaConfig.InterBrokerProtocolVersionProp)).map(_.toString)) if (!metadataVersion.isKRaftSupported) { throw new TerseFailure(s"Must specify a valid KRaft metadata version of at least 3.0.") } + if (!metadataVersion.isProduction()) { +throw new TerseFailure(s"Metadata version ${metadataVersion} is not ready for production use yet.") + } Review Comment: Do we need an override flag/option, do you think? ## server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java: ## @@ -386,4 +418,21 @@ public void testOffsetCommitValueVersion(MetadataVersion metadataVersion) { public void testOffsetCommitValueVersionWithExpiredTimestamp(MetadataVersion metadataVersion) { assertEquals((short) 1, metadataVersion.offsetCommitValueVersion(true)); } + +@Test +public void assertLatestProductionIsLessThanLatest() { +assertTrue(LATEST_PRODUCTION.ordinal() < MetadataVersion.latest().ordinal(), +"Expected LATEST_PRODUCTION " + LATEST_PRODUCTION + +" to be less than the latest of " + MetadataVersion.latest()); +} Review Comment: Will this always be true? It seems that if we want to make the latest one production-ready then we will have to add a new MetadataVersion after it that isn't. Is this what we want? ## metadata/src/main/resources/common/metadata/PartitionRecord.json: ## @@ -42,13 +42,13 @@ "about": "The epoch of the partition leader." 
 }, { "name": "PartitionEpoch", "type": "int32", "versions": "0+", "default": "-1",
 "about": "An epoch that gets incremented each time we change anything in the partition." },
+{ "name": "Directories", "type": "[]uuid", "versions": "1+",
+ "about": "The log directory hosting each replica, sorted in the same exact order as the Replicas field."},
 { "name": "EligibleLeaderReplicas", "type": "[]int32", "default": "null", "entityType": "brokerId",
- "versions": "1+", "nullableVersions": "1+", "taggedVersions": "1+", "tag": 1,
+ "versions": "2+", "nullableVersions": "1+", "taggedVersions": "2+", "tag": 1,

Review Comment: `s/nullableVersions": "1+",/nullableVersions": "2+",/` (unless this is done this way on purpose, in which case could you explain why to help me understand?)

## server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java: ##
@@ -329,19 +336,44 @@ public void testIsDelegationTokenSupported(MetadataVersion metadataVersion) {
 @ParameterizedTest
 @EnumSource(value = MetadataVersion.class)
 public void testIsElrSupported(MetadataVersion metadataVersion) {
-assertEquals(metadataVersion.equals(IBP_3_7_IV1),
-metadataVersion.isElrSupported());
-short expectPartitionRecordVersion = metadataVersion.equals(IBP_3_7_IV1) ? (short) 1 : (short) 0;
-assertEquals(expectPartitionRecordVersion, metadataVersion.partitionRecordVersion());
-short expectPartitionChangeRecordVersion = metadataVersion.equals(IBP_3_7_IV1) ? (short) 1 : (short) 0;
-assertEquals(expectPartitionChangeRecordVersion, metadataVersion.partitionChangeRecordVersion());
+assertEquals(metadataVersion.equals(IBP_3_7_IV3), metadataVersion.isElrSupported());

Review Comment: `s/metadataVersion.equals/metadataVersion.isAtLeast` We should probably add a method `testIsDirectoryAssignmentSupported(MetadataVersion metadataVersion)`
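The override flag rondagostino asks about could be sketched as follows. This is a hypothetical illustration, not the actual StorageTool code: the method and flag names (`mayFormat`, "unstable versions enabled") are stand-ins, and the real tool works on `MetadataVersion` objects rather than booleans.

```java
// Hypothetical sketch of a StorageTool-style gate: formatting with a
// non-production metadata version is refused unless an explicit override
// flag is passed. Names are illustrative, not Kafka's actual API.
public class FormatVersionCheck {

    /** Returns true when formatting may proceed for the given version properties. */
    public static boolean mayFormat(boolean isKRaftSupported,
                                    boolean isProduction,
                                    boolean unstableVersionsEnabled) {
        if (!isKRaftSupported) {
            return false; // must be at least a 3.0 KRaft-capable version
        }
        // A non-production version is only usable with the explicit override.
        return isProduction || unstableVersionsEnabled;
    }

    public static void main(String[] args) {
        // Unstable version, override supplied: formatting is allowed.
        System.out.println(mayFormat(true, false, true)); // prints true
    }
}
```

With such a flag, the `assertLatestProductionIsLessThanLatest` concern above also becomes less pressing: `LATEST_PRODUCTION` could equal the latest version without blocking anyone who opts in to unstable versions.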
Re: [PR] KAFKA-15901: Client changes for registering telemetry and API calls (KIP-714) [kafka]
apoorvmittal10 commented on PR #14843: URL: https://github.com/apache/kafka/pull/14843#issuecomment-1832520113 Thanks @mjsax for reviewing; yes, the code is mostly wiring. @AndrewJSchofield @philipnee @kirktrue, can you please help with your feedback on defining the class/method in the KIP? A review would be helpful too. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15901: Client changes for registering telemetry and API calls (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14843: URL: https://github.com/apache/kafka/pull/14843#discussion_r1409739909 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java: ## @@ -160,6 +164,16 @@ public AbstractCoordinator(GroupRebalanceConfig rebalanceConfig, Metrics metrics, String metricGrpPrefix, Time time) { +this(rebalanceConfig, logContext, client, metrics, metricGrpPrefix, time, Optional.empty()); +} + +public AbstractCoordinator(GroupRebalanceConfig rebalanceConfig, +LogContext logContext, Review Comment: Done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Fix flaky BrokerLifecycleManagerTest [kafka]
junrao commented on code in PR #14836: URL: https://github.com/apache/kafka/pull/14836#discussion_r1409739363

## core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala: ##
@@ -201,7 +201,7 @@ class BrokerLifecycleManagerTest {
 while (!future.isDone || context.mockClient.hasInFlightRequests) {
 context.poll()
 manager.eventQueue.wakeup()
- context.time.sleep(100)
+ context.time.sleep(5)

Review Comment: @soarez : Thanks for the explanation. You are right that `KafkaEventQueue` does de-duping and only allows one outstanding `CommunicationEvent` in the queue. But it seems that duplicated `HeartbeatRequest`s could still be generated, which is what caused the original transient failure.

`CommunicationEvent` calls `sendBrokerHeartbeat`, which calls the following:

`_channelManager.sendRequest(new BrokerHeartbeatRequest.Builder(data), handler)`

The problem is that we have another queue in `NodeToControllerChannelManagerImpl` that doesn't do the de-duping. Once a `CommunicationEvent` is dequeued from `KafkaEventQueue`, a `HeartbeatRequest` will be queued in `NodeToControllerChannelManagerImpl`. At this point, another `CommunicationEvent` could be enqueued in `KafkaEventQueue`. When it's processed, another `HeartbeatRequest` will be queued in `NodeToControllerChannelManagerImpl`.

This probably won't introduce long-lasting duplicated `HeartbeatRequest`s in practice, since a `CommunicationEvent` typically sits in `KafkaEventQueue` for a heartbeat interval. By that time, other pending `HeartbeatRequest`s will have been processed and de-duped when enqueuing to `KafkaEventQueue`. But maybe we could file a jira to track it.

For the test, could we add a comment to explain why we need to wait for 10 `HeartbeatRequest`s?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
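The two-queue behaviour junrao describes can be sketched with a minimal tagged queue: one queue de-dupes by tag (like `KafkaEventQueue` with its single outstanding `CommunicationEvent`), while a plain queue would accept duplicates (like the request queue in `NodeToControllerChannelManagerImpl`). The class and method names below are illustrative, not Kafka's actual classes.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;

// Minimal sketch of tag-based de-duplication: at most one pending event per
// tag. Once an event is dequeued (e.g. handed off to another, non-deduping
// queue), a new event with the same tag may be enqueued again -- which is how
// duplicate heartbeats can still end up in flight downstream.
public class DedupingQueue {
    private final Queue<String> events = new ArrayDeque<>();
    private final Set<String> pendingTags = new HashSet<>();

    /** Enqueue unless an event with the same tag is already pending. */
    public boolean offer(String tag) {
        if (!pendingTags.add(tag)) {
            return false; // de-duped: one outstanding event per tag
        }
        events.add(tag);
        return true;
    }

    /** Dequeue the next event, allowing its tag to be enqueued again. */
    public String poll() {
        String tag = events.poll();
        if (tag != null) {
            pendingTags.remove(tag);
        }
        return tag;
    }

    public int size() {
        return events.size();
    }

    public static void main(String[] args) {
        DedupingQueue q = new DedupingQueue();
        q.offer("heartbeat");
        q.offer("heartbeat");         // dropped: already pending
        System.out.println(q.size()); // prints 1
        q.poll();                     // dequeued, e.g. into the downstream queue
        q.offer("heartbeat");         // accepted again: the duplicate window
        System.out.println(q.size()); // prints 1
    }
}
```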
[jira] [Updated] (KAFKA-15946) AsyncKafkaConsumer should retry commits on the application thread instead of autoretry
[ https://issues.apache.org/jira/browse/KAFKA-15946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Philip Nee updated KAFKA-15946:
---
Labels: kip-848-client-support (was: )

> AsyncKafkaConsumer should retry commits on the application thread instead of
> autoretry
> --
>
> Key: KAFKA-15946
> URL: https://issues.apache.org/jira/browse/KAFKA-15946
> Project: Kafka
> Issue Type: Sub-task
> Components: consumer
> Reporter: Philip Nee
> Assignee: Kirk True
> Priority: Major
> Labels: kip-848-client-support
>
> The original design was that the network thread always completes the future,
> whether it succeeds or fails. However, in the current patch, I mis-added
> auto-retry functionality because commitSync wasn't retrying. What we should
> be doing is: the commitSync API should catch RetriableExceptions and resend
> another commit until it times out.
>
> {code:java}
> if (error.exception() instanceof RetriableException) {
>     log.warn("OffsetCommit failed on partition {} at offset {}: {}", tp, offset, error.message());
>     handleRetriableError(error, response);
>     retry(responseTime); <--- We probably shouldn't do this.
>     return;
> } {code}
>
> {code:java}
> @Override
> public void commitSync(Map offsets, Duration timeout) {
>     acquireAndEnsureOpen();
>     long commitStart = time.nanoseconds();
>     try {
>         CompletableFuture commitFuture = commit(offsets, true); <-- we probably should retry here
>         ConsumerUtils.getResult(commitFuture, time.timer(timeout));
>     } finally {
>         wakeupTrigger.clearTask();
>         kafkaConsumerMetrics.recordCommitSync(time.nanoseconds() - commitStart);
>         release();
>     }
> } {code}

-- This message was sent by Atlassian Jira (v8.20.10#820010)
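The retry-on-the-application-thread behaviour the ticket proposes can be sketched as a loop that resends a commit attempt on retriable failures until a deadline. The `Supplier` stands in for a single asynchronous commit attempt; the names below are illustrative, not the `AsyncKafkaConsumer` API.

```java
import java.time.Duration;
import java.util.function.Supplier;

// Sketch of the proposed fix: commitSync retries retriable commit failures
// on the calling (application) thread until the timeout expires, instead of
// the network thread auto-retrying. Illustrative stand-in, not Kafka code.
public class CommitSyncRetry {

    /** Result of one commit attempt: success, retriable failure, or fatal failure. */
    enum Attempt { SUCCESS, RETRIABLE, FATAL }

    static boolean commitSync(Supplier<Attempt> attempt, Duration timeout) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            Attempt result = attempt.get();
            if (result == Attempt.SUCCESS) {
                return true;
            }
            if (result == Attempt.FATAL || System.nanoTime() >= deadline) {
                return false; // fatal error, or timed out while retrying
            }
            // Retriable failure: loop and send another commit attempt.
        }
    }

    public static void main(String[] args) {
        // Fails once with a retriable error, then succeeds on the retry.
        int[] calls = {0};
        boolean ok = commitSync(
            () -> calls[0]++ == 0 ? Attempt.RETRIABLE : Attempt.SUCCESS,
            Duration.ofSeconds(5));
        System.out.println(ok);       // prints true
        System.out.println(calls[0]); // prints 2
    }
}
```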