[jira] [Commented] (KAFKA-15610) Fix `CoreUtils.swallow()` test gaps
[ https://issues.apache.org/jira/browse/KAFKA-15610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1044#comment-1044 ]

Ismael Juma commented on KAFKA-15610:
-------------------------------------

Yes.

> Fix `CoreUtils.swallow()` test gaps
> -----------------------------------
>
> Key: KAFKA-15610
> URL: https://issues.apache.org/jira/browse/KAFKA-15610
> Project: Kafka
> Issue Type: Bug
> Reporter: Ismael Juma
> Priority: Minor
> Labels: newbie
>
> For example, it should verify that the passed in `logging` is used in case of
> an exception. We found that there is no test for this in
> https://github.com/apache/kafka/pull/14529#discussion_r1355277747.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
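The test gap described above (asserting that the caller-supplied `logging` function is actually invoked on exception) can be sketched in plain Java. Note that the real `CoreUtils.swallow()` lives in Kafka's Scala core module; the `swallow` helper below is a hypothetical stand-in used only to illustrate the shape of the missing test.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class SwallowLoggingSketch {
    // Hypothetical analogue of CoreUtils.swallow(): runs an action and
    // routes any thrown exception to the caller-supplied logging function.
    static void swallow(Runnable action, Consumer<String> logging) {
        try {
            action.run();
        } catch (Throwable t) {
            logging.accept("Swallowed exception: " + t.getMessage());
        }
    }

    public static void main(String[] args) {
        List<String> logged = new ArrayList<>();
        // The gap the ticket describes: verify the *passed-in* logging
        // function is used when the action throws.
        swallow(() -> { throw new RuntimeException("boom"); }, logged::add);
        if (logged.size() != 1 || !logged.get(0).contains("boom")) {
            throw new AssertionError("logging function was not used: " + logged);
        }
    }
}
```

A real test would pass a recording or mocked logger the same way and assert it captured the exception.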
[jira] [Commented] (KAFKA-15610) Fix `CoreUtils.swallow()` test gaps
[ https://issues.apache.org/jira/browse/KAFKA-15610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1038#comment-1038 ]

Atul Sharma commented on KAFKA-15610:
-------------------------------------

Hi [~ijuma], can I take this?

> Fix `CoreUtils.swallow()` test gaps
> -----------------------------------
>
> Key: KAFKA-15610
> URL: https://issues.apache.org/jira/browse/KAFKA-15610
> Project: Kafka
> Issue Type: Bug
> Reporter: Ismael Juma
> Priority: Minor
> Labels: newbie
>
> For example, it should verify that the passed in `logging` is used in case of
> an exception. We found that there is no test for this in
> https://github.com/apache/kafka/pull/14529#discussion_r1355277747.
[jira] [Created] (KAFKA-15643) Improve unloading logging
David Jacot created KAFKA-15643:
-----------------------------------

Summary: Improve unloading logging
Key: KAFKA-15643
URL: https://issues.apache.org/jira/browse/KAFKA-15643
Project: Kafka
Issue Type: Sub-task
Reporter: David Jacot

When a new leader is elected for a __consumer_offsets partition, the followers are notified to unload the state. However, only the former leader is aware of it. The remaining follower prints out the following error:

ERROR [GroupCoordinator id=1] Execution of UnloadCoordinator(tp=__consumer_offsets-1, epoch=0) failed due to This is not the correct coordinator.. (org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime)

The error is actually correct, but we should improve the logging so that nothing is printed in the remaining-follower case.
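A minimal sketch of the logging improvement the ticket asks for. The exception type and logger wiring below are illustrative stand-ins, not the actual coordinator runtime API: the idea is simply to route the expected not-the-coordinator failure to DEBUG while keeping other failures at ERROR.

```java
import java.util.function.Consumer;

public class UnloadLoggingSketch {
    // Hypothetical stand-in for the runtime's "not the coordinator" error.
    static class NotCoordinatorException extends RuntimeException {
        NotCoordinatorException(String m) { super(m); }
    }

    // Sketch of the proposed fix: when an unload event fails because this
    // broker was never the coordinator (the remaining-follower case), log
    // at DEBUG instead of ERROR; all other failures stay at ERROR.
    static void logUnloadFailure(Throwable t, Consumer<String> debug, Consumer<String> error) {
        if (t instanceof NotCoordinatorException) {
            debug.accept("Ignoring unload for a partition this broker never led: " + t.getMessage());
        } else {
            error.accept("Execution of unload failed: " + t.getMessage());
        }
    }

    public static void main(String[] args) {
        StringBuilder out = new StringBuilder();
        logUnloadFailure(new NotCoordinatorException("This is not the correct coordinator."),
                m -> out.append("DEBUG: ").append(m),
                m -> out.append("ERROR: ").append(m));
        System.out.println(out); // the expected case lands on the DEBUG path
    }
}
```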
Re: [PR] KAFKA-15614: Define interfaces and classes for client telemetry [kafka]
mjsax commented on code in PR #14575: URL: https://github.com/apache/kafka/pull/14575#discussion_r1364913876 ## clients/src/main/java/org/apache/kafka/common/telemetry/collector/MetricsCollector.java: ## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.telemetry.collector; + +import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.common.telemetry.emitter.Emitter; + +/** + * A {@code MetricsCollector} is responsible for scraping a source of metrics and forwarding + * them to the given {@link Emitter}. For example, a given collector might be used to collect + * system metrics, Kafka metrics, JVM metrics, or other metrics that are to be captured, exposed, + * and/or forwarded. + * + * Review Comment: Should just be `` (similar below) ## clients/src/main/java/org/apache/kafka/common/telemetry/ClientTelemetryPayload.java: ## @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.telemetry; Review Comment: What just comes to my mind: the KIP should specify where new public interfaces are added. It's missing. Can we add it? ## clients/src/main/java/org/apache/kafka/common/telemetry/collector/MetricsCollector.java: ## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.telemetry.collector; + +import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.common.telemetry.emitter.Emitter; + +/** + * A {@code MetricsCollector} is responsible for scraping a source of metrics and forwarding + * them to the given {@link Emitter}. For example, a given collector might be used to collect + * system metrics, Kafka metrics, JVM metrics, or other metrics that are to be captured, exposed, + * and/or forwarded. + * + * + * + * In general, a {@code MetricsCollector} implementation is closely managed by another entity + * (that entity is colloquially referred to as the "telemetry reporter") that will be in + * charge of its lifecycle via the {@link #start()} and {@link #stop()} methods. The telemetry + * reporter should ensure that the {@link #start()} method is invoked once and only once + * before calls to {@link #collect(Emitter)} are made. Implementations of {@code MetricsCollector} + * should allow for the corner-case that {@link #stop()} is called before {@link #start()}, + * which might happen in the case of error on startup of the telemetry reporter. + * + * + * + * Regarding threading, the {@link #start()} and {@link #stop()} methods may be called from + * different threads and so proper
[jira] [Updated] (KAFKA-15629) proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery
[ https://issues.apache.org/jira/browse/KAFKA-15629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hanyu Zheng updated KAFKA-15629:
--------------------------------

Summary: proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery (was: proposal to introduce IQv2 Query Types: TimeStampedKeyQuery and TimeStampedRangeQuery)

> proposal to introduce IQv2 Query Types: TimestampedKeyQuery and
> TimestampedRangeQuery
> ---------------------------------------------------------------
>
> Key: KAFKA-15629
> URL: https://issues.apache.org/jira/browse/KAFKA-15629
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Hanyu Zheng
> Assignee: Hanyu Zheng
> Priority: Major
> Labels: kip
>
> KIP-992:
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-992%3A+Proposal+to+introduce+IQv2+Query+Types%3A+TimestampKeyQuery+and+TimestampRangeQuery]
>
> In the current IQv2 code, there are noticeable differences when interfacing
> with plain-kv-store and ts-kv-store. Notably, the return type V acts as a
> simple value for plain-kv-store but evolves into ValueAndTimestamp for
> ts-kv-store, which presents type safety issues in the API.
> Even if IQv2 hasn't gained widespread adoption, an immediate fix might bring
> compatibility concerns.
> This brings us to the essence of our proposal: the introduction of distinct
> query types. One that returns a plain value, another for values accompanied
> by timestamps.
> While querying a ts-kv-store for a plain value and then extracting it is
> feasible, it doesn't make sense to query a plain-kv-store for a
> ValueAndTimestamp.
> Our vision is for plain-kv-store to always return V, while ts-kv-store should
> return ValueAndTimestamp.
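The type-safety argument in the proposal can be illustrated with a stripped-down sketch. The class names follow KIP-992, but the shapes here are simplified illustrations, not the final Kafka Streams API:

```java
// Sketch: distinct query types let the compiler enforce that a plain-kv-store
// query yields V while a ts-kv-store query yields ValueAndTimestamp<V>.
public class Iqv2TypedQuerySketch {
    // Simplified stand-in for org.apache.kafka.streams.state.ValueAndTimestamp.
    static class ValueAndTimestamp<V> {
        final V value;
        final long timestamp;
        ValueAndTimestamp(V value, long timestamp) { this.value = value; this.timestamp = timestamp; }
    }

    // Simplified stand-in for the IQv2 Query<R> interface; R is the result type.
    interface Query<R> { }

    // Plain query: the result type parameter is just V.
    static class KeyQuery<K, V> implements Query<V> {
        final K key;
        KeyQuery(K key) { this.key = key; }
    }

    // Timestamped query: the result type parameter is ValueAndTimestamp<V>.
    static class TimestampedKeyQuery<K, V> implements Query<ValueAndTimestamp<V>> {
        final K key;
        TimestampedKeyQuery(K key) { this.key = key; }
    }

    public static void main(String[] args) {
        Query<String> plain = new KeyQuery<String, String>("k");
        Query<ValueAndTimestamp<String>> timestamped = new TimestampedKeyQuery<String, String>("k");
        // Asking a plain-kv-store for a ValueAndTimestamp is now a compile
        // error rather than a runtime type-safety hazard.
        System.out.println(plain.getClass().getSimpleName() + " / " + timestamped.getClass().getSimpleName());
    }
}
```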
[jira] [Updated] (KAFKA-15629) proposal to introduce IQv2 Query Types: TimeStampedKeyQuery and TimeStampedRangeQuery
[ https://issues.apache.org/jira/browse/KAFKA-15629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hanyu Zheng updated KAFKA-15629:
--------------------------------

Summary: proposal to introduce IQv2 Query Types: TimeStampedKeyQuery and TimeStampedRangeQuery (was: proposal to introduce IQv2 Query Types: TimeStampKeyQuery and TimeStampRangeQuery)

> proposal to introduce IQv2 Query Types: TimeStampedKeyQuery and
> TimeStampedRangeQuery
> ---------------------------------------------------------------
>
> Key: KAFKA-15629
> URL: https://issues.apache.org/jira/browse/KAFKA-15629
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Hanyu Zheng
> Assignee: Hanyu Zheng
> Priority: Major
> Labels: kip
>
> KIP-992:
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-992%3A+Proposal+to+introduce+IQv2+Query+Types%3A+TimestampKeyQuery+and+TimestampRangeQuery]
>
> In the current IQv2 code, there are noticeable differences when interfacing
> with plain-kv-store and ts-kv-store. Notably, the return type V acts as a
> simple value for plain-kv-store but evolves into ValueAndTimestamp for
> ts-kv-store, which presents type safety issues in the API.
> Even if IQv2 hasn't gained widespread adoption, an immediate fix might bring
> compatibility concerns.
> This brings us to the essence of our proposal: the introduction of distinct
> query types. One that returns a plain value, another for values accompanied
> by timestamps.
> While querying a ts-kv-store for a plain value and then extracting it is
> feasible, it doesn't make sense to query a plain-kv-store for a
> ValueAndTimestamp.
> Our vision is for plain-kv-store to always return V, while ts-kv-store should
> return ValueAndTimestamp.
[PR] KAFKA-15578: Migrating other system tests to use the group coordinator [kafka]
rreddy-22 opened a new pull request, #14582:
URL: https://github.com/apache/kafka/pull/14582

Adds the use-new-coordinator flag to the decorators in all the existing system tests that use consumer groups. The new group coordinator cannot be used with ZooKeeper, so that combination will not run.

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15614: Define interfaces and classes for client telemetry [kafka]
mjsax commented on code in PR #14575: URL: https://github.com/apache/kafka/pull/14575#discussion_r1364873194 ## clients/src/main/java/org/apache/kafka/common/telemetry/ClientTelemetrySender.java: ## @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.telemetry; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.common.metrics.MetricsReporter; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.requests.AbstractResponse; +import org.apache.kafka.common.requests.AbstractRequest.Builder; + +import java.util.Optional; + +/** + * A {@link MetricsReporter} may implement this interface to indicate support for sending client + * telemetry to the broker. + */ +@InterfaceStability.Evolving +public interface ClientTelemetrySender extends AutoCloseable { Review Comment: We can still add it to the KIP. (And must, as it's a public interface, right?) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
Re: [PR] KAFKA-15614: Define interfaces and classes for client telemetry [kafka]
mjsax commented on code in PR #14575:
URL: https://github.com/apache/kafka/pull/14575#discussion_r1364874333

## clients/src/main/java/org/apache/kafka/common/telemetry/ClientTelemetryReceiver.java:
## @@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.telemetry;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
+
+@InterfaceStability.Evolving
+public interface ClientTelemetryReceiver {

Review Comment:
If it's for broker only, it should not be added to the `clients` module, right?
Re: [PR] KAFKA-15614: Define interfaces and classes for client telemetry [kafka]
mjsax commented on code in PR #14575: URL: https://github.com/apache/kafka/pull/14575#discussion_r1364873194 ## clients/src/main/java/org/apache/kafka/common/telemetry/ClientTelemetrySender.java: ## @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.telemetry; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.common.metrics.MetricsReporter; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.requests.AbstractResponse; +import org.apache.kafka.common.requests.AbstractRequest.Builder; + +import java.util.Optional; + +/** + * A {@link MetricsReporter} may implement this interface to indicate support for sending client + * telemetry to the broker. + */ +@InterfaceStability.Evolving +public interface ClientTelemetrySender extends AutoCloseable { Review Comment: We can still add it to the KIP. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
Re: [PR] KAFKA-15582: Identify clean shutdown broker [kafka]
CalvinConfluent commented on PR #14465:
URL: https://github.com/apache/kafka/pull/14465#issuecomment-1769903140

@junrao Thanks for the review; no relevant tests failed.
Re: [PR] KAFKA-15582: Identify clean shutdown broker [kafka]
CalvinConfluent commented on code in PR #14465:
URL: https://github.com/apache/kafka/pull/14465#discussion_r1364872040

## metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
## @@ -327,6 +328,10 @@ public ControllerResult registerBroker(
         }
         int brokerId = request.brokerId();
         BrokerRegistration existing = brokerRegistrations.get(brokerId);
+        if (version < 2 || existing == null || request.previousBrokerEpoch() != existing.epoch()) {

Review Comment:
Sounds good. If the version is < 2, we can skip handling it because ELR can't be enabled. We can adjust it later.
[jira] [Updated] (KAFKA-15616) Define client telemetry states and their transitions
[ https://issues.apache.org/jira/browse/KAFKA-15616?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-15616:
------------------------------------

Fix Version/s: 3.7.0

> Define client telemetry states and their transitions
> ----------------------------------------------------
>
> Key: KAFKA-15616
> URL: https://issues.apache.org/jira/browse/KAFKA-15616
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Apoorv Mittal
> Assignee: Apoorv Mittal
> Priority: Major
> Fix For: 3.7.0
>
> The client emitting metrics to the broker needs to maintain state that
> specifies what action the client should take next, i.e. request
> subscriptions, push telemetry, etc.
>
> The changes should include a comprehensive definition of all states a client
> can move into and their transitions.
Re: [PR] KAFKA-15616: Client telemetry states and transition (KIP-714) [kafka]
mjsax merged PR #14566:
URL: https://github.com/apache/kafka/pull/14566
Re: [PR] KAFKA-15616: Client telemetry states and transition (KIP-714) [kafka]
mjsax commented on PR #14566:
URL: https://github.com/apache/kafka/pull/14566#issuecomment-1769879629

Again a lot of flaky test noise:
```
Build / JDK 11 and Scala 2.13 / testFollowerCompleteDelayedFetchesOnReplication(String).quorum=kraft – integration.kafka.server.FetchFromFollowerIntegrationTest 7s
Build / JDK 11 and Scala 2.13 / testDeleteCmdNonEmptyGroup(String).quorum=kraft – kafka.admin.DeleteConsumerGroupsTest 10s
Build / JDK 11 and Scala 2.13 / shouldWriteLatestOffsetsToCheckpointOnShutdown[exactly_once_v2] – org.apache.kafka.streams.integration.EosIntegrationTest 1m 14s
Build / JDK 11 and Scala 2.13 / shouldUpgradeFromEosAlphaToEosV2[true] – org.apache.kafka.streams.integration.EosV2UpgradeIntegrationTest 4m 21s
Build / JDK 11 and Scala 2.13 / testTaskRequestWithOldStartMsGetsUpdated() – org.apache.kafka.trogdor.coordinator.CoordinatorTest 2m 0s
Build / JDK 21 and Scala 2.13 / testRackAwareRangeAssignor(String).quorum=zk – integration.kafka.server.FetchFromFollowerIntegrationTest 38s
Build / JDK 8 and Scala 2.12 / testSyncTopicConfigs() – org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest 2m 15s
Build / JDK 8 and Scala 2.12 / testRackAwareRangeAssignor(String).quorum=kraft – integration.kafka.server.FetchFromFollowerIntegrationTest 16s
Build / JDK 8 and Scala 2.12 / testRackAwareRangeAssignor(String).quorum=zk – integration.kafka.server.FetchFromFollowerIntegrationTest 41s
Build / JDK 8 and Scala 2.12 / shouldWriteLatestOffsetsToCheckpointOnShutdown[exactly_once] – org.apache.kafka.streams.integration.EosIntegrationTest 1m 18s
Build / JDK 8 and Scala 2.12 / [2] Type=Raft-Isolated, Name=testDescribeQuorumReplicationSuccessful, MetadataVersion=3.7-IV0, Security=PLAINTEXT – org.apache.kafka.tools.MetadataQuorumCommandTest 1m 7s
Build / JDK 17 and Scala 2.13 / testRackAwareRangeAssignor(String).quorum=zk – integration.kafka.server.FetchFromFollowerIntegrationTest 46s
Build / JDK 17 and Scala 2.13 / testDescribeClusterRequestIncludingClusterAuthorizedOperations(String).quorum=kraft – kafka.server.DescribeClusterRequestTest 4s
Build / JDK 17 and Scala 2.13 / testFenceMultipleBrokers() – org.apache.kafka.controller.QuorumControllerTest 1m 48s
Build / JDK 17 and Scala 2.13 / testBalancePartitionLeaders() – org.apache.kafka.controller.QuorumControllerTest 15s
Build / JDK 17 and Scala 2.13 / shouldWorkWithUncleanShutdownWipeOutStateStore[exactly_once_v2] – org.apache.kafka.streams.integration.EOSUncleanShutdownIntegrationTest 34s
Build / JDK 17 and Scala 2.13 / shouldUpgradeFromEosAlphaToEosV2[true] – org.apache.kafka.streams.integration.EosV2UpgradeIntegrationTest 4m 39s
Build / JDK 17 and Scala 2.13 / [2] Type=Raft-Isolated, Name=testDescribeQuorumReplicationSuccessful, MetadataVersion=3.7-IV0, Security=PLAINTEXT – org.apache.kafka.tools.MetadataQuorumCommandTest 1m 8s
Build / JDK 17 and Scala 2.13 / [5] Type=Raft-Combined, Name=testDescribeQuorumReplicationSuccessful, MetadataVersion=3.7-IV0, Security=PLAINTEXT – org.apache.kafka.tools.MetadataQuorumCommandTest 1m 7s
Build / JDK 17 and Scala 2.13 / [2] Type=Raft-Isolated, Name=testDescribeQuorumStatusSuccessful, MetadataVersion=3.7-IV0, Security=PLAINTEXT – org.apache.kafka.tools.MetadataQuorumCommandTest 1m 8s
Build / JDK 17 and Scala 2.13 / testTaskRequestWithOldStartMsGetsUpdated() – org.apache.kafka.trogdor.coordinator.CoordinatorTest
```
Re: [PR] KAFKA-15616: Client telemetry states and transition (KIP-714) [kafka]
mjsax commented on code in PR #14566: URL: https://github.com/apache/kafka/pull/14566#discussion_r1364865721 ## clients/src/main/java/org/apache/kafka/common/telemetry/ClientTelemetryState.java: ## @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.telemetry; + +import org.apache.kafka.common.utils.Utils; + +import java.util.Arrays; +import java.util.Collections; +import java.util.EnumMap; +import java.util.List; +import java.util.Map; + +/** + * State that helps determine where client exists in the telemetry state i.e. subscribe->wait->push loop. + */ +public enum ClientTelemetryState { + +/** + * Client needs subscription from the broker. + */ +SUBSCRIPTION_NEEDED, + +/** + * Network I/O is in progress to retrieve subscription. + */ +SUBSCRIPTION_IN_PROGRESS, + +/** + * Awaiting telemetry interval for pushing metrics to broker. + */ +PUSH_NEEDED, Review Comment: I agree with Apoorv. It's just a question how you read the state, and `PUSH_NEEDED` is the permanent state in steady state, it does not mean "PUSH_NEEDED_NOW". Not sure if we could find a better name? It's an internal state machine so we can rename it at any point. 
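The subscribe -> wait -> push loop discussed in this review thread can be sketched as a transition table. The state names match the PR, but the table itself is a simplified illustration, not the actual `ClientTelemetryState` implementation:

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

public class TelemetryStateSketch {
    // Subset of the states described in the PR; PUSH_NEEDED is the steady
    // state while the client awaits the next telemetry interval.
    enum State { SUBSCRIPTION_NEEDED, SUBSCRIPTION_IN_PROGRESS, PUSH_NEEDED, PUSH_IN_PROGRESS }

    // Illustrative transition table: from each state, the set of states the
    // client may legally move to next.
    static final Map<State, Set<State>> VALID = new EnumMap<>(State.class);
    static {
        VALID.put(State.SUBSCRIPTION_NEEDED, EnumSet.of(State.SUBSCRIPTION_IN_PROGRESS));
        VALID.put(State.SUBSCRIPTION_IN_PROGRESS, EnumSet.of(State.PUSH_NEEDED, State.SUBSCRIPTION_NEEDED));
        VALID.put(State.PUSH_NEEDED, EnumSet.of(State.PUSH_IN_PROGRESS, State.SUBSCRIPTION_NEEDED));
        VALID.put(State.PUSH_IN_PROGRESS, EnumSet.of(State.PUSH_NEEDED, State.SUBSCRIPTION_NEEDED));
    }

    static boolean canTransition(State from, State to) {
        return VALID.getOrDefault(from, EnumSet.noneOf(State.class)).contains(to);
    }

    public static void main(String[] args) {
        System.out.println(canTransition(State.SUBSCRIPTION_NEEDED, State.SUBSCRIPTION_IN_PROGRESS)); // true
        System.out.println(canTransition(State.SUBSCRIPTION_NEEDED, State.PUSH_IN_PROGRESS));         // false
    }
}
```

An EnumMap-based table like this makes illegal transitions cheap to reject and easy to audit, which is presumably why the real enum encodes its transitions the same way.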
Re: [PR] KAFKA-15534: Inject request completion time when the request failed [kafka]
philipnee commented on PR #14532:
URL: https://github.com/apache/kafka/pull/14532#issuecomment-1769876947

Hi @cadonna, thank you for spending time reviewing the PR again. I have answered your comments and attached a follow-up ticket and a minor PR: https://github.com/apache/kafka/pull/14581. That PR is based on this one, so the commit history looks a bit messy; once we merge this PR, I will rebase again. Let me know if you have more questions about this PR; I'd love to address them.
[PR] MINOR: Simplify UnsentRequest constructor [kafka]
philipnee opened a new pull request, #14581:
URL: https://github.com/apache/kafka/pull/14581

Per https://github.com/apache/kafka/pull/14532: the constructor taking a BiConsumer seems confusing. Instead, this PR provides a whenComplete() interface to chain the user-provided callback.
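The refactor the PR describes, replacing a BiConsumer constructor argument with a chained `whenComplete()`, can be sketched as follows. The class and method shapes here are illustrative, not the actual kafka-clients internals:

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

public class UnsentRequestSketch {
    // Hypothetical stand-in for the consumer network client's UnsentRequest.
    static class UnsentRequest {
        private final CompletableFuture<String> future = new CompletableFuture<>();

        // Instead of taking a BiConsumer in the constructor, the caller
        // chains its completion handler after construction.
        UnsentRequest whenComplete(BiConsumer<String, Throwable> handler) {
            future.whenComplete(handler);
            return this;
        }

        // Invoked by the network layer when a response (or error) arrives.
        void complete(String response) { future.complete(response); }
    }

    public static void main(String[] args) {
        StringBuilder seen = new StringBuilder();
        UnsentRequest req = new UnsentRequest()
                .whenComplete((resp, err) -> seen.append(err == null ? resp : "error"));
        req.complete("ok");
        System.out.println(seen); // prints "ok"
    }
}
```

Chaining on the future keeps construction simple and lets multiple callbacks be attached independently, which is the readability win the PR is after.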
Re: [PR] KAFKA-15626: Replace verification guard object with an specific type [kafka]
ijuma commented on code in PR #14568: URL: https://github.com/apache/kafka/pull/14568#discussion_r1364857293 ## storage/src/main/java/org/apache/kafka/storage/internals/log/VerificationGuard.java: ## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.log; + +import java.util.concurrent.atomic.AtomicLong; + +public class VerificationGuard { + +// The sentinel VerificationGuard will be used as a default when no verification guard is provided. +// It can not be used to verify a transaction is ongoing and its value is always 0. 
+    public static final VerificationGuard SENTINEL = new VerificationGuard(0);
+    private static final AtomicLong INCREMENTING_ID = new AtomicLong(0L);
+    private final long value;
+
+    public VerificationGuard() {
+        value = INCREMENTING_ID.incrementAndGet();
+    }
+
+    private VerificationGuard(long value) {
+        this.value = value;
+    }
+
+    @Override
+    public String toString() {
+        return "VerificationGuard: " + value;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if ((null == obj) || (obj.getClass() != this.getClass()))
+            return false;
+        VerificationGuard guard = (VerificationGuard) obj;
+        return value == guard.value();
+    }
+
+    @Override
+    public int hashCode() {
+        return Long.hashCode(value);
+    }
+
+    private long value() {
+        return value;
+    }
+
+    public boolean verifiedBy(VerificationGuard verifyingGuard) {

Review Comment:
Nit: would `verify` be a better method name? It's in the active voice, which is generally better for methods.
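Since the diff above cuts off before the body of `verifiedBy()`, here is a condensed usage sketch whose semantics are inferred from the class comment (the sentinel has value 0 and can never verify an ongoing transaction). Treat the `verifiedBy` body below as an assumption, not the actual implementation:

```java
import java.util.concurrent.atomic.AtomicLong;

public class VerificationGuardDemo {
    // Condensed re-statement of the VerificationGuard shown in the diff.
    static class VerificationGuard {
        static final VerificationGuard SENTINEL = new VerificationGuard(0);
        private static final AtomicLong INCREMENTING_ID = new AtomicLong(0L);
        private final long value;

        VerificationGuard() { value = INCREMENTING_ID.incrementAndGet(); }
        private VerificationGuard(long value) { this.value = value; }

        // ASSUMED semantics (body not shown in the diff): a guard is verified
        // only by an equal, non-sentinel guard.
        boolean verifiedBy(VerificationGuard other) {
            return other != SENTINEL && other.value == this.value;
        }
    }

    public static void main(String[] args) {
        VerificationGuard guard = new VerificationGuard();
        System.out.println(guard.verifiedBy(guard));                      // true
        System.out.println(guard.verifiedBy(VerificationGuard.SENTINEL)); // false: sentinel never verifies
        System.out.println(guard.verifiedBy(new VerificationGuard()));    // false: different guard id
    }
}
```

The point of the dedicated type over a raw `Object` guard is exactly this: the sentinel and equality semantics are explicit and testable.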
[jira] [Commented] (KAFKA-15620) Duplicate remote log DELETE_SEGMENT metadata is generated when there are multiple leader epochs in the segment
[ https://issues.apache.org/jira/browse/KAFKA-15620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1004#comment-1004 ]

Henry Cai commented on KAFKA-15620:
-----------------------------------

Yes, it is a non-blocking warning in the log, but it also prints out the stack trace, which will give people uneasy feelings if they don't understand the root cause of the error. This warning/stack-trace will show up every time a broker restarts, and for every broker in the cluster. And these warning messages will occur more often and for more segments the longer the cluster runs.

> Duplicate remote log DELETE_SEGMENT metadata is generated when there are
> multiple leader epochs in the segment
> ------------------------------------------------------------------------
>
> Key: KAFKA-15620
> URL: https://issues.apache.org/jira/browse/KAFKA-15620
> Project: Kafka
> Issue Type: Bug
> Components: log
> Affects Versions: 3.6.0
> Reporter: Henry Cai
> Priority: Major
> Fix For: 3.7.0, 3.6.1
>
> Use the newly released 3.6.0, turn on tiered storage feature:
> {*}remote.log.storage.system.enable{*}=true
> 1. Set up topic tier5 to be remote storage enabled. Adding some data to the
> topic and the data is copied to remote storage. After a few days when the
> log segment is removed from remote storage due to log retention expiration,
> noticed the following warnings in the server log:
> [2023-10-16 22:19:10,971] DEBUG Updating remote log segment metadata:
> [RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, id=YFNCaWjPQFSKCngQ1QcKpA}
> , customMetadata=Optional.empty, state=DELETE_SEGMENT_STARTED,
> eventTimestampMs=1697005926358, brokerId=1043}]
> (org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCache)
> [2023-10-16 22:19:10,971] WARN Error occurred while updating the remote log
> segment.
> (org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore) > org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException: > No remote log segment metadata found for > :RemoteLogSegmentId\{topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, > id=YFNCaWjPQFSKCngQ1QcKpA} > at > org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCache.updateRemoteLogSegmentMetadata(RemoteLogMetadataCache.java:161) > at > org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore.handleRemoteLogSegmentMetadataUpdate(RemotePartitionMetadataStore.java:89) > at > org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataEventHandler.handleRemoteLogMetadata(RemotePartitionMetadataEventHandler.java:33) > at > org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.processConsumerRecord(ConsumerTask.java:157) > at > org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.run(ConsumerTask.java:133) > at java.base/java.lang.Thread.run(Thread.java:829) > > 2. After some debugging, realized the problem is *there are 2 sets of > DELETE_SEGMENT_STARTED/FINISHED pairs* in the remote metadata topic for this > segment. The DELETE_SEGMENT_FINISHED in the first set remove the segment > from the metadata cache and this caused the above exception when the > DELETE_SEGMENT_STARTED from the second set needs to be processed. > > 3. 
And traced the log to where the log retention kicked in and saw *there > were two delete log segment generated* at that time: > ``` > [2023-10-10 23:32:05,929] INFO [RemoteLogManager=1043 > partition=QelVeVmER5CkjrzIiF07PQ:tier5-0] About to delete remote log segment > RemoteLogSegmentId{topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, > id={*}YFNCaWjPQFSKCngQ1QcKpA{*}} due to retention time 60480ms breach > based on the largest record timestamp in the segment > (kafka.log.remote.RemoteLogManager$RLMTask) > [2023-10-10 23:32:05,929] INFO [RemoteLogManager=1043 > partition=QelVeVmER5CkjrzIiF07PQ:tier5-0] About to delete remote log segment > RemoteLogSegmentId{topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, > id={*}YFNCaWjPQFSKCngQ1QcKpA{*}} due to retention time 60480ms breach > based on the largest record timestamp in the segment > (kafka.log.remote.RemoteLogManager$RLMTask) > ``` > 4. And dumped out the content of the original COPY_SEGMENT_STARTED for this > segment (which triggers the generation of the later DELETE_SEGMENT metadata): > [2023-10-16 22:19:10,894] DEBUG Adding to in-progress state: > [RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId > {topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, id=YFNCaWjPQFSKCngQ1QcKpA} > , startOffset=6387830, endOffset=9578660, brokerId=1043, > maxTimestampMs=1696401123036, eventTimestampMs=1696401127062, >
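The bug report above traces the duplicate DELETE_SEGMENT events to a segment that spans two leader epochs (`segmentLeaderEpochs={5=..., 6=...}`): iterating remote segments per epoch surfaces the same segment once per epoch. A minimal sketch of the deduplication idea this implies — keying deletions by segment id — using illustrative types rather than the actual `RemoteLogManager` code:

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class SegmentDedup {
    // Collect the segments eligible for deletion across all leader epochs.
    // A segment spanning several epochs appears once per epoch, so a Set
    // keyed by segment id drops the duplicates that would otherwise emit
    // two DELETE_SEGMENT_STARTED/FINISHED pairs for one segment.
    public static List<String> segmentsToDelete(Map<Integer, List<String>> segmentsByEpoch) {
        Set<String> seen = new LinkedHashSet<>();
        for (List<String> segmentIds : segmentsByEpoch.values()) {
            seen.addAll(segmentIds);
        }
        return new ArrayList<>(seen);
    }
}
```

With an input mirroring the report (`{5=[seg-A], 6=[seg-A, seg-B]}`), `seg-A` is returned only once.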
Re: [PR] KAFKA-15626: Replace verification guard object with an specific type [kafka]
ijuma commented on code in PR #14568: URL: https://github.com/apache/kafka/pull/14568#discussion_r1364857293 ## storage/src/main/java/org/apache/kafka/storage/internals/log/VerificationGuard.java: ## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.log; + +import java.util.concurrent.atomic.AtomicLong; + +public class VerificationGuard { + +// The sentinel VerificationGuard will be used as a default when no verification guard is provided. +// It can not be used to verify a transaction is ongoing and its value is always 0. 
+public static final VerificationGuard SENTINEL = new VerificationGuard(0); +private static final AtomicLong INCREMENTING_ID = new AtomicLong(0L); +private final long value; + +public VerificationGuard() { +value = INCREMENTING_ID.incrementAndGet(); +} + +private VerificationGuard(long value) { +this.value = value; +} + +@Override +public String toString() { +return "VerificationGuard: " + value; +} + +@Override +public boolean equals(Object obj) { +if ((null == obj) || (obj.getClass() != this.getClass())) +return false; +VerificationGuard guard = (VerificationGuard) obj; +return value == guard.value(); +} + +@Override +public int hashCode() { +return Long.hashCode(value); +} + +private long value() { +return value; +} + +public boolean verifiedBy(VerificationGuard verifyingGuard) { Review Comment: Nit: would `verify` be a better method name? It's the active voice which is generally better for methods. ## storage/src/main/java/org/apache/kafka/storage/internals/log/VerificationGuard.java: ## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.storage.internals.log; + +import java.util.concurrent.atomic.AtomicLong; + +public class VerificationGuard { Review Comment: Nit: should this be final? ## storage/src/main/java/org/apache/kafka/storage/internals/log/VerificationGuard.java: ## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.log; + +import java.util.concurrent.atomic.AtomicLong; + +public class VerificationGuard { + +// The sentinel VerificationGuard will be used as a default when no verification guard is provided. +// It can not be used to verify a transaction is ongoing and its value is always 0. +public static final VerificationGuard SENTINEL = new VerificationGuard(0); +private static final AtomicLong INCREMENTING_ID = new
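For readers following the review, here is a self-contained sketch of the guard semantics under discussion. The fields and `equals`/`hashCode` mirror the quoted diff; the body of `verifiedBy` is not shown there, so its implementation below (equality plus a non-sentinel check) is an assumption, not the actual Kafka code:

```java
import java.util.concurrent.atomic.AtomicLong;

public final class VerificationGuard {
    // Sentinel used as a default when no verification guard is provided;
    // its fixed value 0 means it can never verify an ongoing transaction.
    public static final VerificationGuard SENTINEL = new VerificationGuard(0);
    private static final AtomicLong INCREMENTING_ID = new AtomicLong(0L);
    private final long value;

    public VerificationGuard() {
        value = INCREMENTING_ID.incrementAndGet();
    }

    private VerificationGuard(long value) {
        this.value = value;
    }

    // Assumed semantics: a guard is verified only by an equal, non-sentinel guard.
    public boolean verifiedBy(VerificationGuard verifyingGuard) {
        return !this.equals(SENTINEL) && this.equals(verifyingGuard);
    }

    @Override
    public boolean equals(Object obj) {
        if (obj == null || obj.getClass() != this.getClass())
            return false;
        return value == ((VerificationGuard) obj).value;
    }

    @Override
    public int hashCode() {
        return Long.hashCode(value);
    }

    @Override
    public String toString() {
        return "VerificationGuard: " + value;
    }
}
```

Under these assumed semantics, `SENTINEL` never verifies anything — even itself — while any freshly created guard verifies only its own value.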
Re: [PR] KAFKA-15616: Client telemetry states and transition (KIP-714) [kafka]
mjsax commented on code in PR #14566: URL: https://github.com/apache/kafka/pull/14566#discussion_r1364857199 ## clients/src/main/java/org/apache/kafka/common/errors/IllegalClientTelemetryStateException.java: ## @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.errors; + +import org.apache.kafka.common.KafkaException; + +public class IllegalClientTelemetryStateException extends KafkaException { Review Comment: Thanks! (What Philip said: we could add a new exception, but if user are supposed to handle them, it must be part of the KIP -- we could of course still add it to the KIP) -- but avoiding it and using `IllegalStateException` seems better. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15616: Client telemetry states and transition (KIP-714) [kafka]
mjsax commented on code in PR #14566: URL: https://github.com/apache/kafka/pull/14566#discussion_r1364856191 ## clients/src/main/java/org/apache/kafka/clients/ClientTelemetryState.java: ## @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients; + +import org.apache.kafka.common.errors.IllegalClientTelemetryStateException; +import org.apache.kafka.common.utils.Utils; + +import java.util.Arrays; +import java.util.Collections; +import java.util.EnumMap; +import java.util.List; +import java.util.Map; + +/** + * State that helps determine where client exists in the telemetry state i.e. subscribe->wait->push loop. + */ +public enum ClientTelemetryState { + +/** + * Client needs subscription from the broker. + */ +SUBSCRIPTION_NEEDED, + +/** + * Network I/O is in progress to retrieve subscription. + */ +SUBSCRIPTION_IN_PROGRESS, + +/** + * Awaiting telemetry interval for pushing metrics to broker. + */ +PUSH_NEEDED, + +/** + * Network I/O in progress for pushing metrics payload. + */ +PUSH_IN_PROGRESS, + +/** + * Need to push the terminal metrics payload. + */ +TERMINATING_PUSH_NEEDED, + +/** + * Network I/O in progress for pushing terminal metrics payload. 
+ */ +TERMINATING_PUSH_IN_PROGRESS, + +/** + * No more work should be performed, telemetry client terminated. + */ +TERMINATED; + +private final static Map<ClientTelemetryState, List<ClientTelemetryState>> VALID_NEXT_STATES = new EnumMap<>(ClientTelemetryState.class); + +static { +/* + If clients needs a subscription, then issue telemetry API to fetch subscription from broker. + + However, it's still possible that client doesn't get very far before terminating. +*/ +VALID_NEXT_STATES.put( +SUBSCRIPTION_NEEDED, Arrays.asList(SUBSCRIPTION_IN_PROGRESS, TERMINATED)); + +/* + If client is finished waiting for subscription, then client is ready to push the telemetry. + But, it's possible that no telemetry metrics are requested, hence client should go back to + subscription needed state i.e. requesting the next updated subscription. + + However, it's still possible that client doesn't get very far before terminating. +*/ +VALID_NEXT_STATES.put(SUBSCRIPTION_IN_PROGRESS, Arrays.asList(PUSH_NEEDED, +SUBSCRIPTION_NEEDED, TERMINATING_PUSH_NEEDED, TERMINATED)); Review Comment: Thanks for explaining. My thought was that we would still transit to `TERMINATING_PUSH_NEEDED`, but potentially "skip" this state, and do an immediate second transition to `TERMINATED` -- but if it's easier to transit to `TERMINATED` directly, that's also fine with me. (Just want to understand what's going on.) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
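The `VALID_NEXT_STATES` pattern quoted in the diff — an `EnumMap` from each state to its legal successors, with invalid transitions rejected — can be illustrated with a stripped-down enum. The state names are simplified and the transition table abridged; this is not the actual `ClientTelemetryState`:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

// Simplified telemetry-style state machine: a transition is legal only if it
// is registered in the EnumMap; anything else throws.
public enum TelemetryState {
    SUBSCRIPTION_NEEDED,
    SUBSCRIPTION_IN_PROGRESS,
    PUSH_NEEDED,
    TERMINATED;

    private static final Map<TelemetryState, List<TelemetryState>> VALID_NEXT_STATES =
            new EnumMap<>(TelemetryState.class);

    static {
        VALID_NEXT_STATES.put(SUBSCRIPTION_NEEDED,
                Arrays.asList(SUBSCRIPTION_IN_PROGRESS, TERMINATED));
        VALID_NEXT_STATES.put(SUBSCRIPTION_IN_PROGRESS,
                Arrays.asList(PUSH_NEEDED, SUBSCRIPTION_NEEDED, TERMINATED));
        VALID_NEXT_STATES.put(PUSH_NEEDED,
                Arrays.asList(SUBSCRIPTION_NEEDED, TERMINATED));
        // Terminal state: no transitions out.
        VALID_NEXT_STATES.put(TERMINATED, Collections.emptyList());
    }

    // Returns the next state if the transition is legal, otherwise throws.
    public TelemetryState validateTransition(TelemetryState next) {
        List<TelemetryState> allowed = VALID_NEXT_STATES.get(this);
        if (allowed == null || !allowed.contains(next)) {
            throw new IllegalStateException("Invalid transition " + this + " -> " + next);
        }
        return next;
    }
}
```

This also illustrates the "skip a state" question from the review: skipping is just two consecutive `validateTransition` calls, whereas a direct edge (e.g. `SUBSCRIPTION_IN_PROGRESS -> TERMINATED`) must be registered in the map explicitly.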
Re: [PR] KAFKA-15481: Fix concurrency bug in RemoteIndexCache [kafka]
iit2009060 commented on code in PR #14483: URL: https://github.com/apache/kafka/pull/14483#discussion_r1364855206 ## core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala: ## @@ -672,16 +673,84 @@ class RemoteIndexCacheTest { verifyFetchIndexInvocation(count = 1) } + @Test + def testConcurrentRemoveReadForCache(): Unit = { +// Create a spy Cache Entry +val rlsMetadata = new RemoteLogSegmentMetadata(RemoteLogSegmentId.generateNew(idPartition), baseOffset, lastOffset, + time.milliseconds(), brokerId, time.milliseconds(), segmentSize, Collections.singletonMap(0, 0L)) + +val timeIndex = spy(createTimeIndexForSegmentMetadata(rlsMetadata, new File(tpDir, DIR_NAME))) +val txIndex = spy(createTxIndexForSegmentMetadata(rlsMetadata, new File(tpDir, DIR_NAME))) +val offsetIndex = spy(createOffsetIndexForSegmentMetadata(rlsMetadata, new File(tpDir, DIR_NAME))) + +val spyEntry = spy(new RemoteIndexCache.Entry(offsetIndex, timeIndex, txIndex)) +cache.internalCache.put(rlsMetadata.remoteLogSegmentId().id(), spyEntry) + +assertCacheSize(1) + +var entry: RemoteIndexCache.Entry = null + +val latchForCacheRead = new CountDownLatch(1) +val latchForCacheRemove = new CountDownLatch(1) +val latchForTestWait = new CountDownLatch(1) + +var markForCleanupCallCount = 0 + +doAnswer((invocation: InvocationOnMock) => { + markForCleanupCallCount += 1 + + if (markForCleanupCallCount == 1) { +// Signal the CacheRead to unblock itself +latchForCacheRead.countDown() +// Wait for signal to start renaming the files +latchForCacheRemove.await() +// Calling the markForCleanup() actual method to start renaming the files +invocation.callRealMethod() Review Comment: @jeel2420 markForCleanup() is called twice: 1. Through the `remove` function which we call in the `removeCache` Runnable. 2. Once at `invocation.callRealMethod()` (line 708). Even when I ran your test case locally, it always asserts cacheSize 0, as the entry eventually gets deleted. 
IMO we should read and remove concurrently in separate threads and validate the cacheSize based on the order of execution; we should not need to call it explicitly for this scenario. @showuon WDYT ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
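The latch choreography the quoted test relies on — forcing a specific interleaving of a "remove" thread and a "read" thread with `CountDownLatch`es — can be sketched standalone. Names are illustrative; this is not the actual `RemoteIndexCacheTest`:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class LatchChoreography {
    // Forces the interleaving: remove starts -> read runs -> remove finishes.
    public static String run() throws Exception {
        CountDownLatch latchForCacheRead = new CountDownLatch(1);
        CountDownLatch latchForCacheRemove = new CountDownLatch(1);
        StringBuffer order = new StringBuffer(); // StringBuffer: thread-safe appends

        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            Future<?> removeCache = executor.submit(() -> {
                order.append("remove-start;");
                latchForCacheRead.countDown();   // unblock the reader
                await(latchForCacheRemove);      // wait for the reader's signal
                order.append("remove-finish;");
            });
            Future<?> readCache = executor.submit(() -> {
                await(latchForCacheRead);        // wait until remove has started
                order.append("read;");
                latchForCacheRemove.countDown(); // let the remover finish
            });
            // get() rethrows any exception from the worker threads.
            removeCache.get(5, TimeUnit.SECONDS);
            readCache.get(5, TimeUnit.SECONDS);
        } finally {
            executor.shutdown();
        }
        return order.toString();
    }

    private static void await(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }
}
```

The latches make the interleaving deterministic, so the test can assert on cache state at a known point in the race rather than hoping the scheduler produces it.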
Re: [PR] KAFKA-15616: Client telemetry states and transition (KIP-714) [kafka]
mjsax commented on code in PR #14566: URL: https://github.com/apache/kafka/pull/14566#discussion_r1364855062 ## clients/src/main/java/org/apache/kafka/clients/ClientTelemetryState.java: ## @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients; + +import org.apache.kafka.common.errors.IllegalClientTelemetryStateException; +import org.apache.kafka.common.utils.Utils; + +import java.util.Arrays; +import java.util.Collections; +import java.util.EnumMap; +import java.util.List; +import java.util.Map; + +/** + * State that helps determine where client exists in the telemetry state i.e. subscribe->wait->push loop. + */ +public enum ClientTelemetryState { Review Comment: Sounds good. Thx. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-15620) Duplicate remote log DELETE_SEGMENT metadata is generated when there are multiple leader epochs in the segment
[ https://issues.apache.org/jira/browse/KAFKA-15620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17776999#comment-17776999 ] Kamal Chandraprakash commented on KAFKA-15620: -- [~h...@pinterest.com] What was the impact of this bug? Going by the code, it will log one warning statement and proceed further. > Duplicate remote log DELETE_SEGMENT metadata is generated when there are > multiple leader epochs in the segment > -- > > Key: KAFKA-15620 > URL: https://issues.apache.org/jira/browse/KAFKA-15620 > Project: Kafka > Issue Type: Bug > Components: log >Affects Versions: 3.6.0 >Reporter: Henry Cai >Priority: Major > Fix For: 3.7.0, 3.6.1 > > > Use the newly released 3.6.0, turn on tiered storage feature: > {*}remote.log.storage.system.enable{*}=true > 1. Set up topic tier5 to be remote storage enabled. Adding some data to the > topic and the data is copied to remote storage. After a few days when the > log segment is removed from remote storage due to log retention expiration, > noticed the following warnings in the server log: > [2023-10-16 22:19:10,971] DEBUG Updating remote log segment metadata: > [RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId > {topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, id=YFNCaWjPQFSKCngQ1QcKpA} > , customMetadata=Optional.empty, state=DELETE_SEGMENT_STARTED, > eventTimestampMs=1697005926358, brokerId=1043}] > (org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCache) > [2023-10-16 22:19:10,971] WARN Error occurred while updating the remote log > segment. 
> (org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore) > org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException: > No remote log segment metadata found for > :RemoteLogSegmentId\{topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, > id=YFNCaWjPQFSKCngQ1QcKpA} > at > org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCache.updateRemoteLogSegmentMetadata(RemoteLogMetadataCache.java:161) > at > org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore.handleRemoteLogSegmentMetadataUpdate(RemotePartitionMetadataStore.java:89) > at > org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataEventHandler.handleRemoteLogMetadata(RemotePartitionMetadataEventHandler.java:33) > at > org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.processConsumerRecord(ConsumerTask.java:157) > at > org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.run(ConsumerTask.java:133) > at java.base/java.lang.Thread.run(Thread.java:829) > > 2. After some debugging, realized the problem is *there are 2 sets of > DELETE_SEGMENT_STARTED/FINISHED pairs* in the remote metadata topic for this > segment. The DELETE_SEGMENT_FINISHED in the first set remove the segment > from the metadata cache and this caused the above exception when the > DELETE_SEGMENT_STARTED from the second set needs to be processed. > > 3. 
And traced the log to where the log retention kicked in and saw *there > were two delete log segment generated* at that time: > ``` > [2023-10-10 23:32:05,929] INFO [RemoteLogManager=1043 > partition=QelVeVmER5CkjrzIiF07PQ:tier5-0] About to delete remote log segment > RemoteLogSegmentId{topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, > id={*}YFNCaWjPQFSKCngQ1QcKpA{*}} due to retention time 60480ms breach > based on the largest record timestamp in the segment > (kafka.log.remote.RemoteLogManager$RLMTask) > [2023-10-10 23:32:05,929] INFO [RemoteLogManager=1043 > partition=QelVeVmER5CkjrzIiF07PQ:tier5-0] About to delete remote log segment > RemoteLogSegmentId{topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, > id={*}YFNCaWjPQFSKCngQ1QcKpA{*}} due to retention time 60480ms breach > based on the largest record timestamp in the segment > (kafka.log.remote.RemoteLogManager$RLMTask) > ``` > 4. And dumped out the content of the original COPY_SEGMENT_STARTED for this > segment (which triggers the generation of the later DELETE_SEGMENT metadata): > [2023-10-16 22:19:10,894] DEBUG Adding to in-progress state: > [RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId > {topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, id=YFNCaWjPQFSKCngQ1QcKpA} > , startOffset=6387830, endOffset=9578660, brokerId=1043, > maxTimestampMs=1696401123036, eventTimestampMs=1696401127062, > segmentLeaderEpochs=\{5=6387830, 6=6721329}, segmentSizeInBytes=511987531, > customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}] > (org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCache) > > You can see there are 2
[jira] [Commented] (KAFKA-15620) Duplicate remote log DELETE_SEGMENT metadata is generated when there are multiple leader epochs in the segment
[ https://issues.apache.org/jira/browse/KAFKA-15620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17776996#comment-17776996 ] Henry Cai commented on KAFKA-15620: --- Thanks. Will give it a try. > Duplicate remote log DELETE_SEGMENT metadata is generated when there are > multiple leader epochs in the segment > -- > > Key: KAFKA-15620 > URL: https://issues.apache.org/jira/browse/KAFKA-15620 > Project: Kafka > Issue Type: Bug > Components: log >Affects Versions: 3.6.0 >Reporter: Henry Cai >Priority: Major > Fix For: 3.7.0, 3.6.1 > > > Use the newly released 3.6.0, turn on tiered storage feature: > {*}remote.log.storage.system.enable{*}=true > 1. Set up topic tier5 to be remote storage enabled. Adding some data to the > topic and the data is copied to remote storage. After a few days when the > log segment is removed from remote storage due to log retention expiration, > noticed the following warnings in the server log: > [2023-10-16 22:19:10,971] DEBUG Updating remote log segment metadata: > [RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId > {topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, id=YFNCaWjPQFSKCngQ1QcKpA} > , customMetadata=Optional.empty, state=DELETE_SEGMENT_STARTED, > eventTimestampMs=1697005926358, brokerId=1043}] > (org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCache) > [2023-10-16 22:19:10,971] WARN Error occurred while updating the remote log > segment. 
> (org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore) > org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException: > No remote log segment metadata found for > :RemoteLogSegmentId\{topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, > id=YFNCaWjPQFSKCngQ1QcKpA} > at > org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCache.updateRemoteLogSegmentMetadata(RemoteLogMetadataCache.java:161) > at > org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore.handleRemoteLogSegmentMetadataUpdate(RemotePartitionMetadataStore.java:89) > at > org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataEventHandler.handleRemoteLogMetadata(RemotePartitionMetadataEventHandler.java:33) > at > org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.processConsumerRecord(ConsumerTask.java:157) > at > org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.run(ConsumerTask.java:133) > at java.base/java.lang.Thread.run(Thread.java:829) > > 2. After some debugging, realized the problem is *there are 2 sets of > DELETE_SEGMENT_STARTED/FINISHED pairs* in the remote metadata topic for this > segment. The DELETE_SEGMENT_FINISHED in the first set remove the segment > from the metadata cache and this caused the above exception when the > DELETE_SEGMENT_STARTED from the second set needs to be processed. > > 3. 
And traced the log to where the log retention kicked in and saw *there > were two delete log segment generated* at that time: > ``` > [2023-10-10 23:32:05,929] INFO [RemoteLogManager=1043 > partition=QelVeVmER5CkjrzIiF07PQ:tier5-0] About to delete remote log segment > RemoteLogSegmentId{topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, > id={*}YFNCaWjPQFSKCngQ1QcKpA{*}} due to retention time 60480ms breach > based on the largest record timestamp in the segment > (kafka.log.remote.RemoteLogManager$RLMTask) > [2023-10-10 23:32:05,929] INFO [RemoteLogManager=1043 > partition=QelVeVmER5CkjrzIiF07PQ:tier5-0] About to delete remote log segment > RemoteLogSegmentId{topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, > id={*}YFNCaWjPQFSKCngQ1QcKpA{*}} due to retention time 60480ms breach > based on the largest record timestamp in the segment > (kafka.log.remote.RemoteLogManager$RLMTask) > ``` > 4. And dumped out the content of the original COPY_SEGMENT_STARTED for this > segment (which triggers the generation of the later DELETE_SEGMENT metadata): > [2023-10-16 22:19:10,894] DEBUG Adding to in-progress state: > [RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId > {topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, id=YFNCaWjPQFSKCngQ1QcKpA} > , startOffset=6387830, endOffset=9578660, brokerId=1043, > maxTimestampMs=1696401123036, eventTimestampMs=1696401127062, > segmentLeaderEpochs=\{5=6387830, 6=6721329}, segmentSizeInBytes=511987531, > customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}] > (org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCache) > > You can see there are 2 leader epochs in this segment: > *segmentLeaderEpochs=\{5=6387830, 6=6721329}* > > 5. From the remote log retention code >
Re: [PR] KAFKA-15534: Inject request completion time when the request failed [kafka]
philipnee commented on code in PR #14532: URL: https://github.com/apache/kafka/pull/14532#discussion_r1364845881 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -188,9 +188,7 @@ private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest() { if (response != null) { onResponse((ConsumerGroupHeartbeatResponse) response.responseBody(), response.receivedTimeMs()); Review Comment: Thanks. `request.handler().completionTimeMs()` instead of `response.receivedTimeMs()`: this is addressed. `UnsentRequest` using `request.handler()` would not be possible. Let me quickly follow up with a MINOR PR to address this comment. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15481: Fix concurrency bug in RemoteIndexCache [kafka]
showuon commented on code in PR #14483: URL: https://github.com/apache/kafka/pull/14483#discussion_r1364844466

## core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:

@@ -672,16 +673,89 @@ class RemoteIndexCacheTest {
     verifyFetchIndexInvocation(count = 1)
   }

+  @Test
+  def testConcurrentRemoveReadForCache(): Unit = {
+    // Create a spy Cache Entry
+    val rlsMetadata = new RemoteLogSegmentMetadata(RemoteLogSegmentId.generateNew(idPartition), baseOffset, lastOffset,
+      time.milliseconds(), brokerId, time.milliseconds(), segmentSize, Collections.singletonMap(0, 0L))
+
+    val timeIndex = spy(createTimeIndexForSegmentMetadata(rlsMetadata, new File(tpDir, DIR_NAME)))
+    val txIndex = spy(createTxIndexForSegmentMetadata(rlsMetadata, new File(tpDir, DIR_NAME)))
+    val offsetIndex = spy(createOffsetIndexForSegmentMetadata(rlsMetadata, new File(tpDir, DIR_NAME)))
+
+    val spyEntry = spy(new RemoteIndexCache.Entry(offsetIndex, timeIndex, txIndex))
+    cache.internalCache.put(rlsMetadata.remoteLogSegmentId().id(), spyEntry)
+
+    assertCacheSize(1)
+
+    var entry: RemoteIndexCache.Entry = null
+
+    val latchForCacheRead = new CountDownLatch(1)
+    val latchForCacheRemove = new CountDownLatch(1)
+    val latchForTestWait = new CountDownLatch(1)
+
+    var markForCleanupCallCount = 0
+
+    doAnswer((invocation: InvocationOnMock) => {
+      markForCleanupCallCount += 1
+
+      if (markForCleanupCallCount == 1) {
+        // Signal the CacheRead to unblock itself
+        latchForCacheRead.countDown()
+        // Wait for signal to start renaming the files
+        latchForCacheRemove.await()
+        // Calling the markForCleanup() actual method to start renaming the files
+        invocation.callRealMethod()
+        // Signal TestWait to unblock itself so that test can be completed
+        latchForTestWait.countDown()
+      }
+    }).when(spyEntry).markForCleanup()
+
+    val removeCache = (() => {
+      cache.remove(rlsMetadata.remoteLogSegmentId().id())
+    }): Runnable
+
+    val readCache = (() => {
+      // Wait for signal to start CacheRead
+      latchForCacheRead.await()
+      entry = cache.getIndexEntry(rlsMetadata)
+      // Signal the CacheRemove to start renaming the files
+      latchForCacheRemove.countDown()
+    }): Runnable
+
+    val executor = Executors.newFixedThreadPool(2)
+    try {
+      executor.submit(removeCache: Runnable)
+      executor.submit(readCache: Runnable)

Review Comment: @jeel2420, sorry, I had another look and found we should also verify that these 2 threads have no exceptions thrown. In the issue description, without this fix, an IOException is thrown. So we should verify there's no exception using the returned `future` from `executor.submit`. Thanks.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
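The check the reviewer asks for relies on a standard `ExecutorService` property: an exception thrown inside a submitted task is captured and rethrown, wrapped in `ExecutionException`, when `Future.get()` is called. A minimal standalone sketch outside Kafka, with an illustrative helper name (`completedCleanly`):

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Sketch only: demonstrates that Future.get() surfaces a worker thread's
// exception, so a test can assert that a submitted task finished cleanly.
public class FutureExceptionCheck {

    static boolean completedCleanly(Runnable task) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<?> future = executor.submit(task);
            // get() rethrows any exception the task threw, wrapped in ExecutionException
            future.get(5, TimeUnit.SECONDS);
            return true;
        } catch (ExecutionException e) {
            // e.getCause() is the original failure, e.g. the IOException from the issue
            return false;
        } catch (InterruptedException | TimeoutException e) {
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        assert completedCleanly(() -> { });  // clean task: get() returns normally
        assert !completedCleanly(() -> { throw new RuntimeException("boom"); });
        System.out.println("ok");
    }
}
```

Applied to the test above, keeping the two `Future`s returned by `executor.submit(removeCache)` and `executor.submit(readCache)` and calling `get()` on each would fail the test if either thread hit the IOException.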
Re: [PR] KAFKA-15613: Client API definition and configurations (KIP-714) [kafka]
mjsax merged PR #14560: URL: https://github.com/apache/kafka/pull/14560
Re: [PR] KAFKA-15534: Inject request completion time when the request failed [kafka]
philipnee commented on code in PR #14532: URL: https://github.com/apache/kafka/pull/14532#discussion_r1364842391

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManagerTest.java:

@@ -151,6 +152,36 @@ void testHardFailures(Exception exception) {
     }
 }

+@Test
+public void testNetworkTimeout() {

Review Comment: Not entirely sure about the behavior of the OffsetRequestManager - I'll need to get back to this. Added a ticket there: KAFKA-15642
[jira] [Created] (KAFKA-15642) Ensure offset fetcher behaves correctly when the request is Timeout
Philip Nee created KAFKA-15642:
--
Summary: Ensure offset fetcher behaves correctly when the request is Timeout
Key: KAFKA-15642
URL: https://issues.apache.org/jira/browse/KAFKA-15642
Project: Kafka
Issue Type: Bug
Reporter: Philip Nee
Assignee: Philip Nee

I need to test the behavior of the OffsetFetcher when the request times out - it seems like we should continue to retry until the top-level timer times out.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
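The retry behavior the ticket describes, retrying a timed-out request until the caller's top-level timer expires, can be sketched generically. This is not the actual OffsetFetcher code; `fetchWithRetry` and its parameters are illustrative:

```java
import java.util.Optional;
import java.util.function.Supplier;

// Sketch only: retry an attempt that may time out (modeled as Optional.empty())
// until it succeeds or the top-level deadline passes.
public class RetryUntilDeadline {

    public static <T> Optional<T> fetchWithRetry(Supplier<Optional<T>> attemptFetch,
                                                 long timeoutMs,
                                                 long retryBackoffMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            Optional<T> result = attemptFetch.get();  // empty => this attempt timed out
            if (result.isPresent()) {
                return result;
            }
            try {
                // Back off, but never sleep past the top-level deadline
                Thread.sleep(Math.min(retryBackoffMs,
                        Math.max(0, deadline - System.currentTimeMillis())));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return Optional.empty();
            }
        }
        return Optional.empty();  // top-level timer expired
    }

    public static void main(String[] args) {
        final int[] attempts = {0};
        // Succeeds on the third attempt, well inside the top-level timeout.
        Optional<String> r = fetchWithRetry(
                () -> ++attempts[0] < 3 ? Optional.<String>empty() : Optional.of("offsets"),
                1000L, 10L);
        assert r.isPresent();
        assert attempts[0] == 3;
        System.out.println("ok");
    }
}
```

The point of the ticket is that a single request timing out should not surface to the caller as long as the top-level timer still has budget.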
Re: [PR] KAFKA-15613: Client API definition and configurations (KIP-714) [kafka]
mjsax commented on PR #14560: URL: https://github.com/apache/kafka/pull/14560#issuecomment-1769844494

The Jenkins UI is a mess -- you can click on "Tests" in the top menu bar to see failing tests.

```
New failing - 16
Build / JDK 8 and Scala 2.12 / testTimeouts() – org.apache.kafka.controller.QuorumControllerTest <1s
Build / JDK 17 and Scala 2.13 / testTaskRequestWithOldStartMsGetsUpdated() – org.apache.kafka.trogdor.coordinator.CoordinatorTest 2m 0s
Build / JDK 21 and Scala 2.13 / [2] tlsProtocol=TLSv1.2, useInlinePem=true – org.apache.kafka.common.network.SslTransportLayerTest 19s
Build / JDK 21 and Scala 2.13 / shouldHaveSamePositionBoundActiveAndStandBy – org.apache.kafka.streams.integration.ConsistencyVectorIntegrationTest 14s
Build / JDK 21 and Scala 2.13 / testTaskRequestWithOldStartMsGetsUpdated() – org.apache.kafka.trogdor.coordinator.CoordinatorTest 2m 0s
Build / JDK 11 and Scala 2.13 / testSingleIP() – org.apache.kafka.clients.ClusterConnectionStatesTest <1s
Build / JDK 11 and Scala 2.13 / testSingleIP() – org.apache.kafka.clients.ClusterConnectionStatesTest <1s
Build / JDK 11 and Scala 2.13 / testReplicateSourceDefault() – org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest 3m 28s
Build / JDK 11 and Scala 2.13 / testRestartReplication() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest 1m 4s
Build / JDK 11 and Scala 2.13 / testNoConsumeWithoutDescribeAclViaAssign(String).quorum=kraft – kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest 13s
Build / JDK 11 and Scala 2.13 / [1] error=UNKNOWN_SERVER_ERROR – kafka.coordinator.transaction.ProducerIdManagerTest 15s
Build / JDK 11 and Scala 2.13 / executeTieredStorageTest(String).quorum=zk – org.apache.kafka.tiered.storage.integration.EnableRemoteLogOnTopicTest 39s
Build / JDK 11 and Scala 2.13 / shouldAddAndRemoveNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology() – org.apache.kafka.streams.integration.NamedTopologyIntegrationTest 1m 11s
Build / JDK 11 and Scala 2.13 / shouldInvokeUserDefinedGlobalStateRestoreListener() – org.apache.kafka.streams.integration.RestoreIntegrationTest 1m 17s
Build / JDK 11 and Scala 2.13 / shouldInvokeUserDefinedGlobalStateRestoreListener() – org.apache.kafka.streams.integration.RestoreIntegrationTest 1m 12s
Build / JDK 11 and Scala 2.13 / shouldHonorEOSWhenUsingCachingAndStandbyReplicas – org.apache.kafka.streams.integration.StandbyTaskEOSMultiRebalanceIntegrationTest 2m 24s

Existing failures - 3
Build / JDK 17 and Scala 2.13 / testRackAwareRangeAssignor(String).quorum=zk – integration.kafka.server.FetchFromFollowerIntegrationTest 37s
Build / JDK 21 and Scala 2.13 / testRackAwareRangeAssignor(String).quorum=zk – integration.kafka.server.FetchFromFollowerIntegrationTest 40s
Build / JDK 21 and Scala 2.13 / testRackAwareRangeAssignor(String).quorum=zk – integration.kafka.server.FetchFromFollowerIntegrationTest 40s
```

But this is all just flaky tests (your PR should not even be able to break anything). Merging.
Re: [PR] KAFKA-15604: Telemetry API request and response schemas and classes (KIP-714) [kafka]
mjsax commented on PR #14554: URL: https://github.com/apache/kafka/pull/14554#issuecomment-1769842128

Triggered a Jenkins re-run.
Re: [PR] KAFKA-15604: Telemetry API request and response schemas and classes (KIP-714) [kafka]
mjsax commented on code in PR #14554: URL: https://github.com/apache/kafka/pull/14554#discussion_r1364838008

## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:

@@ -6222,4 +6222,62 @@ class KafkaApisTest {
     assertEquals(expectedResponse, response.data)
   }
+
+  @Test
+  def testGetTelemetrySubscriptionsNotAllowedForZkClusters(): Unit = {
+    val data = new GetTelemetrySubscriptionsRequestData()
+    data.setClientInstanceId(Uuid.randomUuid())
+
+    val request = buildRequest(new GetTelemetrySubscriptionsRequest.Builder(data, true).build())
+    createKafkaApis(enableForwarding = true).handle(request, RequestLocal.NoCaching)
+
+    val response = verifyNoThrottling[GetTelemetrySubscriptionsResponse](request)
+    assertEquals(Errors.UNKNOWN_SERVER_ERROR, Errors.forCode(response.data.errorCode))

Review Comment: If this case cannot happen in practice, what do we actually test?
Re: [PR] KAFKA-15527: Add reverseRange and reverseAll query over kv-store in IQv2 [kafka]
hanyuzheng7 commented on code in PR #14477: URL: https://github.com/apache/kafka/pull/14477#discussion_r1364836021

## streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java:

@@ -1586,12 +1593,15 @@ public void shouldHandleKeyQuery(
 public void shouldHandleRangeQuery(
     final Optional lower,
     final Optional upper,
+    final boolean isKeyAscending,
     final Function valueExtactor,
-    final Set expectedValue) {
+    final List expectedValue) {

Review Comment: It seems that the existing code uses expectedValue and actualValue for other methods; I will update them all.
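The `Set` → `List` change in the diff above matters because the new `isKeyAscending` parameter makes the test order-sensitive. A quick standalone illustration (not the Kafka Streams test itself) of why a Set-based assertion cannot catch a wrong ordering:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;

// Sketch only: a HashSet discards iteration order, so comparing Sets passes
// even when the store returned keys in the wrong order; comparing Lists fails.
public class OrderSensitiveAssertion {

    static boolean equalIgnoringOrder(List<Integer> a, List<Integer> b) {
        return new HashSet<>(a).equals(new HashSet<>(b));
    }

    public static void main(String[] args) {
        List<Integer> ascending = Arrays.asList(1, 2, 3);
        List<Integer> descending = Arrays.asList(3, 2, 1);

        // Set comparison: passes despite the reversed order
        assert equalIgnoringOrder(ascending, descending);
        // List comparison: distinguishes ascending from descending results
        assert !ascending.equals(descending);
        System.out.println("ok");
    }
}
```

This is exactly what a reverseRange/reverseAll query needs: the expected values must be checked in order, so the test collects results into a `List`.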
Re: [PR] KAFKA-14133: Migrate StateDirectory mock in TaskManagerTest to Mockito [kafka]
github-actions[bot] commented on PR #13897: URL: https://github.com/apache/kafka/pull/13897#issuecomment-1769836677

This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
Re: [PR] KAFKA-15604: Telemetry API request and response schemas and classes (KIP-714) [kafka]
mjsax commented on code in PR #14554: URL: https://github.com/apache/kafka/pull/14554#discussion_r1364836200

## clients/src/main/java/org/apache/kafka/common/requests/GetTelemetrySubscriptionsRequest.java:

@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.nio.ByteBuffer;
+
+public class GetTelemetrySubscriptionsRequest extends AbstractRequest {
+
+    public static class Builder extends AbstractRequest.Builder {
+
+        private final GetTelemetrySubscriptionsRequestData data;
+
+        public Builder(GetTelemetrySubscriptionsRequestData data) {
+            this(data, false);
+        }
+
+        public Builder(GetTelemetrySubscriptionsRequestData data, boolean enableUnstableLastVersion) {

Review Comment: So this would be changed before the 3.7 release to mark it as stable?
Re: [PR] Kip951 poc [kafka]
github-actions[bot] commented on PR #14063: URL: https://github.com/apache/kafka/pull/14063#issuecomment-1769836600

This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
[jira] [Resolved] (KAFKA-15620) Duplicate remote log DELETE_SEGMENT metadata is generated when there are multiple leader epochs in the segment
[ https://issues.apache.org/jira/browse/KAFKA-15620?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen resolved KAFKA-15620. --- Fix Version/s: 3.7.0 Resolution: Duplicate > Duplicate remote log DELETE_SEGMENT metadata is generated when there are > multiple leader epochs in the segment > -- > > Key: KAFKA-15620 > URL: https://issues.apache.org/jira/browse/KAFKA-15620 > Project: Kafka > Issue Type: Bug > Components: log >Affects Versions: 3.6.0 >Reporter: Henry Cai >Priority: Major > Fix For: 3.7.0, 3.6.1 > > > Use the newly released 3.6.0, turn on tiered storage feature: > {*}remote.log.storage.system.enable{*}=true > 1. Set up topic tier5 to be remote storage enabled. Adding some data to the > topic and the data is copied to remote storage. After a few days when the > log segment is removed from remote storage due to log retention expiration, > noticed the following warnings in the server log: > [2023-10-16 22:19:10,971] DEBUG Updating remote log segment metadata: > [RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId > {topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, id=YFNCaWjPQFSKCngQ1QcKpA} > , customMetadata=Optional.empty, state=DELETE_SEGMENT_STARTED, > eventTimestampMs=1697005926358, brokerId=1043}] > (org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCache) > [2023-10-16 22:19:10,971] WARN Error occurred while updating the remote log > segment. 
> (org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore)
> org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException: No remote log segment metadata found for :RemoteLogSegmentId\{topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, id=YFNCaWjPQFSKCngQ1QcKpA}
> at org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCache.updateRemoteLogSegmentMetadata(RemoteLogMetadataCache.java:161)
> at org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore.handleRemoteLogSegmentMetadataUpdate(RemotePartitionMetadataStore.java:89)
> at org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataEventHandler.handleRemoteLogMetadata(RemotePartitionMetadataEventHandler.java:33)
> at org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.processConsumerRecord(ConsumerTask.java:157)
> at org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.run(ConsumerTask.java:133)
> at java.base/java.lang.Thread.run(Thread.java:829)
>
> 2. After some debugging, realized the problem is *there are 2 sets of DELETE_SEGMENT_STARTED/FINISHED pairs* in the remote metadata topic for this segment. The DELETE_SEGMENT_FINISHED in the first set removes the segment from the metadata cache, and this caused the above exception when the DELETE_SEGMENT_STARTED from the second set needs to be processed.
>
> 3. 
And traced the log to where the log retention kicked in and saw *there were two delete log segments generated* at that time:
> ```
> [2023-10-10 23:32:05,929] INFO [RemoteLogManager=1043 partition=QelVeVmER5CkjrzIiF07PQ:tier5-0] About to delete remote log segment RemoteLogSegmentId{topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, id={*}YFNCaWjPQFSKCngQ1QcKpA{*}} due to retention time 60480ms breach based on the largest record timestamp in the segment (kafka.log.remote.RemoteLogManager$RLMTask)
> [2023-10-10 23:32:05,929] INFO [RemoteLogManager=1043 partition=QelVeVmER5CkjrzIiF07PQ:tier5-0] About to delete remote log segment RemoteLogSegmentId{topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, id={*}YFNCaWjPQFSKCngQ1QcKpA{*}} due to retention time 60480ms breach based on the largest record timestamp in the segment (kafka.log.remote.RemoteLogManager$RLMTask)
> ```
> 4. And dumped out the content of the original COPY_SEGMENT_STARTED for this segment (which triggers the generation of the later DELETE_SEGMENT metadata):
> [2023-10-16 22:19:10,894] DEBUG Adding to in-progress state: [RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, id=YFNCaWjPQFSKCngQ1QcKpA}, startOffset=6387830, endOffset=9578660, brokerId=1043, maxTimestampMs=1696401123036, eventTimestampMs=1696401127062, segmentLeaderEpochs=\{5=6387830, 6=6721329}, segmentSizeInBytes=511987531, customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}] (org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCache)
>
> You can see there are 2 leader epochs in this segment: *segmentLeaderEpochs=\{5=6387830, 6=6721329}*
>
> 5. From the remote log retention code >
[jira] [Commented] (KAFKA-15620) Duplicate remote log DELETE_SEGMENT metadata is generated when there are multiple leader epochs in the segment
[ https://issues.apache.org/jira/browse/KAFKA-15620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17776989#comment-17776989 ] Luke Chen commented on KAFKA-15620: --- [~ckamal], thanks for the info. [~h...@pinterest.com], I've backported into 3.6 branch. It'll be included in v3.6.1 release. Thanks. > Duplicate remote log DELETE_SEGMENT metadata is generated when there are > multiple leader epochs in the segment > -- > > Key: KAFKA-15620 > URL: https://issues.apache.org/jira/browse/KAFKA-15620 > Project: Kafka > Issue Type: Bug > Components: log >Affects Versions: 3.6.0 >Reporter: Henry Cai >Priority: Major > Fix For: 3.6.1 > > > Use the newly released 3.6.0, turn on tiered storage feature: > {*}remote.log.storage.system.enable{*}=true > 1. Set up topic tier5 to be remote storage enabled. Adding some data to the > topic and the data is copied to remote storage. After a few days when the > log segment is removed from remote storage due to log retention expiration, > noticed the following warnings in the server log: > [2023-10-16 22:19:10,971] DEBUG Updating remote log segment metadata: > [RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId > {topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, id=YFNCaWjPQFSKCngQ1QcKpA} > , customMetadata=Optional.empty, state=DELETE_SEGMENT_STARTED, > eventTimestampMs=1697005926358, brokerId=1043}] > (org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCache) > [2023-10-16 22:19:10,971] WARN Error occurred while updating the remote log > segment. 
> (org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore) > org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException: > No remote log segment metadata found for > :RemoteLogSegmentId\{topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, > id=YFNCaWjPQFSKCngQ1QcKpA} > at > org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCache.updateRemoteLogSegmentMetadata(RemoteLogMetadataCache.java:161) > at > org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore.handleRemoteLogSegmentMetadataUpdate(RemotePartitionMetadataStore.java:89) > at > org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataEventHandler.handleRemoteLogMetadata(RemotePartitionMetadataEventHandler.java:33) > at > org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.processConsumerRecord(ConsumerTask.java:157) > at > org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.run(ConsumerTask.java:133) > at java.base/java.lang.Thread.run(Thread.java:829) > > 2. After some debugging, realized the problem is *there are 2 sets of > DELETE_SEGMENT_STARTED/FINISHED pairs* in the remote metadata topic for this > segment. The DELETE_SEGMENT_FINISHED in the first set remove the segment > from the metadata cache and this caused the above exception when the > DELETE_SEGMENT_STARTED from the second set needs to be processed. > > 3. 
And traced the log to where the log retention kicked in and saw *there > were two delete log segment generated* at that time: > ``` > [2023-10-10 23:32:05,929] INFO [RemoteLogManager=1043 > partition=QelVeVmER5CkjrzIiF07PQ:tier5-0] About to delete remote log segment > RemoteLogSegmentId{topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, > id={*}YFNCaWjPQFSKCngQ1QcKpA{*}} due to retention time 60480ms breach > based on the largest record timestamp in the segment > (kafka.log.remote.RemoteLogManager$RLMTask) > [2023-10-10 23:32:05,929] INFO [RemoteLogManager=1043 > partition=QelVeVmER5CkjrzIiF07PQ:tier5-0] About to delete remote log segment > RemoteLogSegmentId{topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, > id={*}YFNCaWjPQFSKCngQ1QcKpA{*}} due to retention time 60480ms breach > based on the largest record timestamp in the segment > (kafka.log.remote.RemoteLogManager$RLMTask) > ``` > 4. And dumped out the content of the original COPY_SEGMENT_STARTED for this > segment (which triggers the generation of the later DELETE_SEGMENT metadata): > [2023-10-16 22:19:10,894] DEBUG Adding to in-progress state: > [RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId > {topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, id=YFNCaWjPQFSKCngQ1QcKpA} > , startOffset=6387830, endOffset=9578660, brokerId=1043, > maxTimestampMs=1696401123036, eventTimestampMs=1696401127062, > segmentLeaderEpochs=\{5=6387830, 6=6721329}, segmentSizeInBytes=511987531, > customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}] > (org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCache) > > You can see there are 2 leader epochs in this
[jira] [Updated] (KAFKA-15479) Remote log segments should be considered once for retention breach
[ https://issues.apache.org/jira/browse/KAFKA-15479?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen updated KAFKA-15479: -- Fix Version/s: 3.6.1

> Remote log segments should be considered once for retention breach
> --
>
> Key: KAFKA-15479
> URL: https://issues.apache.org/jira/browse/KAFKA-15479
> Project: Kafka
> Issue Type: Task
> Reporter: Kamal Chandraprakash
> Assignee: Kamal Chandraprakash
> Priority: Major
> Fix For: 3.7.0, 3.6.1
>
> When a remote log segment contains multiple epochs, it gets considered multiple times during a breach of retention size/time/start-offset. This will affect deletion by remote log retention size, as it deletes fewer segments than expected. This is a follow-up of KAFKA-15352
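The dedup KAFKA-15479 calls for can be sketched as follows. This is illustrative, not the actual RemoteLogManager retention code: the point is that a segment spanning several leader epochs appears once per epoch when metadata is listed epoch-by-epoch, so a seen-set keyed by segment id keeps it from being counted (or deleted, and DELETE_SEGMENT metadata emitted) more than once:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Sketch only: collapse per-epoch segment listings into a unique set of
// segment ids so retention considers each segment exactly once.
public class RetentionDedup {

    public static List<String> segmentsToDeleteOnce(Map<Integer, List<String>> segmentIdsByEpoch) {
        Set<String> seen = new LinkedHashSet<>();
        for (List<String> ids : segmentIdsByEpoch.values()) {
            seen.addAll(ids);  // duplicates across epochs collapse here
        }
        return new ArrayList<>(seen);
    }

    public static void main(String[] args) {
        // One segment covering epochs 5 and 6, as in the KAFKA-15620 report
        // (segmentLeaderEpochs={5=..., 6=...}).
        Map<Integer, List<String>> byEpoch = new LinkedHashMap<>();
        byEpoch.put(5, Arrays.asList("YFNCaWjPQFSKCngQ1QcKpA"));
        byEpoch.put(6, Arrays.asList("YFNCaWjPQFSKCngQ1QcKpA"));
        assert segmentsToDeleteOnce(byEpoch).size() == 1;  // considered once, not twice
        System.out.println("ok");
    }
}
```

Without this dedup, the duplicate DELETE_SEGMENT_STARTED/FINISHED pairs described in KAFKA-15620 are exactly what falls out.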
Re: [PR] KAFKA-15534: Inject request completion time when the request failed [kafka]
philipnee commented on code in PR #14532: URL: https://github.com/apache/kafka/pull/14532#discussion_r1364833452

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManagerTest.java:

@@ -151,6 +152,36 @@ void testHardFailures(Exception exception) {
     }
 }

+@Test
+public void testNetworkTimeout() {

Review Comment: CoordinatorRequestManager doesn't have it - so I added the test.
Re: [PR] KAFKA-15527: Add reverseRange and reverseAll query over kv-store in IQv2 [kafka]
hanyuzheng7 commented on code in PR #14477: URL: https://github.com/apache/kafka/pull/14477#discussion_r1364832374

## streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java:

@@ -1586,12 +1593,15 @@ public void shouldHandleKeyQuery(
 public void shouldHandleRangeQuery(
     final Optional lower,
     final Optional upper,
+    final boolean isKeyAscending,
     final Function valueExtactor,
-    final Set expectedValue) {
+    final List expectedValue) {

Review Comment: ok, I will update it.

## streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java:

@@ -1604,9 +1614,10 @@ public void shouldHandleRangeQuery(
     if (result.getGlobalResult() != null) {
         fail("global tables aren't implemented");
     } else {
-        final Set actualValue = new HashSet<>();
+        final List actualValue = new ArrayList<>();

Review Comment: ok, I will update it.
[jira] [Updated] (KAFKA-13973) block-cache-capacity metrics worth twice as much as normal
[ https://issues.apache.org/jira/browse/KAFKA-13973?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-13973: Fix Version/s: 3.6.1

> block-cache-capacity metrics worth twice as much as normal
> --
>
> Key: KAFKA-13973
> URL: https://issues.apache.org/jira/browse/KAFKA-13973
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.2.0
> Reporter: Sylvain Le Gouellec
> Assignee: Nicholas Telford
> Priority: Minor
> Fix For: 3.7.0, 3.6.1
>
> Attachments: Screenshot 2022-06-09 at 08.55.36.png, Screenshot 2022-06-09 at 09.33.50.png
>
> I have created a very simple kafka-streams application with 1 state store.
> I'm very surprised that the block-cache-capacity metric shows a {{100MB}} block cache capacity instead of the Kafka Streams default, which is {{50MB}}.
>
> My topology:
> StreamsBuilder sb = new StreamsBuilder();
> sb.stream("input")
>   .groupByKey()
>   .count()
>   .toStream()
>   .to("output");
>
> I checked out the {{kafka-streams}} code and saw a strange thing. When the {{RocksDBTimestampedStore}} store is created, we try to create two column families for backward compatibility with a potential old key/value store.
> In this method, {{setDbAccessor(col1, col2)}}, if the first column is not valid, you close it ([L102|https://github.com/apache/kafka/blob/4542acdc14d5ec3daa1f36d8dc24abc244ee24ff/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java#L102]).
> But regarding the rocksdb instance, it seems that the column family is not deleted completely, and the metrics exposed by [RocksDB continue to aggregate (L373)|https://github.com/apache/kafka/blob/4542acdc14d5ec3daa1f36d8dc24abc244ee24ff/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java#L373] {{block-cache-capacity}} for both column families (default and keyValueWithTimestamp).
> Maybe you have to explicitly drop the column family in {{setDbAccessor(col1, col2)}} if the first column is not valid (like {{db.dropColumnFamily(noTimestampColumnFamily);}})
>
> I tried to drop the {{noTimestampColumnFamily}} in setDbAccessor if the first column is not valid, like:
>
> {code:java}
> private void setDbAccessor(final ColumnFamilyHandle noTimestampColumnFamily,
>                            final ColumnFamilyHandle withTimestampColumnFamily) throws RocksDBException {
>     final RocksIterator noTimestampsIter = db.newIterator(noTimestampColumnFamily);
>     noTimestampsIter.seekToFirst();
>     if (noTimestampsIter.isValid()) {
>         log.info("Opening store {} in upgrade mode", name);
>         dbAccessor = new DualColumnFamilyAccessor(noTimestampColumnFamily, withTimestampColumnFamily);
>     } else {
>         log.info("Opening store {} in regular mode", name);
>         dbAccessor = new SingleColumnFamilyAccessor(withTimestampColumnFamily);
>         noTimestampColumnFamily.close();
>         db.dropColumnFamily(noTimestampColumnFamily); // try fix it
>     }
>     noTimestampsIter.close();
> }{code}
>
> But it seems that you can't drop the default column family in RocksDB (see screenshot).
> *So how can we have the real block-cache-capacity metric value in Kafka Streams monitoring?*
Re: [PR] KAFKA-13973: Fix inflated block cache metrics [kafka]
mjsax commented on PR #14317: URL: https://github.com/apache/kafka/pull/14317#issuecomment-1769829743

Cherry-picked this to the `3.6` branch. Also tried to cherry-pick to `3.5`, but the newly added test fails on 3.5 -- not sure why yet.
Re: [PR] KAFKA-15527: Add reverseRange and reverseAll query over kv-store in IQv2 [kafka]
hanyuzheng7 commented on PR #14477: URL: https://github.com/apache/kafka/pull/14477#issuecomment-1769829820

> LGTM. Two nits to improve variable names.
>
> As discussed in person, we also should update JavaDocs for `RangeQuery`, and `ReadOnlyKeyValueStore` to make it more explicit what ordering guarantees are provided.
>
> Furthermore, we also need to update `docs/streams/...`
>
> We can either add all this to this PR, or you do follow up PR. Just let me know.

I believe it would be best to open a new PR dedicated solely to updating the Javadoc.
Re: [PR] KAFKA-15534: Inject request completion time when the request failed [kafka]
philipnee commented on code in PR #14532: URL: https://github.com/apache/kafka/pull/14532#discussion_r1364830057

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManagerTest.java:

@@ -151,6 +152,36 @@ void testHardFailures(Exception exception) {
     }
 }

+@Test
+public void testNetworkTimeout() {

Review Comment: Per previous comment: NetworkTimeout isn't handled correctly in the commitRequestManager, therefore I want to address this in that PR. I will check the other request managers to ensure this is tested.
Re: [PR] KAFKA-15534: Inject request completion time when the request failed [kafka]
philipnee commented on code in PR #14532: URL: https://github.com/apache/kafka/pull/14532#discussion_r1364829502 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -254,14 +254,14 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() { new ArrayList<>(this.requestedPartitions), throwOnFetchStableOffsetUnsupported); return new NetworkClientDelegate.UnsentRequest( -builder, -coordinatorRequestManager.coordinator(), -(r, t) -> onResponse(r.receivedTimeMs(), (OffsetFetchResponse) r.responseBody())); +builder, +coordinatorRequestManager.coordinator(), +(r, t) -> onResponse(r.receivedTimeMs(), (OffsetFetchResponse) r.responseBody())); Review Comment: In fact, this is a bug and I'm planning to address it in [KAFKA-15562](https://issues.apache.org/jira/browse/KAFKA-15562). Can we address the gap in a follow-up PR?
[jira] [Updated] (KAFKA-15562) Ensure fetch offset and commit offset handler handles both timeout and various error types
[ https://issues.apache.org/jira/browse/KAFKA-15562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee updated KAFKA-15562:
---
Description:

Both the fetchOffsetRequest and commitOffsetRequest handlers lack sufficient logic to handle timeout exceptions. The commitOffsetRequest handler also doesn't handle various server errors, such as coordinator not found. We need to handle:

If the Exception is non-null:
- handle a RetriableError in a way that respects requestTimeoutMs
- handle a NonRetriableError

If the response contains an error, ensure that we:
- mark the coordinator unknown if needed
- retry if needed
- fail the request

> Ensure fetch offset and commit offset handler handles both timeout and
> various error types
> --
>
> Key: KAFKA-15562
> URL: https://issues.apache.org/jira/browse/KAFKA-15562
> Project: Kafka
> Issue Type: Sub-task
> Components: clients, consumer
> Reporter: Philip Nee
> Assignee: Philip Nee
> Priority: Blocker
> Labels: consumer-threading-refactor, kip-848-e2e, kip-848-preview
>
> Both the fetchOffsetRequest and commitOffsetRequest handlers lack sufficient
> logic to handle timeout exceptions.
>
> The commitOffsetRequest handler also doesn't handle various server errors,
> such as coordinator not found. We need to handle:
> If the Exception is non-null:
> - handle a RetriableError in a way that respects requestTimeoutMs
> - handle a NonRetriableError
>
> If the response contains an error, ensure that we:
> - mark the coordinator unknown if needed
> - retry if needed
> - fail the request

-- This message was sent by Atlassian Jira (v8.20.10#820010)
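The handling rules listed above can be sketched as a tiny classifier. This is a hedged illustration only: the `Outcome` enum, the `classify()` helper, and the use of `IllegalStateException` as a stand-in for a coordinator-level error (e.g. COORDINATOR_NOT_AVAILABLE) are invented for this sketch and are not Kafka's actual CommitRequestManager code.

```java
import java.util.concurrent.TimeoutException;

public class OffsetRequestErrorHandlingSketch {

    enum Outcome { RETRY, MARK_COORDINATOR_UNKNOWN_AND_RETRY, FAIL }

    // Hypothetical classifier for a failed offset fetch/commit request.
    static Outcome classify(final Exception exception, final long elapsedMs, final long requestTimeoutMs) {
        // A retriable error is only retried while the request timeout budget lasts.
        if (elapsedMs >= requestTimeoutMs)
            return Outcome.FAIL;
        if (exception instanceof TimeoutException)
            return Outcome.RETRY;
        // Stand-in for a coordinator-level error: forget the coordinator,
        // rediscover it, then retry the request.
        if (exception instanceof IllegalStateException)
            return Outcome.MARK_COORDINATOR_UNKNOWN_AND_RETRY;
        // Everything else is treated as non-retriable.
        return Outcome.FAIL;
    }

    public static void main(String[] args) {
        System.out.println(classify(new TimeoutException(), 100L, 30_000L));      // RETRY
        System.out.println(classify(new TimeoutException(), 30_000L, 30_000L));   // FAIL
        System.out.println(classify(new IllegalStateException(), 100L, 30_000L)); // MARK_COORDINATOR_UNKNOWN_AND_RETRY
    }
}
```

The point of the sketch is the ordering: the timeout budget is checked before the retriable/non-retriable split, so even a retriable error fails once requestTimeoutMs is exhausted.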
[jira] [Updated] (KAFKA-15562) Ensure fetch offset and commit offset handler handles both timeout and various error types
[ https://issues.apache.org/jira/browse/KAFKA-15562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee updated KAFKA-15562: --- Summary: Ensure fetch offset and commit offset handler handles both timeout and various error types (was: Ensure CommitRequestManager handles offset commit error handling) > Ensure fetch offset and commit offset handler handles both timeout and > various error types > -- > > Key: KAFKA-15562 > URL: https://issues.apache.org/jira/browse/KAFKA-15562 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Blocker > Labels: consumer-threading-refactor, kip-848-e2e, kip-848-preview > > Review the code in Consumercoordinator#OffsetCommitResponseHandler > Implement the error handling and add tests for these errors.
[jira] [Updated] (KAFKA-15562) Ensure fetch offset and commit offset handler handles both timeout and various error types
[ https://issues.apache.org/jira/browse/KAFKA-15562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee updated KAFKA-15562: --- Description: (was: Review the code in Consumercoordinator#OffsetCommitResponseHandler Implement the error handling and add tests for these errors.) > Ensure fetch offset and commit offset handler handles both timeout and > various error types > -- > > Key: KAFKA-15562 > URL: https://issues.apache.org/jira/browse/KAFKA-15562 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Blocker > Labels: consumer-threading-refactor, kip-848-e2e, kip-848-preview >
Re: [PR] KAFKA-15534: Inject request completion time when the request failed [kafka]
philipnee commented on code in PR #14532: URL: https://github.com/apache/kafka/pull/14532#discussion_r1364820542 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java: ## @@ -256,27 +256,39 @@ public String toString() { public static class FutureCompletionHandler implements RequestCompletionHandler { +private long responseCompletionTimeMs; private final CompletableFuture future; FutureCompletionHandler() { -this.future = new CompletableFuture<>(); +future = new CompletableFuture<>(); } -public void onFailure(final RuntimeException e) { -future.completeExceptionally(e); +public void onFailure(final long currentTimeMs, final RuntimeException e) { Review Comment: You are right, I should add one there.
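The change discussed in this hunk can be illustrated with a minimal, self-contained sketch (an assumed shape, not the actual NetworkClientDelegate code): threading `currentTimeMs` into `onFailure` lets the handler record when a request completed even when it completed exceptionally.

```java
import java.util.concurrent.CompletableFuture;

public class FutureCompletionHandlerSketch {

    // Sketch of a completion handler that records *when* the request finished,
    // on both the success and the failure path.
    static class FutureCompletionHandler {
        private long responseCompletionTimeMs;
        private final CompletableFuture<String> future = new CompletableFuture<>();

        // The failure path now takes the current time, so an exceptionally
        // completed request still gets an accurate completion timestamp.
        public void onFailure(final long currentTimeMs, final RuntimeException e) {
            responseCompletionTimeMs = currentTimeMs;
            future.completeExceptionally(e);
        }

        public void onSuccess(final long currentTimeMs, final String response) {
            responseCompletionTimeMs = currentTimeMs;
            future.complete(response);
        }

        public long completionTimeMs() {
            return responseCompletionTimeMs;
        }

        public CompletableFuture<String> future() {
            return future;
        }
    }

    public static void main(String[] args) {
        FutureCompletionHandler handler = new FutureCompletionHandler();
        handler.onFailure(1234L, new RuntimeException("node disconnected"));
        System.out.println(handler.completionTimeMs());                  // 1234
        System.out.println(handler.future().isCompletedExceptionally()); // true
    }
}
```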
Re: [PR] Worker sink task threaded test mockito [kafka]
hgeraldino closed pull request #14580: Worker sink task threaded test mockito URL: https://github.com/apache/kafka/pull/14580
[PR] Worker sink task threaded test mockito [kafka]
hgeraldino opened a new pull request, #14580: URL: https://github.com/apache/kafka/pull/14580 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] wip: investigate Flaky tests [kafka]
dengziming closed pull request #14512: wip: investigate Flaky tests URL: https://github.com/apache/kafka/pull/14512
Re: [PR] KAFKA-13988: Enable replicating from latest offset with MirrorMaker 2 [kafka]
hudeqi commented on PR #14567: URL: https://github.com/apache/kafka/pull/14567#issuecomment-1769788407 LGTM, thanks for this PR!
Re: [PR] KAFKA-15566: Fix flaky tests in FetchRequestTest.scala in KRaft mode [kafka]
showuon commented on PR #14573: URL: https://github.com/apache/kafka/pull/14573#issuecomment-1769750941 @dengziming , do you want to have a look?
Re: [PR] KAFKA-15633: Fix overwrite of meta.properties at startup to handle JBOD. [WIP] [kafka]
pprovenzano commented on PR #14578: URL: https://github.com/apache/kafka/pull/14578#issuecomment-1769746613 Adding the directory.id in any form will prevent the cluster from rolling back to a previous ZK version.
[jira] [Commented] (KAFKA-15479) Remote log segments should be considered once for retention breach
[ https://issues.apache.org/jira/browse/KAFKA-15479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17776960#comment-17776960 ] Henry Cai commented on KAFKA-15479: --- This is also related to https://issues.apache.org/jira/browse/KAFKA-15620, can the fix be backported to the Kafka 3.6 branch? > Remote log segments should be considered once for retention breach > -- > > Key: KAFKA-15479 > URL: https://issues.apache.org/jira/browse/KAFKA-15479 > Project: Kafka > Issue Type: Task >Reporter: Kamal Chandraprakash >Assignee: Kamal Chandraprakash >Priority: Major > Fix For: 3.7.0 > > > When a remote log segment contains multiple epochs, it gets considered > multiple times during retention size/time/start-offset breach checks. This > affects deletion by remote log retention size, as it deletes fewer segments > than expected. This is a follow-up of KAFKA-15352
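The double-counting described in that issue can be illustrated with a small, hypothetical sketch (none of these names come from Kafka's actual remote-log code): a segment that spans several leader epochs appears once per epoch in the lineage, so charging it against the reclaim target once per epoch makes the loop stop after deleting fewer unique segments than it should.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class RetentionBreachSketch {

    // Returns the ids of segments that would be deleted to reclaim
    // `bytesToReclaim`. With deduplicate == false, a segment spanning several
    // epochs is charged once per epoch, so the quota fills up early and fewer
    // unique segments are deleted than the retention policy requires.
    static Set<String> segmentsToDelete(LinkedHashMap<Integer, List<String>> segmentIdsPerEpoch,
                                        Map<String, Long> segmentSizes,
                                        long bytesToReclaim,
                                        boolean deduplicate) {
        Set<String> deleted = new LinkedHashSet<>();
        Set<String> considered = new HashSet<>();
        long reclaimed = 0;
        for (List<String> ids : segmentIdsPerEpoch.values()) {
            for (String id : ids) {
                if (reclaimed >= bytesToReclaim)
                    return deleted;
                if (deduplicate && !considered.add(id))
                    continue; // already charged this segment in an earlier epoch
                reclaimed += segmentSizes.get(id);
                deleted.add(id);
            }
        }
        return deleted;
    }

    public static void main(String[] args) {
        // seg-1 spans epochs 0 and 1; each segment is 100 bytes; we must reclaim 200.
        LinkedHashMap<Integer, List<String>> perEpoch = new LinkedHashMap<>();
        perEpoch.put(0, List.of("seg-1"));
        perEpoch.put(1, List.of("seg-1", "seg-2"));
        perEpoch.put(2, List.of("seg-3"));
        Map<String, Long> sizes = Map.of("seg-1", 100L, "seg-2", 100L, "seg-3", 100L);

        System.out.println(segmentsToDelete(perEpoch, sizes, 200L, false)); // [seg-1]
        System.out.println(segmentsToDelete(perEpoch, sizes, 200L, true));  // [seg-1, seg-2]
    }
}
```

Without deduplication, seg-1's 100 bytes are counted in both epoch 0 and epoch 1, so the 200-byte quota appears satisfied after deleting a single segment; considering each segment once deletes the expected two.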
Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]
kirktrue commented on code in PR #14406: URL: https://github.com/apache/kafka/pull/14406#discussion_r1364758468 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java: ## [The quoted diff hunk -- the Apache license header and the new test file's import block -- is truncated here, before the review comment itself.]
[jira] [Updated] (KAFKA-15641) Investigate CompletedFetch handleInitializeErrors for accuracy
[ https://issues.apache.org/jira/browse/KAFKA-15641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15641:
--
Description:

The {{FetcherTest}} and {{FetchRequestManagerTest}} classes have a test named testFetchedRecordsAfterSeek, which [upon closer inspection|https://github.com/apache/kafka/pull/14406#discussion_r1347894828] may reveal some incorrect logic in {{FetchCollector.handleInitializeErrors}}. Here is the test code:

{code:java}
@Test
public void testFetchedRecordsAfterSeek() {
    buildFetcher(OffsetResetStrategy.NONE,
        new ByteArrayDeserializer(),
        new ByteArrayDeserializer(),
        2,
        IsolationLevel.READ_UNCOMMITTED);
    assignFromUser(singleton(tp0));

    // Step 1: seek to offset 0 of our partition.
    subscriptions.seek(tp0, 0);

    // Step 2: issue a mock broker request to fetch data from the current offset in our local state,
    // i.e. offset 0.
    assertTrue(sendFetches() > 0);

    // Step 3: mock an OFFSET_OUT_OF_RANGE response from the broker.
    client.prepareResponse(fullFetchResponse(tidp0, records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0));

    // Step 4: process the network I/O to receive the response from the broker with the OFFSET_OUT_OF_RANGE
    // that was injected. Note, however, that we haven't "collected" the fetch data included in the response.
    networkClientDelegate.poll(time.timer(0));

    // Step 5: validate that the partition is not marked as needing its offset reset. The response validation
    // logic is performed during the fetch collection, which doesn't happen until assertEmptyFetch below.
    assertFalse(subscriptions.isOffsetResetNeeded(tp0));

    // Step 6: update the partition's position in our local state to offset 2. We still haven't collected the
    // fetch, so we haven't performed any validation of the fetch response.
    subscriptions.seek(tp0, 2);

    // Step 7: perform the fetch collection. As part of that process, error handling is performed. Since
    // we intentionally injected an error above, this error will be checked and handled in the
    // FetchCollector.handleInitializeErrors method. When handling OFFSET_OUT_OF_RANGE, handleInitializeErrors
    // will notice that the original requested offset (0) is different from the state of our current offset (2).
    assertEmptyFetch("Should not return records or advance position after seeking to end of topic partition");
}
{code}

Here is the code from {{FetchCollector.handleInitializeErrors}}:

{code:java}
private void handleInitializeErrors(final CompletedFetch completedFetch, final Errors error) {
    final TopicPartition tp = completedFetch.partition;
    final long fetchOffset = completedFetch.nextFetchOffset();

    . . .

    if (error == Errors.OFFSET_OUT_OF_RANGE) {
        Optional<Integer> clearedReplicaId = subscriptions.clearPreferredReadReplica(tp);

        if (!clearedReplicaId.isPresent()) {
            // If there's no preferred replica to clear, we're fetching from the leader so handle
            // this error normally
            SubscriptionState.FetchPosition position = subscriptions.position(tp);

            if (position == null || fetchOffset != position.offset) {
                log.debug("Discarding stale fetch response for partition {} since the fetched offset {} " +
                    "does not match the current offset {}", tp, fetchOffset, position);
            } else {
                String errorMessage = "Fetch position " + position + " is out of range for partition " + tp;

                if (subscriptions.hasDefaultOffsetResetPolicy()) {
                    log.info("{}, resetting offset", errorMessage);
                    subscriptions.requestOffsetReset(tp);
                } else {
                    log.info("{}, raising error to the application since no reset policy is configured",
                        errorMessage);
                    throw new OffsetOutOfRangeException(errorMessage,
                        Collections.singletonMap(tp, position.offset));
                }
            }
        } else {
            log.debug("Unset the preferred read replica {} for partition {} since we got {} when fetching {}",
                clearedReplicaId.get(), tp, error, fetchOffset);
        }
    }

    . . .
}
{code}

The question is: why is the {{OFFSET_OUT_OF_RANGE}} error ignored just because of the following code?

{code:java}
if (position == null || fetchOffset != position.offset) {
{code}

It's a bit weird that the above check is only done for the {{OFFSET_OUT_OF_RANGE}} error, instead of any error.

was: The {{FetcherTest}} and {{FetchRequestManagerTest}} classes have a test named testFetchedRecordsAfterSeek, which [upon closer inspection|https://github.com/apache/kafka/pull/14406#discussion_r1347894828] may reveal some incorrect logic in
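For discussion's sake, the conditional in question boils down to a two-argument predicate. This is a sketch of the check's logic only, not the real FetchCollector code:

```java
public class StaleFetchCheckSketch {

    // Distills the conditional quoted above: an OFFSET_OUT_OF_RANGE response is
    // treated as stale -- and silently discarded -- when the offset the fetch
    // was issued at no longer matches the partition's current position (or the
    // position is gone entirely), e.g. because the application called seek() in
    // between sending the fetch and collecting the response.
    static boolean isStaleResponse(final Long currentPositionOffset, final long fetchOffset) {
        return currentPositionOffset == null || fetchOffset != currentPositionOffset;
    }

    public static void main(String[] args) {
        // The test's scenario: the fetch went out at offset 0, then seek(tp0, 2).
        System.out.println(isStaleResponse(2L, 0L));   // true  -> error discarded, no offset reset
        // Without the intervening seek, the error would be handled (reset or throw).
        System.out.println(isStaleResponse(0L, 0L));   // false
        // A missing position also counts as stale.
        System.out.println(isStaleResponse(null, 0L)); // true
    }
}
```

Which makes the open question concrete: this staleness predicate would seem to apply equally well to any fetch-level error, yet it currently guards only the OFFSET_OUT_OF_RANGE branch.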
[jira] [Updated] (KAFKA-15641) Investigate CompletedFetch handleInitializeErrors for accuracy
[ https://issues.apache.org/jira/browse/KAFKA-15641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15641: -- Issue Type: Bug (was: Improvement) > Investigate CompletedFetch handleInitializeErrors for accuracy > -- > > Key: KAFKA-15641 > URL: https://issues.apache.org/jira/browse/KAFKA-15641 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > > The {{FetcherTest}} and {{FetchRequestManagerTest}} classes have a test named > testFetchedRecordsAfterSeek, which [upon closer > inspection|https://github.com/apache/kafka/pull/14406#discussion_r1347894828] > may reveal some incorrect logic in {{FetchCollector.handleInitializeErrors}}. > Here is the test code: > {code:java} > @Test > public void testFetchedRecordsAfterSeek() { > buildFetcher(OffsetResetStrategy.NONE, > new ByteArrayDeserializer(), > new ByteArrayDeserializer(), > 2, > IsolationLevel.READ_UNCOMMITTED); > assignFromUser(singleton(tp0)); > // Step 1: seek to offset 0 of our partition. > subscriptions.seek(tp0, 0); > // Step 2: issue a mock broker request to fetch data from the current > offset in our local state, > // i.e. offset 0. > assertTrue(sendFetches() > 0); > // Step 3: mock an OFFSET_OUT_OF_RANGE response from the broker. > client.prepareResponse(fullFetchResponse(tidp0, records, > Errors.OFFSET_OUT_OF_RANGE, 100L, 0)); > // Step 4: process the network I/O to receive the response from the > broker with the OFFSET_OUT_OF_RANGE > // that was injected. Note, however, that we haven't "collected" the > fetch data included in the response. > networkClientDelegate.poll(time.timer(0)); > // Step 5: validate that the partition is not marked as needing its > offset reset. The response validation > // logic is performed during the fetch collection, which doesn't happen > until assertEmptyFetch below. 
> assertFalse(subscriptions.isOffsetResetNeeded(tp0)); > // Step 6: update the partition's position in our local state to offset > 2. We still haven't collected the > // fetch, so we haven't performed any validation of the fetch response. > subscriptions.seek(tp0, 2); > // Step 7: perform the fetch collection. As part of that process, error > handling is performed. Since > // we intentionally injected an error above, this error will be checked > and handled in the > // FetchCollector.handleInitializeErrors method. When handling > OFFSET_OUT_OF_RANGE, handleInitializeErrors > // will notice that the original requested offset (0) is different from > the state of our current offset (2). > assertEmptyFetch("Should not return records or advance position after > seeking to end of topic partition"); > } > {code} > Here is the code from {{FetchCollector.handleInitializeErrors}}: > {code:java} > private void handleInitializeErrors(final CompletedFetch completedFetch, > final Errors error) { > final TopicPartition tp = completedFetch.partition; > final long fetchOffset = completedFetch.nextFetchOffset(); > . . . 
> if (error == Errors.OFFSET_OUT_OF_RANGE) { > Optional clearedReplicaId = > subscriptions.clearPreferredReadReplica(tp); > if (!clearedReplicaId.isPresent()) { > // If there's no preferred replica to clear, we're fetching from > the leader so handle > // this error normally > SubscriptionState.FetchPosition position = > subscriptions.position(tp); > if (position == null || fetchOffset != position.offset) { > log.debug("Discarding stale fetch response for partition {} > since the fetched offset {} " + > "does not match the current offset {}", tp, > fetchOffset, position); > } else { > String errorMessage = "Fetch position " + position + " is out > of range for partition " + tp; > if (subscriptions.hasDefaultOffsetResetPolicy()) { > log.info("{}, resetting offset", errorMessage); > subscriptions.requestOffsetReset(tp); > } else { > log.info("{}, raising error to the application since no > reset policy is configured", > errorMessage); > throw new OffsetOutOfRangeException(errorMessage, > Collections.singletonMap(tp, position.offset)); > } > } > } else { > log.debug("Unset the preferred read replica {} for partition {} > since we got {} when fetching {}", > clearedReplicaId.get(), tp, error,
[jira] [Updated] (KAFKA-15641) Investigate CompletedFetch handleInitializeErrors for accuracy
[ https://issues.apache.org/jira/browse/KAFKA-15641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15641: -- Description: The {{FetcherTest}} and {{FetchRequestManagerTest}} classes have a test named testFetchedRecordsAfterSeek, which [upon closer inspection|https://github.com/apache/kafka/pull/14406#discussion_r1347894828] may reveal some incorrect logic in {{FetchCollector.handleInitializeErrors}}. Here is the test code: {code:java} @Test public void testFetchedRecordsAfterSeek() { buildFetcher(OffsetResetStrategy.NONE, new ByteArrayDeserializer(), new ByteArrayDeserializer(), 2, IsolationLevel.READ_UNCOMMITTED); assignFromUser(singleton(tp0)); // Step 1: seek to offset 0 of our partition. subscriptions.seek(tp0, 0); // Step 2: issue a mock broker request to fetch data from the current offset in our local state, // i.e. offset 0. assertTrue(sendFetches() > 0); // Step 3: mock an OFFSET_OUT_OF_RANGE response from the broker. client.prepareResponse(fullFetchResponse(tidp0, records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0)); // Step 4: process the network I/O to receive the response from the broker with the OFFSET_OUT_OF_RANGE // that was injected. Note, however, that we haven't "collected" the fetch data included in the response. networkClientDelegate.poll(time.timer(0)); // Step 5: validate that the partition is not marked as needing its offset reset. The response validation // logic is performed during the fetch collection, which doesn't happen until we call assertEmptyFetch below. assertFalse(subscriptions.isOffsetResetNeeded(tp0)); // Step 6: update the partition's position in our local state to offset 2. We still haven't collected the // fetch, so we haven't performed any validation of the fetch response. subscriptions.seek(tp0, 2); // Step 7: perform the fetch collection. As part of that process, error handling is performed. 
Since // we intentionally injected an error above, this error will be checked and handled in the // FetchCollector.handleInitializeErrors method. When handling OFFSET_OUT_OF_RANGE, handleInitializeErrors // will notice that the original requested offset (0) is different from the state of our current offset (2). assertEmptyFetch("Should not return records or advance position after seeking to end of topic partition"); } {code} Here is the code from {{FetchCollector.handleInitializeErrors}}: {code:java} private void handleInitializeErrors(final CompletedFetch completedFetch, final Errors error) { final TopicPartition tp = completedFetch.partition; final long fetchOffset = completedFetch.nextFetchOffset(); . . . if (error == Errors.OFFSET_OUT_OF_RANGE) { Optional clearedReplicaId = subscriptions.clearPreferredReadReplica(tp); if (!clearedReplicaId.isPresent()) { // If there's no preferred replica to clear, we're fetching from the leader so handle // this error normally SubscriptionState.FetchPosition position = subscriptions.position(tp); if (position == null || fetchOffset != position.offset) { log.debug("Discarding stale fetch response for partition {} since the fetched offset {} " + "does not match the current offset {}", tp, fetchOffset, position); } else { String errorMessage = "Fetch position " + position + " is out of range for partition " + tp; if (subscriptions.hasDefaultOffsetResetPolicy()) { log.info("{}, resetting offset", errorMessage); subscriptions.requestOffsetReset(tp); } else { log.info("{}, raising error to the application since no reset policy is configured", errorMessage); throw new OffsetOutOfRangeException(errorMessage, Collections.singletonMap(tp, position.offset)); } } } else { log.debug("Unset the preferred read replica {} for partition {} since we got {} when fetching {}", clearedReplicaId.get(), tp, error, fetchOffset); } } . . . } {code} The question is: why is the {{OFFSET_OUT_OF_RANGE}} error ignored just because of the following code? 
{code:java} if (position == null || fetchOffset != position.offset) { {code} It's a bit weird that the above check is only done for the {{OFFSET_OUT_OF_RANGE}} error, instead of any error. was: The {{FetcherTest}} and {{FetchRequestManagerTest}} classes have a test named testFetchedRecordsAfterSeek, which [upon closer inspection|https://github.com/apache/kafka/pull/14406#discussion_r1347894828] may reveal some incorrect logic in
[jira] [Updated] (KAFKA-15641) Investigate CompletedFetch handleInitializeErrors for accuracy
[ https://issues.apache.org/jira/browse/KAFKA-15641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True updated KAFKA-15641:
--
Description:

The {{FetcherTest}} and {{FetchRequestManagerTest}} classes have a test named testFetchedRecordsAfterSeek, which [upon closer inspection|https://github.com/apache/kafka/pull/14406#discussion_r1347894828] may reveal some incorrect logic in {{FetchCollector.handleInitializeErrors}}. Here is the test code:

{code:java}
@Test
public void testFetchedRecordsAfterSeek() {
    buildFetcher(OffsetResetStrategy.NONE,
            new ByteArrayDeserializer(),
            new ByteArrayDeserializer(),
            2,
            IsolationLevel.READ_UNCOMMITTED);
    assignFromUser(singleton(tp0));

    // Step 1: seek to offset 0 of our partition.
    subscriptions.seek(tp0, 0);

    // Step 2: issue a mock broker request to fetch data from the current offset in our local state, i.e. offset 0.
    assertTrue(sendFetches() > 0);

    // Step 3: mock an OFFSET_OUT_OF_RANGE response from the broker.
    client.prepareResponse(fullFetchResponse(tidp0, records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0));

    // Step 4: process the network I/O to receive the response from the broker with the OFFSET_OUT_OF_RANGE
    // that was injected. Note, however, that we haven't "collected" the fetch data included in the response.
    networkClientDelegate.poll(time.timer(0));

    // Step 5: validate that the partition is not marked as needing its offset reset. The response validation
    // logic is performed during the fetch collection, which doesn't happen until we call assertEmptyFetch below.
    assertFalse(subscriptions.isOffsetResetNeeded(tp0));

    // Step 6: update the partition's position in our local state to offset 2. We still haven't collected the
    // fetch, so we haven't performed any validation of the fetch response.
    subscriptions.seek(tp0, 2);

    // Step 7: perform the fetch collection. As part of that process, error handling is performed. Since
    // we intentionally injected an error above, this error will be checked and handled in the
    // FetchCollector.handleInitializeErrors method. When handling OFFSET_OUT_OF_RANGE, handleInitializeErrors
    // will notice that the original requested offset (0) is different from the state of our current offset (2).
    assertEmptyFetch("Should not return records or advance position after seeking to end of topic partition");
}
{code}

Here is the code from {{FetchCollector.handleInitializeErrors}}:

{code:java}
private void handleInitializeErrors(final CompletedFetch completedFetch, final Errors error) {
    final TopicPartition tp = completedFetch.partition;
    final long fetchOffset = completedFetch.nextFetchOffset();

    . . .

    if (error == Errors.OFFSET_OUT_OF_RANGE) {
        Optional<Integer> clearedReplicaId = subscriptions.clearPreferredReadReplica(tp);

        if (!clearedReplicaId.isPresent()) {
            // If there's no preferred replica to clear, we're fetching from the leader so handle
            // this error normally
            SubscriptionState.FetchPosition position = subscriptions.position(tp);

            if (position == null || fetchOffset != position.offset) {
                log.debug("Discarding stale fetch response for partition {} since the fetched offset {} " +
                        "does not match the current offset {}", tp, fetchOffset, position);
            } else {
                String errorMessage = "Fetch position " + position + " is out of range for partition " + tp;

                if (subscriptions.hasDefaultOffsetResetPolicy()) {
                    log.info("{}, resetting offset", errorMessage);
                    subscriptions.requestOffsetReset(tp);
                } else {
                    log.info("{}, raising error to the application since no reset policy is configured", errorMessage);
                    throw new OffsetOutOfRangeException(errorMessage, Collections.singletonMap(tp, position.offset));
                }
            }
        } else {
            log.debug("Unset the preferred read replica {} for partition {} since we got {} when fetching {}",
                    clearedReplicaId.get(), tp, error, fetchOffset);
        }
    }

    . . .
}
{code}

The question is: why is the {{OFFSET_OUT_OF_RANGE}} error ignored just because of the following code?

{code:java}
if (position == null || fetchOffset != position.offset) {
{code}

It's a bit weird that the above check is only done for the {{OFFSET_OUT_OF_RANGE}} error, instead of any error.

was: The {{FetcherTest}} and {{FetchRequestManagerTest}} classes have a test named testFetchedRecordsAfterSeek, which [upon closer inspection|https://github.com/apache/kafka/pull/14406#discussion_r1347894828] may reveal some incorrect logic in
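To make the guard concrete, here is a minimal, self-contained sketch of the decision that branch makes. All names here (StaleFetchGuard, handle) are hypothetical stand-ins for illustration, not Kafka's actual classes:

```java
// Hypothetical, simplified model of the OFFSET_OUT_OF_RANGE branch in
// FetchCollector.handleInitializeErrors; names are illustrative, not Kafka's API.
class StaleFetchGuard {

    // Returns what the handler decides to do with an OFFSET_OUT_OF_RANGE response.
    static String handle(long fetchOffset, Long currentPositionOffset, boolean hasDefaultResetPolicy) {
        // The guard in question: a null position, or a requested offset that no
        // longer matches the subscription's current position, marks the response
        // as stale -- e.g. because the application called seek() after the fetch
        // request was sent but before the response was collected.
        if (currentPositionOffset == null || fetchOffset != currentPositionOffset) {
            return "discard";   // logged at debug; the error is otherwise ignored
        }
        // Otherwise the position really is out of range: reset if a default reset
        // policy is configured, else surface the error to the application.
        return hasDefaultResetPolicy ? "reset" : "throw";
    }

    public static void main(String[] args) {
        // The scenario from testFetchedRecordsAfterSeek: the fetch was issued at
        // offset 0, then seek(tp0, 2) moved the position before collection.
        System.out.println(StaleFetchGuard.handle(0L, 2L, false));  // discard
        System.out.println(StaleFetchGuard.handle(5L, 5L, true));   // reset
        System.out.println(StaleFetchGuard.handle(5L, 5L, false));  // throw
    }
}
```

Under this reading, the test passes because the injected error is classified as stale, which is exactly the behavior the ticket asks to verify against the other error branches.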
[jira] [Updated] (KAFKA-15641) Investigate CompletedFetch handleInitializeErrors for accuracy
[ https://issues.apache.org/jira/browse/KAFKA-15641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15641: -- Description: The {{FetcherTest}} and {{FetchRequestManagerTest}} classes have a test named testFetchedRecordsAfterSeek, which [upon closer inspection|https://github.com/apache/kafka/pull/14406#discussion_r1347894828] may reveal some incorrect logic in {{FetchCollector.handleInitializeErrors}}. Here is the test code: {code:java} @Test public void testFetchedRecordsAfterSeek() { buildFetcher(OffsetResetStrategy.NONE, new ByteArrayDeserializer(), new ByteArrayDeserializer(), 2, IsolationLevel.READ_UNCOMMITTED); assignFromUser(singleton(tp0)); // Step 1: seek to offset 0 of our partition. subscriptions.seek(tp0, 0); // Step 2: issue a mock broker request to fetch data from the current offset in our local state, i.e. offset 0. assertTrue(sendFetches() > 0); // Step 3: mock an OFFSET_OUT_OF_RANGE response from the broker. client.prepareResponse(fullFetchResponse(tidp0, records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0)); // Step 4: process the network I/O to receive the response from the broker with the OFFSET_OUT_OF_RANGE // that was injected. Note, however, that we haven't "collected" the fetch data included in the response. networkClientDelegate.poll(time.timer(0)); // Step 5: validate that the partition is not marked as needing its offset reset. The response validation // logic is performed during the fetch collection, which doesn't happen until we call assertEmptyFetch below. assertFalse(subscriptions.isOffsetResetNeeded(tp0)); // Step 6: update the partition's position in our local state to offset 2. We still haven't collected the // fetch, so we haven't performed any validation of the fetch response. subscriptions.seek(tp0, 2); // Step 7: perform the fetch collection. As part of that process, error handling is performed. 
Since // we intentionally injected an error above, this error will be checked and handled in the // FetchCollector.handleInitializeErrors method. When handling OFFSET_OUT_OF_RANGE, handleInitializeErrors // will notice that the original requested offset (0) is different from the state of our current offset (2). assertEmptyFetch("Should not return records or advance position after seeking to end of topic partition"); } {code} Here is the code from FetchCollector.handleInitializeErrors: {code:java} private void handleInitializeErrors(final CompletedFetch completedFetch, final Errors error) { final TopicPartition tp = completedFetch.partition; final long fetchOffset = completedFetch.nextFetchOffset(); if (error == Errors.NOT_LEADER_OR_FOLLOWER || error == Errors.REPLICA_NOT_AVAILABLE || error == Errors.KAFKA_STORAGE_ERROR || error == Errors.FENCED_LEADER_EPOCH || error == Errors.OFFSET_NOT_AVAILABLE) { . . . } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) { . . . } else if (error == Errors.UNKNOWN_TOPIC_ID) { . . . } else if (error == Errors.INCONSISTENT_TOPIC_ID) { . . . 
} else if (error == Errors.OFFSET_OUT_OF_RANGE) { Optional clearedReplicaId = subscriptions.clearPreferredReadReplica(tp); if (!clearedReplicaId.isPresent()) { // If there's no preferred replica to clear, we're fetching from the leader so handle // this error normally SubscriptionState.FetchPosition position = subscriptions.position(tp); if (position == null || fetchOffset != position.offset) { log.debug("Discarding stale fetch response for partition {} since the fetched offset {} " + "does not match the current offset {}", tp, fetchOffset, position); } else { String errorMessage = "Fetch position " + position + " is out of range for partition " + tp; if (subscriptions.hasDefaultOffsetResetPolicy()) { log.info("{}, resetting offset", errorMessage); subscriptions.requestOffsetReset(tp); } else { log.info("{}, raising error to the application since no reset policy is configured", errorMessage); throw new OffsetOutOfRangeException(errorMessage, Collections.singletonMap(tp, position.offset)); } } } else { log.debug("Unset the preferred read replica {} for partition {} since we got {} when fetching {}", clearedReplicaId.get(), tp, error, fetchOffset); } } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) { . . . } else if (error ==
[jira] [Updated] (KAFKA-15338) The metric group documentation for metrics added in KAFKA-13945 is incorrect
[ https://issues.apache.org/jira/browse/KAFKA-15338?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-15338: Affects Version/s: (was: 3.4.0) (was: 3.3.1) (was: 3.3.2) (was: 3.5.0) (was: 3.4.1) (was: 3.5.1) > The metric group documentation for metrics added in KAFKA-13945 is incorrect > > > Key: KAFKA-15338 > URL: https://issues.apache.org/jira/browse/KAFKA-15338 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.3.0 >Reporter: Neil Buesing >Assignee: Atul Sharma >Priority: Trivial > Labels: beginner, newbie > Fix For: 3.3.3, 3.6.0, 3.4.2, 3.5.2 > > > ops.html (docs/streams/ops.html) incorrectly states that the metrics type is > "stream-processor-node-metrics", but in looking at the metrics and inspecting > the code in TopicMetrics, these metrics have a type of "stream-topic-metrics". > 4 metrics are in error "bytes-consumed-total", "bytes-produced-total", > "records-consumed-total", and "records-produced-total". > Looks like the type was changed from the KIP, and the documentation still > reflects the KIP. -- This message was sent by Atlassian Jira (v8.20.10#820010)
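For readers cross-checking dashboards against the corrected docs, the group-to-metric mapping described above can be written out as a small sketch (illustrative code only, not Kafka's metrics API; the class name is hypothetical):

```java
import java.util.List;

// Per TopicMetrics, these four metrics are registered under the
// "stream-topic-metrics" group, not "stream-processor-node-metrics"
// as docs/streams/ops.html claimed.
class StreamsTopicMetricsDoc {
    static final String GROUP = "stream-topic-metrics";
    static final List<String> METRICS = List.of(
            "bytes-consumed-total", "bytes-produced-total",
            "records-consumed-total", "records-produced-total");

    public static void main(String[] args) {
        // Print each fully qualified metric group/name pair.
        METRICS.forEach(m -> System.out.println(GROUP + ":" + m));
    }
}
```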
[jira] [Created] (KAFKA-15641) Investigate CompletedFetch handleInitializeErrors for accuracy
Kirk True created KAFKA-15641: - Summary: Investigate CompletedFetch handleInitializeErrors for accuracy Key: KAFKA-15641 URL: https://issues.apache.org/jira/browse/KAFKA-15641 Project: Kafka Issue Type: Improvement Components: clients, consumer Reporter: Kirk True Assignee: Kirk True The interaction between {{{}FetchBuffer{}}}, {{{}FetchCollector{}}}, and {{CompletedFetch}} is awkward, to say the least. Per [~junrao]'s comments [here|https://github.com/apache/kafka/pull/14406#discussion_r1349361459] and [here|https://github.com/apache/kafka/pull/14406#discussion_r1350773132], there are three issues... First: {quote}{color:#172b4d}This is an existing issue. But the way we handle paused partitions in {{collectFetch}} seems problematic. The application thread first calls {{fetchBuffer.setNextInLineFetch(null)}} and then calls {{{}fetchBuffer.addAll(pausedCompletedFetches){}}}. This could leave a brief window where the paused partition is not included in either {{nextInLineFetch}} or {{{}completedFetches{}}}. If the background thread kicks in in that window, it could have fetched another chunk for that partition and added the response back to FetchBuffer. This would violate the assumption there is no more than one pending {{CompletedFetch}} per partition in FetchBuffer and could cause records returned not in offset order or duplicates to be returned.{color} {quote} {color:#172b4d}Second:{color} {quote}{color:#172b4d}The second existing issue is on the {{fetchBuffer.setNextInLineFetch}} call in {{{}collectFetch{}}}. The issue is that after all records are drained from {{{}nextInLineFetch{}}}. We only call {{setNextInLineFetch}} when there is a new {{{}completedFetch{}}}. However, until the drained {{completedFetch}} is removed from {{{}nextInLineFetch{}}}, the background thread can't fetch the next chunk. 
So, it seems that we will just be stuck here.{color} {quote} {color:#172b4d}Third:{color} {quote}{color:#172b4d}Currently, {{fetchBuffer.setNextInLineFetch}} and {{fetchBuffer.poll}} are separate operations and we expect the caller to call them in the right order to avoid a partition missing in FetchBuffer in the transition phase. It still leaves us with the situation that a partition could be in both completedFetches and nextInLineFetch at a particular time. It's not a problem for now, but it may be in the future. Could we make them an atomic operation? If not, could we add a comment to document the correct usage of the api and the impact on partition being duplicated in completedFetches and nextInLineFetch?{color} {quote}
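The race window in the first quote and the atomicity request in the third can be sketched with a toy buffer. This is a hypothetical illustration of the suggested direction (ToyFetchBuffer, requeue, tracks are made-up names), not Kafka's actual FetchBuffer:

```java
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;
import java.util.List;

// Toy model: a partition's fetch chunk lives either in nextInLineFetch or in
// completedFetches. The non-atomic two-step transition leaves a window where
// it is in neither; folding both steps under one lock removes that window.
class ToyFetchBuffer<T> {
    private final Deque<T> completedFetches = new ArrayDeque<>();
    private T nextInLineFetch;

    // Non-atomic API, as described in the ticket: callers invoke
    // setNextInLineFetch(null) and then addAll(...) as separate lock acquisitions.
    synchronized void setNextInLineFetch(T fetch) { nextInLineFetch = fetch; }
    synchronized void addAll(Collection<T> fetches) { completedFetches.addAll(fetches); }

    // Atomic alternative: both steps happen under a single lock acquisition,
    // so any observer sees either the old state or the fully transitioned one.
    synchronized void requeue(Collection<T> pausedFetches) {
        nextInLineFetch = null;
        completedFetches.addAll(pausedFetches);
    }

    // True if the chunk is tracked in either place.
    synchronized boolean tracks(T fetch) {
        return fetch.equals(nextInLineFetch) || completedFetches.contains(fetch);
    }

    public static void main(String[] args) {
        ToyFetchBuffer<String> buf = new ToyFetchBuffer<>();
        buf.setNextInLineFetch("tp0-chunk");

        // Non-atomic transition: between these two calls another thread could
        // observe tracks("tp0-chunk") == false -- the window described above.
        buf.setNextInLineFetch(null);
        System.out.println(buf.tracks("tp0-chunk"));  // the window: false
        buf.addAll(List.of("tp0-chunk"));

        // Atomic transition: no such window exists.
        ToyFetchBuffer<String> buf2 = new ToyFetchBuffer<>();
        buf2.setNextInLineFetch("tp0-chunk");
        buf2.requeue(List.of("tp0-chunk"));
        System.out.println(buf2.tracks("tp0-chunk")); // true
    }
}
```

The sketch also shows why a documentation-only fix is weaker: the non-atomic API is correct only if every caller happens to order the two calls safely, whereas the single-lock variant makes the invariant structural.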
[jira] [Updated] (KAFKA-15338) The metric group documentation for metrics added in KAFKA-13945 is incorrect
[ https://issues.apache.org/jira/browse/KAFKA-15338?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-15338: Fix Version/s: 3.3.3 3.4.2 3.5.2 > The metric group documentation for metrics added in KAFKA-13945 is incorrect > > > Key: KAFKA-15338 > URL: https://issues.apache.org/jira/browse/KAFKA-15338 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.3.0, 3.4.0, 3.3.1, 3.3.2, 3.5.0, 3.4.1, 3.5.1 >Reporter: Neil Buesing >Assignee: Atul Sharma >Priority: Trivial > Labels: beginner, newbie > Fix For: 3.3.3, 3.6.0, 3.4.2, 3.5.2 > > > ops.html (docs/streams/ops.html) incorrectly states that the metrics type is > "stream-processor-node-metrics", but in looking at the metrics and inspecting > the code in TopicMetrics, these metrics have a type of "stream-topic-metrics". > 4 metrics are in error "bytes-consumed-total", "bytes-produced-total", > "records-consumed-total", and "records-produced-total". > Looks like the type was changed from the KIP, and the documentation still > reflects the KIP. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15378: fix streams upgrade system test [kafka]
mjsax commented on PR #14539: URL: https://github.com/apache/kafka/pull/14539#issuecomment-1769719774

The following system tests failed:
- test_upgrade_to_cooperative_rebalance
  - 0.10.1.1
  - 0.10.2.2
  - 1.0.2
  - 1.1.1
  - 2.0.1
  - 2.3.1
- test_app_upgrade
  - 2.6.3 / full
  - 2.7.2 / full
  - 3.3.2 / full
- test_rolling_upgrade_with_2_bounces
  - 0.10.0.1
  - 0.10.1.1
  - 0.10.2.2
  - 0.11.0.3
  - 1.0.2
  - 2.6.3
  - 2.7.2
  - 3.3.2
- test_broker_type_bounce
  - "broker_type": "controller", "failure_mode": "hard_shutdown", "metadata_quorum": "ZK"
  - "broker_type": "leader", "failure_mode": "hard_shutdown", "metadata_quorum": "ISOLATED_KRAFT"
  - "broker_type": "leader", "failure_mode": "hard_shutdown", "metadata_quorum": "ZK"
- test_many_brokers_bounce
  - "failure_mode": "clean_shutdown", "metadata_quorum": "ISOLATED_KRAFT"
  - "failure_mode": "clean_shutdown", "metadata_quorum": "ZK"
- test_compatible_brokers_eos_alpha_enabled
  - 2.6.3
  - 2.7.2
  - 3.3.2
- test_compatible_brokers_eos_disabled
  - 2.6.3
  - 2.7.2
  - 3.3.2
- test_compatible_brokers_eos_v2_enabled
  - 2.6.3
  - 2.7.2
  - 3.3.2

Overall, we are not in good shape :( Triggered a re-run to see what is noise: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/5896/ But I actually believe we might want to merge this PR as-is to unblock Mickael's PR, and tackle each of these tests one-by-one as follow-up work? Thoughts? @mimaison @guozhangwang -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: add KIP-941 to Kafka Streams upgrade docs [kafka]
mjsax commented on PR #14577: URL: https://github.com/apache/kafka/pull/14577#issuecomment-1769695164 Merged to `trunk` and cherry-picked to `3.6` branch. Also ported to `kafka-site`: https://github.com/apache/kafka-site/pull/563 (merged right away).
Re: [PR] MINOR: add KIP-941 to Kafka Streams upgrade docs [kafka]
mjsax merged PR #14577: URL: https://github.com/apache/kafka/pull/14577
[jira] [Updated] (KAFKA-15629) proposal to introduce IQv2 Query Types: TimeStampKeyQuery and TimeStampRangeQuery
[ https://issues.apache.org/jira/browse/KAFKA-15629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-15629: Description: KIP-992: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-992%3A+Proposal+to+introduce+IQv2+Query+Types%3A+TimestampKeyQuery+and+TimestampRangeQuery] In the current IQv2 code, there are noticeable differences when interfacing with plain-kv-store and ts-kv-store. Notably, the return type V acts as a simple value for plain-kv-store but evolves into ValueAndTimestamp for ts-kv-store, which presents type safety issues in the API. Even if IQv2 hasn't gained widespread adoption, an immediate fix might bring compatibility concerns. This brings us to the essence of our proposal: the introduction of distinct query types. One that returns a plain value, another for values accompanied by timestamps. While querying a ts-kv-store for a plain value and then extracting it is feasible, it doesn't make sense to query a plain-kv-store for a ValueAndTimestamp. Our vision is for plain-kv-store to always return V, while ts-kv-store should return ValueAndTimestamp. was: In the current IQv2 code, there are noticeable differences when interfacing with plain-kv-store and ts-kv-store. Notably, the return type V acts as a simple value for plain-kv-store but evolves into ValueAndTimestamp for ts-kv-store, which presents type safety issues in the API. Even if IQv2 hasn't gained widespread adoption, an immediate fix might bring compatibility concerns. This brings us to the essence of our proposal: the introduction of distinct query types. One that returns a plain value, another for values accompanied by timestamps. While querying a ts-kv-store for a plain value and then extracting it is feasible, it doesn't make sense to query a plain-kv-store for a ValueAndTimestamp. Our vision is for plain-kv-store to always return V, while ts-kv-store should return ValueAndTimestamp. 
> proposal to introduce IQv2 Query Types: TimeStampKeyQuery and > TimeStampRangeQuery > - > > Key: KAFKA-15629 > URL: https://issues.apache.org/jira/browse/KAFKA-15629 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Hanyu Zheng >Assignee: Hanyu Zheng >Priority: Major > Labels: kip > > KIP-992: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-992%3A+Proposal+to+introduce+IQv2+Query+Types%3A+TimestampKeyQuery+and+TimestampRangeQuery] > In the current IQv2 code, there are noticeable differences when interfacing > with plain-kv-store and ts-kv-store. Notably, the return type V acts as a > simple value for plain-kv-store but evolves into ValueAndTimestamp for > ts-kv-store, which presents type safety issues in the API. > Even if IQv2 hasn't gained widespread adoption, an immediate fix might bring > compatibility concerns. > This brings us to the essence of our proposal: the introduction of distinct > query types. One that returns a plain value, another for values accompanied > by timestamps. > While querying a ts-kv-store for a plain value and then extracting it is > feasible, it doesn't make sense to query a plain-kv-store for a > ValueAndTimestamp. > Our vision is for plain-kv-store to always return V, while ts-kv-store should > return ValueAndTimestamp. -- This message was sent by Atlassian Jira (v8.20.10#820010)
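The type-safety point in the proposal can be illustrated with a toy sketch of the two query shapes. These are hypothetical stand-ins for illustration, not the actual org.apache.kafka.streams.query classes proposed by KIP-992:

```java
// Toy stand-in for Kafka Streams' ValueAndTimestamp<V>.
class ValueAndTimestamp<V> {
    final V value;
    final long timestamp;
    ValueAndTimestamp(V value, long timestamp) { this.value = value; this.timestamp = timestamp; }
}

// A query against a plain kv-store yields the bare value V...
interface KeyQuery<K, V> { V get(K key); }

// ...while a query against a ts-kv-store yields the value plus its timestamp,
// so callers never need an unsafe cast from V to ValueAndTimestamp<V>.
interface TimestampKeyQuery<K, V> { ValueAndTimestamp<V> get(K key); }

class Kip992Sketch {
    public static void main(String[] args) {
        KeyQuery<String, Integer> plain = key -> 42;
        TimestampKeyQuery<String, Integer> timestamped =
                key -> new ValueAndTimestamp<>(42, 1_700_000_000_000L);

        System.out.println(plain.get("k"));                  // 42
        System.out.println(timestamped.get("k").timestamp);  // 1700000000000
    }
}
```

With distinct query types, the compiler enforces the proposal's rule: a plain-kv-store answers only KeyQuery (returning V), and a ts-kv-store answers TimestampKeyQuery (returning ValueAndTimestamp<V>), removing the runtime type ambiguity described above.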
[jira] [Updated] (KAFKA-15629) proposal to introduce IQv2 Query Types: TimeStampKeyQuery and TimeStampRangeQuery
[ https://issues.apache.org/jira/browse/KAFKA-15629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-15629: Labels: kip (was: ) > proposal to introduce IQv2 Query Types: TimeStampKeyQuery and > TimeStampRangeQuery > - > > Key: KAFKA-15629 > URL: https://issues.apache.org/jira/browse/KAFKA-15629 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Hanyu Zheng >Assignee: Hanyu Zheng >Priority: Major > Labels: kip > > In the current IQv2 code, there are noticeable differences when interfacing > with plain-kv-store and ts-kv-store. Notably, the return type V acts as a > simple value for plain-kv-store but evolves into ValueAndTimestamp for > ts-kv-store, which presents type safety issues in the API. > Even if IQv2 hasn't gained widespread adoption, an immediate fix might bring > compatibility concerns. > This brings us to the essence of our proposal: the introduction of distinct > query types. One that returns a plain value, another for values accompanied > by timestamps. > While querying a ts-kv-store for a plain value and then extracting it is > feasible, it doesn't make sense to query a plain-kv-store for a > ValueAndTimestamp. > Our vision is for plain-kv-store to always return V, while ts-kv-store should > return ValueAndTimestamp. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]
kirktrue commented on code in PR #14406: URL: https://github.com/apache/kafka/pull/14406#discussion_r1364715414

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java:
## @@ -162,7 +165,9 @@ private Fetch fetchRecords(final CompletedFetch nextInLineFetch) {
 throw new IllegalStateException("Missing position for fetchable partition " + tp);
 if (nextInLineFetch.nextFetchOffset() == position.offset) {
-List> partRecords = nextInLineFetch.fetchRecords(fetchConfig, fetchConfig.maxPollRecords);
+List> partRecords = nextInLineFetch.fetchRecords(fetchConfig,

Review Comment: I've added this to KAFKA-15640 for a dedicated effort to clean up the modifications to this data to make them atomic operations.
[jira] [Updated] (KAFKA-15640) Refactor CompletedFetch initialization
[ https://issues.apache.org/jira/browse/KAFKA-15640?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15640: -- Description: The interaction between {{{}FetchBuffer{}}}, {{{}FetchCollector{}}}, and {{CompletedFetch}} is awkward, to say the least. Per [~junrao]'s comments [here|https://github.com/apache/kafka/pull/14406#discussion_r1349361459] and [here|https://github.com/apache/kafka/pull/14406#discussion_r1350773132], there are three issues... First: {quote}{color:#172b4d}This is an existing issue. But the way we handle paused partitions in {{collectFetch}} seems problematic. The application thread first calls {{fetchBuffer.setNextInLineFetch(null)}} and then calls {{{}fetchBuffer.addAll(pausedCompletedFetches){}}}. This could leave a brief window where the paused partition is not included in either {{nextInLineFetch}} or {{{}completedFetches{}}}. If the background thread kicks in in that window, it could have fetched another chunk for that partition and added the response back to FetchBuffer. This would violate the assumption there is no more than one pending {{CompletedFetch}} per partition in FetchBuffer and could cause records returned not in offset order or duplicates to be returned.{color} {quote} {color:#172b4d}Second:{color} {quote}{color:#172b4d}The second existing issue is on the {{fetchBuffer.setNextInLineFetch}} call in {{{}collectFetch{}}}. The issue is that after all records are drained from {{{}nextInLineFetch{}}}. We only call {{setNextInLineFetch}} when there is a new {{{}completedFetch{}}}. However, until the drained {{completedFetch}} is removed from {{{}nextInLineFetch{}}}, the background thread can't fetch the next chunk. 
So, it seems that we will just be stuck here.{color} {quote} {color:#172b4d}Third:{color} {quote}{color:#172b4d}Currently, {{fetchBuffer.setNextInLineFetch}} and {{fetchBuffer.poll}} are separate operations and we expect the caller to call them in the right order to avoid a partition missing in FetchBuffer in the transition phase. It still leaves us with the situation that a partition could be in both completedFetches and nextInLineFetch at a particular time. It's not a problem for now, but it may be in the future. Could we make them an atomic operation? If not, could we add a comment to document the correct usage of the api and the impact on partition being duplicated in completedFetches and nextInLineFetch?{color} {quote} was: The interaction between {{{}FetchBuffer{}}}, {{{}FetchCollector{}}}, and {{CompletedFetch}} is awkward, to say the least. Per [~junrao]'s comments: {quote}Currently, {{fetchBuffer.setNextInLineFetch}} and {{fetchBuffer.poll}} are separate operations and we expect the caller to call them in the right order to avoid a partition missing in FetchBuffer in the transition phase. It still leaves us with the situation that a partition could be in both completedFetches and nextInLineFetch at a particular time. It's not a problem for now, but it may be in the future. Could we make them an atomic operation? If not, could we add a comment to document the correct usage of the api and the impact on partition being duplicated in completedFetches and nextInLineFetch? {quote} > Refactor CompletedFetch initialization > -- > > Key: KAFKA-15640 > URL: https://issues.apache.org/jira/browse/KAFKA-15640 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Minor > > The interaction between {{{}FetchBuffer{}}}, {{{}FetchCollector{}}}, and > {{CompletedFetch}} is awkward, to say the least. 
Per [~junrao]'s comments > [here|https://github.com/apache/kafka/pull/14406#discussion_r1349361459] and > [here|https://github.com/apache/kafka/pull/14406#discussion_r1350773132], > there are three issues... > > First: > {quote}{color:#172b4d}This is an existing issue. But the way we handle paused > partitions in {{collectFetch}} seems problematic. The application thread > first calls {{fetchBuffer.setNextInLineFetch(null)}} and then calls > {{{}fetchBuffer.addAll(pausedCompletedFetches){}}}. This could leave a brief > window where the paused partition is not included in either > {{nextInLineFetch}} or {{{}completedFetches{}}}. If the background thread > kicks in in that window, it could have fetched another chunk for that > partition and added the response back to FetchBuffer. This would violate the > assumption there is no more than one pending {{CompletedFetch}} per partition > in FetchBuffer and could cause records returned not in offset order or > duplicates to be returned.{color} > {quote} > > {color:#172b4d}Second:{color} > {quote}{color:#172b4d}The second existing issue is on the > {{fetchBuffer.setNextInLineFetch}}
[jira] [Updated] (KAFKA-15640) Refactor CompletedFetch initialization
[ https://issues.apache.org/jira/browse/KAFKA-15640?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15640: -- Priority: Major (was: Minor) > Refactor CompletedFetch initialization > -- > > Key: KAFKA-15640 > URL: https://issues.apache.org/jira/browse/KAFKA-15640 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > > The interaction between {{{}FetchBuffer{}}}, {{{}FetchCollector{}}}, and > {{CompletedFetch}} is awkward, to say the least. Per [~junrao]'s comments > [here|https://github.com/apache/kafka/pull/14406#discussion_r1349361459] and > [here|https://github.com/apache/kafka/pull/14406#discussion_r1350773132], > there are three issues... > > First: > {quote}{color:#172b4d}This is an existing issue. But the way we handle paused > partitions in {{collectFetch}} seems problematic. The application thread > first calls {{fetchBuffer.setNextInLineFetch(null)}} and then calls > {{{}fetchBuffer.addAll(pausedCompletedFetches){}}}. This could leave a brief > window where the paused partition is not included in either > {{nextInLineFetch}} or {{{}completedFetches{}}}. If the background thread > kicks in in that window, it could have fetched another chunk for that > partition and added the response back to FetchBuffer. This would violate the > assumption there is no more than one pending {{CompletedFetch}} per partition > in FetchBuffer and could cause records returned not in offset order or > duplicates to be returned.{color} > {quote} > > {color:#172b4d}Second:{color} > {quote}{color:#172b4d}The second existing issue is on the > {{fetchBuffer.setNextInLineFetch}} call in {{{}collectFetch{}}}. The issue is > that after all records are drained from {{{}nextInLineFetch{}}}. We only call > {{setNextInLineFetch}} when there is a new {{{}completedFetch{}}}. 
However, > until the drained {{completedFetch}} is removed from {{{}nextInLineFetch{}}}, > the background thread can't fetch the next chunk. So, it seems that we will > just be stuck here.{color} > {quote} > > {color:#172b4d}Third:{color} > {quote}{color:#172b4d}Currently, {{fetchBuffer.setNextInLineFetch}} and > {{fetchBuffer.poll}} are separate operations and we expect the caller to call > them in the right order to avoid a partition missing in FetchBuffer in the > transition phase. It still leaves us with the situation that a partition > could be in both completedFetches and nextInLineFetch at a particular time. > It's not a problem for now, but it may be in the future. Could we make them > an atomic operation? If not, could we add a comment to document the correct > usage of the api and the impact on partition being duplicated in > completedFetches and nextInLineFetch?{color} > {quote} -- This message was sent by Atlassian Jira (v8.20.10#820010)
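Jun Rao's third point above suggests making {{fetchBuffer.poll}} and {{fetchBuffer.setNextInLineFetch}} a single atomic step so a partition is never observable in both (or neither of) completedFetches and nextInLineFetch. A minimal sketch of that idea, with illustrative names that are not the real FetchBuffer API:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Sketch of an atomic "promote" combining poll + setNextInLineFetch.
// Class, field, and method names are hypothetical, not Kafka's FetchBuffer.
class AtomicFetchBuffer<T> {
    private final Deque<T> completedFetches = new ArrayDeque<>();
    private T nextInLineFetch;

    synchronized void add(T fetch) {
        completedFetches.addLast(fetch);
    }

    // Atomically drop the drained fetch and take the next completed one,
    // so callers never see the intermediate "partition missing" state,
    // and a fetch is never in both collections at once.
    synchronized T promoteNextInLine() {
        nextInLineFetch = completedFetches.pollFirst();
        return nextInLineFetch;
    }

    synchronized T nextInLine() {
        return nextInLineFetch;
    }
}
```

With a single synchronized transition, the caller-ordering contract that the comment asks to document would no longer be needed.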
[jira] [Created] (KAFKA-15640) Refactor CompletedFetch initialization
Kirk True created KAFKA-15640: - Summary: Refactor CompletedFetch initialization Key: KAFKA-15640 URL: https://issues.apache.org/jira/browse/KAFKA-15640 Project: Kafka Issue Type: Improvement Components: clients, consumer Reporter: Kirk True Assignee: Kirk True The interaction between {{{}FetchBuffer{}}}, {{{}FetchCollector{}}}, and {{CompletedFetch}} is awkward, to say the least. Per [~junrao]'s comments: {quote}Currently, {{fetchBuffer.setNextInLineFetch}} and {{fetchBuffer.poll}} are separate operations and we expect the caller to call them in the right order to avoid a partition missing in FetchBuffer in the transition phase. It still leaves us with the situation that a partition could be in both completedFetches and nextInLineFetch at a particular time. It's not a problem for now, but it may be in the future. Could we make them an atomic operation? If not, could we add a comment to document the correct usage of the api and the impact on partition being duplicated in completedFetches and nextInLineFetch? {quote}

[PR] MINOR: Don't log missing partitions at ERROR level [kafka]
splett2 opened a new pull request, #14579: URL: https://github.com/apache/kafka/pull/14579 ### What We are logging missing partitions as errors in the `state.change.logger`, which may be alarming to operators. There are valid cases where the partition may not exist, e.g. partition reassignment. ### Testing Verified that ReassignPartitionsIntegrationTest logs partition reassignments at ERROR level without the patch and does not with the patch. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
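The fix in this PR amounts to choosing the log level by whether the missing partition is an expected condition. A generic sketch of that pattern, using java.util.logging for self-containment (the class, logger name usage, and the reassignment-set predicate are illustrative, not the actual state.change.logger code):

```java
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;

// Sketch: a missing partition is only worth ERROR (SEVERE) when it is
// unexpected; during a reassignment its absence is a normal condition.
// Names here are hypothetical, not Kafka's logging implementation.
class PartitionStateLogger {
    private static final Logger LOG = Logger.getLogger("state.change.logger");

    // Pick the level based on whether the partition is being reassigned.
    static Level levelFor(String partition, Set<String> reassigningPartitions) {
        return reassigningPartitions.contains(partition) ? Level.INFO : Level.SEVERE;
    }

    static void logMissing(String partition, Set<String> reassigningPartitions) {
        LOG.log(levelFor(partition, reassigningPartitions),
                () -> "Partition " + partition + " does not exist on this broker");
    }
}
```

The design point is that the message itself is unchanged; only its severity reflects whether the state is anomalous, which keeps ERROR-level alerting meaningful.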
Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]
kirktrue commented on code in PR #14406: URL: https://github.com/apache/kafka/pull/14406#discussion_r1364712500 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchBuffer.java: ## @@ -35,19 +41,24 @@ * * * - * Note: this class is not thread-safe and is intended to only be used from a single thread. + * Note: this class is thread-safe with the intention that {@link CompletedFetch the data} will be Review Comment: I've filed KAFKA-15640 to look into this in more detail.
Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]
kirktrue commented on code in PR #14406: URL: https://github.com/apache/kafka/pull/14406#discussion_r1364707771 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java: ## @@ -0,0 +1,3533 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.ClientRequest; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.KafkaClient; +import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.MockClient; +import org.apache.kafka.clients.NetworkClient; +import org.apache.kafka.clients.NodeApiVersions; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.IsolationLevel; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.MetricNameTemplate; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.DisconnectException; +import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.internals.ClusterResourceListeners; +import org.apache.kafka.common.message.ApiMessageType; +import org.apache.kafka.common.message.FetchResponseData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult; +import 
org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.network.NetworkReceive; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.DefaultRecordBatch; +import org.apache.kafka.common.record.EndTransactionMarker; +import org.apache.kafka.common.record.LegacyRecord; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MemoryRecordsBuilder; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.ApiVersionsResponse; +import org.apache.kafka.common.requests.FetchMetadata; +import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.common.requests.FetchRequest.PartitionData; +import org.apache.kafka.common.requests.FetchResponse; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse; +import org.apache.kafka.common.requests.RequestTestUtils; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.BytesDeserializer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import
Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]
kirktrue commented on code in PR #14406: URL: https://github.com/apache/kafka/pull/14406#discussion_r1364706025 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java: ## @@ -0,0 +1,3541 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.ClientRequest; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.KafkaClient; +import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.MockClient; +import org.apache.kafka.clients.NetworkClient; +import org.apache.kafka.clients.NodeApiVersions; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.IsolationLevel; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.MetricNameTemplate; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.DisconnectException; +import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.internals.ClusterResourceListeners; +import org.apache.kafka.common.message.ApiMessageType; +import org.apache.kafka.common.message.FetchResponseData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult; +import 
org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.network.NetworkReceive; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.DefaultRecordBatch; +import org.apache.kafka.common.record.EndTransactionMarker; +import org.apache.kafka.common.record.LegacyRecord; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MemoryRecordsBuilder; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.ApiVersionsResponse; +import org.apache.kafka.common.requests.FetchMetadata; +import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.common.requests.FetchRequest.PartitionData; +import org.apache.kafka.common.requests.FetchResponse; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse; +import org.apache.kafka.common.requests.RequestTestUtils; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.BytesDeserializer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import
Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]
kirktrue commented on code in PR #14406: URL: https://github.com/apache/kafka/pull/14406#discussion_r1364704339 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.MockClient; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent; +import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor; +import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeApplicationEvent; +import org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent; +import org.apache.kafka.clients.consumer.internals.events.FetchEvent; +import org.apache.kafka.clients.consumer.internals.events.ListOffsetsApplicationEvent; +import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent; +import org.apache.kafka.clients.consumer.internals.events.ResetPositionsApplicationEvent; +import org.apache.kafka.clients.consumer.internals.events.TopicMetadataApplicationEvent; +import org.apache.kafka.clients.consumer.internals.events.ValidatePositionsApplicationEvent; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.FindCoordinatorRequestData; +import org.apache.kafka.common.requests.FindCoordinatorRequest; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.requests.RequestTestUtils; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.test.TestCondition; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Queue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; + +import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_REQUEST_TIMEOUT_MS; +import static 
org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@SuppressWarnings("ClassDataAbstractionCoupling") +public class ConsumerNetworkThreadTest { + +private ConsumerTestBuilder.ConsumerNetworkThreadTestBuilder testBuilder; +private Time time; +private ConsumerMetadata metadata; +private NetworkClientDelegate networkClient; +private BlockingQueue applicationEventsQueue; +private ApplicationEventProcessor applicationEventProcessor; +private OffsetsRequestManager offsetsRequestManager; +private CommitRequestManager commitManager; +private ConsumerNetworkThread consumerNetworkThread; +private MockClient client; + +@BeforeEach +public void setup() { +testBuilder = new ConsumerTestBuilder.ConsumerNetworkThreadTestBuilder(); +time = testBuilder.time; +metadata = testBuilder.metadata; +networkClient = testBuilder.networkClientDelegate; +client = testBuilder.client; +applicationEventsQueue = testBuilder.applicationEventQueue; +applicationEventProcessor = testBuilder.applicationEventProcessor; +commitManager = testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new); +offsetsRequestManager = testBuilder.offsetsRequestManager; +consumerNetworkThread =
[jira] [Updated] (KAFKA-15639) Investigate ConsumerNetworkThread's testResetPositionsProcessFailureIsIgnored
[ https://issues.apache.org/jira/browse/KAFKA-15639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15639: -- Parent: (was: KAFKA-14246) Issue Type: Task (was: Sub-task) > Investigate ConsumerNetworkThread's testResetPositionsProcessFailureIsIgnored > - > > Key: KAFKA-15639 > URL: https://issues.apache.org/jira/browse/KAFKA-15639 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Kirk True >Assignee: Philip Nee >Priority: Minor > Labels: consumer-threading-refactor > > The {{testResetPositionsProcessFailureIsIgnored}} test looks like this: > > {code:java} > @Test > public void testResetPositionsProcessFailureIsIgnored() { > doThrow(new > NullPointerException()).when(offsetsRequestManager).resetPositionsIfNeeded(); > ResetPositionsApplicationEvent event = new > ResetPositionsApplicationEvent(); > applicationEventsQueue.add(event); > assertDoesNotThrow(() -> consumerNetworkThread.runOnce()); > > verify(applicationEventProcessor).process(any(ResetPositionsApplicationEvent.class)); > } > {code} > > [~junrao] asks: > > {quote}Not sure if this is a useful test since > {{offsetsRequestManager.resetPositionsIfNeeded()}} seems to never directly > throw an exception? > {quote} > > I commented out the {{doThrow}} line and it did not impact the test. -- This message was sent by Atlassian Jira (v8.20.10#820010)
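Jun Rao's observation can be reproduced without Mockito: if {{runOnce}} catches and logs every failure from event processing, then an assertion that it does not throw passes whether or not the processor is stubbed to fail, so the {{doThrow}} adds no coverage. A plain-Java sketch of that structure (class and method names are illustrative, not the real ConsumerNetworkThread):

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

// Why commenting out the doThrow changes nothing: a loop that swallows
// processor failures "does not throw" with or without a failing processor.
// Names are hypothetical, not the actual ConsumerNetworkThread code.
class SwallowingEventLoop {
    private final Queue<String> events = new ArrayDeque<>();
    private final Consumer<String> processor;
    int failuresSwallowed = 0;

    SwallowingEventLoop(Consumer<String> processor) {
        this.processor = processor;
    }

    void add(String event) {
        events.add(event);
    }

    // Mirrors the structure under test: failures are logged (here, counted)
    // and never propagated to the caller.
    void runOnce() {
        String event;
        while ((event = events.poll()) != null) {
            try {
                processor.accept(event);
            } catch (RuntimeException e) {
                failuresSwallowed++; // the real code would log and continue
            }
        }
    }
}
```

A more useful assertion for such a test would be on the observable side effect (the failure was recorded or logged), not merely that no exception escaped.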
[jira] [Updated] (KAFKA-15639) Investigate ConsumerNetworkThread's testResetPositionsProcessFailureIsIgnored
[ https://issues.apache.org/jira/browse/KAFKA-15639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15639: -- Description: The {{testResetPositionsProcessFailureIsIgnored}} test looks like this: {code:java} @Test public void testResetPositionsProcessFailureIsIgnored() { doThrow(new NullPointerException()).when(offsetsRequestManager).resetPositionsIfNeeded(); ResetPositionsApplicationEvent event = new ResetPositionsApplicationEvent(); applicationEventsQueue.add(event); assertDoesNotThrow(() -> consumerNetworkThread.runOnce()); verify(applicationEventProcessor).process(any(ResetPositionsApplicationEvent.class)); } {code} [~junrao] asks: {quote}Not sure if this is a useful test since {{offsetsRequestManager.resetPositionsIfNeeded()}} seems to never directly throw an exception? {quote} I commented out the {{doThrow}} line and it did not impact the test. was: The {{testResetPositionsProcessFailureIsIgnored}} test looks like this: {code:java} @Test public void testResetPositionsProcessFailureIsIgnored() { doThrow(new NullPointerException()).when(offsetsRequestManager).resetPositionsIfNeeded(); ResetPositionsApplicationEvent event = new ResetPositionsApplicationEvent(); applicationEventsQueue.add(event); assertDoesNotThrow(() -> consumerNetworkThread.runOnce()); verify(applicationEventProcessor).process(any(ResetPositionsApplicationEvent.class)); } {code} [~junrao] asks: {quote}Not sure if this is a useful test since {{offsetsRequestManager.resetPositionsIfNeeded()}} seems to never directly throw an exception? {quote} I commented out the doThrow line and it did not impact the test. 
> Investigate ConsumerNetworkThread's testResetPositionsProcessFailureIsIgnored > - > > Key: KAFKA-15639 > URL: https://issues.apache.org/jira/browse/KAFKA-15639 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Assignee: Philip Nee >Priority: Minor > Labels: consumer-threading-refactor > > The {{testResetPositionsProcessFailureIsIgnored}} test looks like this: > > {code:java} > @Test > public void testResetPositionsProcessFailureIsIgnored() { > doThrow(new > NullPointerException()).when(offsetsRequestManager).resetPositionsIfNeeded(); > ResetPositionsApplicationEvent event = new > ResetPositionsApplicationEvent(); > applicationEventsQueue.add(event); > assertDoesNotThrow(() -> consumerNetworkThread.runOnce()); > > verify(applicationEventProcessor).process(any(ResetPositionsApplicationEvent.class)); > } > {code} > > [~junrao] asks: > > {quote}Not sure if this is a useful test since > {{offsetsRequestManager.resetPositionsIfNeeded()}} seems to never directly > throw an exception? > {quote} > > I commented out the {{doThrow}} line and it did not impact the test. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15639) Investigate ConsumerNetworkThread's testResetPositionsProcessFailureIsIgnored
[ https://issues.apache.org/jira/browse/KAFKA-15639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15639: -- Description: The {{testResetPositionsProcessFailureIsIgnored}} test looks like this: {code:java} @Test public void testResetPositionsProcessFailureIsIgnored() { doThrow(new NullPointerException()).when(offsetsRequestManager).resetPositionsIfNeeded(); ResetPositionsApplicationEvent event = new ResetPositionsApplicationEvent(); applicationEventsQueue.add(event); assertDoesNotThrow(() -> consumerNetworkThread.runOnce()); verify(applicationEventProcessor).process(any(ResetPositionsApplicationEvent.class)); } {code} [~junrao] asks: {quote}Not sure if this is a useful test since {{offsetsRequestManager.resetPositionsIfNeeded()}} seems to never directly throw an exception? {quote} I commented out the doThrow line and it did not impact the test. was: The testResetPositionsProcessFailureIsIgnored test looks like this: {code:java} @Test public void testResetPositionsProcessFailureIsIgnored() { doThrow(new NullPointerException()).when(offsetsRequestManager).resetPositionsIfNeeded(); ResetPositionsApplicationEvent event = new ResetPositionsApplicationEvent(); applicationEventsQueue.add(event); assertDoesNotThrow(() -> consumerNetworkThread.runOnce()); verify(applicationEventProcessor).process(any(ResetPositionsApplicationEvent.class)); } {code} I commented out the doThrow Not sure if this is a useful test since {{offsetsRequestManager.resetPositionsIfNeeded()}} seems to never directly throw an exception? 
> Investigate ConsumerNetworkThread's testResetPositionsProcessFailureIsIgnored > - > > Key: KAFKA-15639 > URL: https://issues.apache.org/jira/browse/KAFKA-15639 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Assignee: Philip Nee >Priority: Minor > Labels: consumer-threading-refactor > > The {{testResetPositionsProcessFailureIsIgnored}} test looks like this: > > {code:java} > @Test > public void testResetPositionsProcessFailureIsIgnored() { > doThrow(new > NullPointerException()).when(offsetsRequestManager).resetPositionsIfNeeded(); > ResetPositionsApplicationEvent event = new > ResetPositionsApplicationEvent(); > applicationEventsQueue.add(event); > assertDoesNotThrow(() -> consumerNetworkThread.runOnce()); > > verify(applicationEventProcessor).process(any(ResetPositionsApplicationEvent.class)); > } > {code} > > [~junrao] asks: > > {quote}Not sure if this is a useful test since > {{offsetsRequestManager.resetPositionsIfNeeded()}} seems to never directly > throw an exception? > {quote} > > I commented out the doThrow line and it did not impact the test. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15639) Investigate ConsumerNetworkThread's testResetPositionsProcessFailureIsIgnored
[ https://issues.apache.org/jira/browse/KAFKA-15639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15639: -- Description: The testResetPositionsProcessFailureIsIgnored test looks like this: {code:java} @Test public void testResetPositionsProcessFailureIsIgnored() { doThrow(new NullPointerException()).when(offsetsRequestManager).resetPositionsIfNeeded(); ResetPositionsApplicationEvent event = new ResetPositionsApplicationEvent(); applicationEventsQueue.add(event); assertDoesNotThrow(() -> consumerNetworkThread.runOnce()); verify(applicationEventProcessor).process(any(ResetPositionsApplicationEvent.class)); } {code} I commented out the doThrow Not sure if this is a useful test since {{offsetsRequestManager.resetPositionsIfNeeded()}} seems to never directly throw an exception? was: Regarding this comment in {{{}testPollResultTimer{}}}... {code:java} // purposely setting a non-MAX time to ensure it is returning Long.MAX_VALUE upon success| {code} [~junrao] asked: {quote}Which call is returning Long.MAX_VALUE? 
{quote} > Investigate ConsumerNetworkThread's testResetPositionsProcessFailureIsIgnored > - > > Key: KAFKA-15639 > URL: https://issues.apache.org/jira/browse/KAFKA-15639 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Assignee: Philip Nee >Priority: Minor > Labels: consumer-threading-refactor > > The testResetPositionsProcessFailureIsIgnored test looks like this: > > {code:java} > @Test > public void testResetPositionsProcessFailureIsIgnored() { > doThrow(new > NullPointerException()).when(offsetsRequestManager).resetPositionsIfNeeded(); > ResetPositionsApplicationEvent event = new > ResetPositionsApplicationEvent(); > applicationEventsQueue.add(event); > assertDoesNotThrow(() -> consumerNetworkThread.runOnce()); > > verify(applicationEventProcessor).process(any(ResetPositionsApplicationEvent.class)); > } > {code} > > I commented out the doThrow > > Not sure if this is a useful test since > {{offsetsRequestManager.resetPositionsIfNeeded()}} seems to never directly > throw an exception? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15639) Investigate ConsumerNetworkThread's testResetPositionsProcessFailureIsIgnored
Kirk True created KAFKA-15639: - Summary: Investigate ConsumerNetworkThread's testResetPositionsProcessFailureIsIgnored Key: KAFKA-15639 URL: https://issues.apache.org/jira/browse/KAFKA-15639 Project: Kafka Issue Type: Sub-task Components: clients, consumer Reporter: Kirk True Assignee: Philip Nee Regarding this comment in {{{}testPollResultTimer{}}}... {code:java} // purposely setting a non-MAX time to ensure it is returning Long.MAX_VALUE upon success| {code} [~junrao] asked: {quote}Which call is returning Long.MAX_VALUE? {quote}
Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]
kirktrue commented on code in PR #14406: URL: https://github.com/apache/kafka/pull/14406#discussion_r1364700996 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.MockClient; +import org.apache.kafka.clients.consumer.LogTruncationException; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent; +import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor; +import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeApplicationEvent; +import org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent; +import org.apache.kafka.clients.consumer.internals.events.FetchEvent; +import org.apache.kafka.clients.consumer.internals.events.ListOffsetsApplicationEvent; +import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent; +import org.apache.kafka.clients.consumer.internals.events.ResetPositionsApplicationEvent; +import org.apache.kafka.clients.consumer.internals.events.TopicMetadataApplicationEvent; +import org.apache.kafka.clients.consumer.internals.events.ValidatePositionsApplicationEvent; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.message.FindCoordinatorRequestData; +import org.apache.kafka.common.requests.FindCoordinatorRequest; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.requests.RequestTestUtils; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Queue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; + +import static 
org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@SuppressWarnings("ClassDataAbstractionCoupling") +public class ConsumerNetworkThreadTest { + +private ConsumerTestBuilder.ConsumerNetworkThreadTestBuilder testBuilder; +private Time time; +private ConsumerMetadata metadata; +private NetworkClientDelegate networkClient; +private BlockingQueue applicationEventsQueue; +private ApplicationEventProcessor applicationEventProcessor; +private CoordinatorRequestManager coordinatorManager; +private OffsetsRequestManager offsetsRequestManager; +private CommitRequestManager commitManager; +private ConsumerNetworkThread consumerNetworkThread; +private MockClient client; + +@BeforeEach +public void setup() { +testBuilder = new ConsumerTestBuilder.ConsumerNetworkThreadTestBuilder(); +time = testBuilder.time; +metadata = testBuilder.metadata; +networkClient = testBuilder.networkClientDelegate; +client = testBuilder.client; +applicationEventsQueue = testBuilder.applicationEventQueue; +applicationEventProcessor = testBuilder.applicationEventProcessor; +coordinatorManager = testBuilder.coordinatorRequestManager; +commitManager = testBuilder.commitRequestManager; +offsetsRequestManager = testBuilder.offsetsRequestManager; +consumerNetworkThread =
[jira] [Updated] (KAFKA-15638) Investigate ConsumerNetworkThread's testPollResultTimer
[ https://issues.apache.org/jira/browse/KAFKA-15638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15638: -- Description: Regarding this comment in {{{}testPollResultTimer{}}}... {code:java} // purposely setting a non-MAX time to ensure it is returning Long.MAX_VALUE upon success {code} [~junrao] asked: {quote}Which call is returning Long.MAX_VALUE? {quote} was: Thanks for the reply. I still don't quite understand the test. Why do we duplicate the following code both inside and outside of {{{}setWakeupHook{}}}? {code:java} networkClientDelegate.disconnectAsync(readReplica); networkClientDelegate.poll(time.timer(0)); {code} MockClient is only woken up through {{{}networkClientDelegate.disconnectAsync{}}}. > Investigate ConsumerNetworkThread's testPollResultTimer > --- > > Key: KAFKA-15638 > URL: https://issues.apache.org/jira/browse/KAFKA-15638 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Minor > Labels: consumer-threading-refactor > > Regarding this comment in {{{}testPollResultTimer{}}}... > {code:java} > // purposely setting a non-MAX time to ensure it is returning Long.MAX_VALUE > upon success > {code} > [~junrao] asked: > {quote}Which call is returning Long.MAX_VALUE? > {quote} -- This message was sent by Atlassian Jira (v8.20.10#820010)
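For context on the question above, here is a hedged sketch (names and signatures invented, not the Kafka source) of the pattern the code comment seems to describe: a request manager's poll result carries a time-until-next-poll value, and on success that value is Long.MAX_VALUE (meaning "nothing more to do"). A test that deliberately starts the timer at a non-MAX value can therefore observe whether the success path really resets it to MAX.

```java
// Hypothetical sketch (invented names, not the Kafka source): computes the
// delay until a request manager next needs to be polled. While a request is
// still in flight the delay is a concrete backoff; once it has completed
// successfully, Long.MAX_VALUE signals that no further poll is needed.
public class PollResultTimer {

    static long timeUntilNextPollMs(boolean requestCompleted, long backoffMs) {
        return requestCompleted ? Long.MAX_VALUE : backoffMs;
    }

    public static void main(String[] args) {
        // Starting from a non-MAX backoff makes the success-path reset visible.
        long pending = timeUntilNextPollMs(false, 100L);
        long done = timeUntilNextPollMs(true, 100L);
        System.out.println(pending); // 100
        System.out.println(done);    // 9223372036854775807
    }
}
```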
[jira] [Assigned] (KAFKA-15638) Investigate ConsumerNetworkThread's testPollResultTimer
[ https://issues.apache.org/jira/browse/KAFKA-15638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-15638: - Assignee: Philip Nee (was: Kirk True) > Investigate ConsumerNetworkThread's testPollResultTimer > --- > > Key: KAFKA-15638 > URL: https://issues.apache.org/jira/browse/KAFKA-15638 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Assignee: Philip Nee >Priority: Minor > Labels: consumer-threading-refactor > > Regarding this comment in {{{}testPollResultTimer{}}}... > {code:java} > // purposely setting a non-MAX time to ensure it is returning Long.MAX_VALUE > upon success > {code} > [~junrao] asked: > {quote}Which call is returning Long.MAX_VALUE? > {quote} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15638) Investigate ConsumerNetworkThread's testPollResultTimer
Kirk True created KAFKA-15638: - Summary: Investigate ConsumerNetworkThread's testPollResultTimer Key: KAFKA-15638 URL: https://issues.apache.org/jira/browse/KAFKA-15638 Project: Kafka Issue Type: Sub-task Components: clients, consumer Reporter: Kirk True Assignee: Kirk True Thanks for the reply. I still don't quite understand the test. Why do we duplicate the following code both inside and outside of {{{}setWakeupHook{}}}? {code:java} networkClientDelegate.disconnectAsync(readReplica); networkClientDelegate.poll(time.timer(0)); {code} MockClient is only woken up through {{{}networkClientDelegate.disconnectAsync{}}}. -- This message was sent by Atlassian Jira (v8.20.10#820010)
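The duplication question above concerns a "wakeup hook": a callback the test installs so that, while the client is blocked inside poll(), a side effect (here an async disconnect) fires and wakes it. A minimal sketch of that mechanism, with invented names and none of the Kafka internals, helps frame why running the same disconnect/poll pair again outside the hook looks redundant:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch (invented names, not the Kafka test harness): a client
// whose poll() invokes an installed wakeup hook, simulating being woken from
// inside the blocking call. If the hook already performs the disconnect, a
// second disconnect/poll pair outside the hook would exercise nothing new.
public class WakeupHookSketch {
    private Runnable wakeupHook = () -> {};
    private final AtomicBoolean disconnected = new AtomicBoolean(false);

    void setWakeupHook(Runnable hook) {
        this.wakeupHook = hook;
    }

    void disconnectAsync() {
        disconnected.set(true);
    }

    boolean isDisconnected() {
        return disconnected.get();
    }

    // Runs the hook once, standing in for "the client was woken mid-poll".
    void poll() {
        wakeupHook.run();
    }

    public static void main(String[] args) {
        WakeupHookSketch client = new WakeupHookSketch();
        client.setWakeupHook(client::disconnectAsync);
        client.poll(); // the hook fires during poll and triggers the disconnect
        System.out.println("disconnected=" + client.isDisconnected());
    }
}
```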
Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]
kirktrue commented on code in PR #14406: URL: https://github.com/apache/kafka/pull/14406#discussion_r1364699488 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java: ## @@ -0,0 +1,3573 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.ClientRequest; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.KafkaClient; +import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.MockClient; +import org.apache.kafka.clients.NetworkClient; +import org.apache.kafka.clients.NodeApiVersions; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.IsolationLevel; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.MetricNameTemplate; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.DisconnectException; +import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.internals.ClusterResourceListeners; +import org.apache.kafka.common.message.ApiMessageType; +import org.apache.kafka.common.message.FetchResponseData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult; +import 
org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.network.NetworkReceive; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.DefaultRecordBatch; +import org.apache.kafka.common.record.EndTransactionMarker; +import org.apache.kafka.common.record.LegacyRecord; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MemoryRecordsBuilder; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.ApiVersionsResponse; +import org.apache.kafka.common.requests.FetchMetadata; +import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.common.requests.FetchRequest.PartitionData; +import org.apache.kafka.common.requests.FetchResponse; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse; +import org.apache.kafka.common.requests.RequestTestUtils; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.BytesDeserializer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import
[jira] [Updated] (KAFKA-15637) Investigate FetcherTest's/FetchRequestManager's testFetchCompletedBeforeHandlerAdded
[ https://issues.apache.org/jira/browse/KAFKA-15637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15637: -- Summary: Investigate FetcherTest's/FetchRequestManager's testFetchCompletedBeforeHandlerAdded (was: CLONE - Investigate FetcherTest's/FetchRequestManager's testFetchCompletedBeforeHandlerAdded) > Investigate FetcherTest's/FetchRequestManager's > testFetchCompletedBeforeHandlerAdded > > > Key: KAFKA-15637 > URL: https://issues.apache.org/jira/browse/KAFKA-15637 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Minor > Labels: consumer-threading-refactor > > Thanks for the reply. I still don't quite understand the test. Why do we > duplicate the following code both inside and outside of {{{}setWakeupHook{}}}? > > {code:java} > networkClientDelegate.disconnectAsync(readReplica); > networkClientDelegate.poll(time.timer(0)); > {code} > > MockClient is only woken up through > {{{}networkClientDelegate.disconnectAsync{}}}. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15637) CLONE - Investigate FetcherTest's/FetchRequestManager's testFetchCompletedBeforeHandlerAdded
[ https://issues.apache.org/jira/browse/KAFKA-15637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15637: -- Description: Thanks for the reply. I still don't quite understand the test. Why do we duplicate the following code both inside and outside of {{{}setWakeupHook{}}}? {code:java} networkClientDelegate.disconnectAsync(readReplica); networkClientDelegate.poll(time.timer(0)); {code} MockClient is only woken up through {{{}networkClientDelegate.disconnectAsync{}}}. was: Thanks for the reply. I still don't quite understand the test. Why do we duplicate the following code both inside and outside of setWakeupHook? {code:java} networkClientDelegate.disconnectAsync(readReplica); networkClientDelegate.poll(time.timer(0)); {code} MockClient is only woken up through {{{}networkClientDelegate.disconnectAsync{}}}. > CLONE - Investigate FetcherTest's/FetchRequestManager's > testFetchCompletedBeforeHandlerAdded > > > Key: KAFKA-15637 > URL: https://issues.apache.org/jira/browse/KAFKA-15637 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Minor > Labels: consumer-threading-refactor > > Thanks for the reply. I still don't quite understand the test. Why do we > duplicate the following code both inside and outside of {{{}setWakeupHook{}}}? > > {code:java} > networkClientDelegate.disconnectAsync(readReplica); > networkClientDelegate.poll(time.timer(0)); > {code} > > MockClient is only woken up through > {{{}networkClientDelegate.disconnectAsync{}}}. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15637) CLONE - Investigate FetcherTest's/FetchRequestManager's testFetchCompletedBeforeHandlerAdded
Kirk True created KAFKA-15637: - Summary: CLONE - Investigate FetcherTest's/FetchRequestManager's testFetchCompletedBeforeHandlerAdded Key: KAFKA-15637 URL: https://issues.apache.org/jira/browse/KAFKA-15637 Project: Kafka Issue Type: Sub-task Components: clients, consumer Reporter: Kirk True Assignee: Kirk True {{expectedBytes}} is calculated as a total, instead of an average. Is this correct? -- This message was sent by Atlassian Jira (v8.20.10#820010)
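To make the total-versus-average question above concrete, here is a small sketch (hypothetical names and numbers, not the actual FetcherTest metric code): summing every record's size yields a total, which only matches a per-fetch average after dividing by the number of fetches.

```java
import java.util.List;

// Hypothetical sketch illustrating total vs. average: if a metric is meant to
// report bytes per fetch (an average over fetches), then summing all record
// sizes produces a total, which equals the average only when there is exactly
// one fetch.
public class ExpectedBytes {

    // Total bytes across all records.
    static long totalBytes(List<Integer> recordSizes) {
        return recordSizes.stream().mapToLong(Integer::longValue).sum();
    }

    // Average bytes per fetch: the total divided by the number of fetches.
    static double avgBytesPerFetch(List<Integer> recordSizes, int fetchCount) {
        return (double) totalBytes(recordSizes) / fetchCount;
    }

    public static void main(String[] args) {
        List<Integer> sizes = List.of(100, 200, 300);
        System.out.println(totalBytes(sizes));          // 600
        System.out.println(avgBytesPerFetch(sizes, 2)); // 300.0
    }
}
```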