[GitHub] [kafka] gharris1727 commented on pull request #13467: KAFKA-14863: Hide plugins with invalid constructors during plugin discovery
gharris1727 commented on PR #13467: URL: https://github.com/apache/kafka/pull/13467#issuecomment-1574679783 @dajac Thanks for pointing this out. This PR is certainly the cause, and it appears to be some tests which didn't get updated to compensate for this new behavior. I've opened a quick fix here: https://github.com/apache/kafka/pull/13805 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] gharris1727 opened a new pull request, #13805: KAFKA-14863: Fix failing tests for invalid plugins that are no longer visible
gharris1727 opened a new pull request, #13805: URL: https://github.com/apache/kafka/pull/13805

1. DelegatingClassLoaderTest asserts that all of the "includeByDefault" plugins are visible, so update TestPlugins' AlwaysThrowException to not be included by default.
2. ConnectorPluginsResourceTest referenced the abstract TimestampConverter instead of the concrete TimestampConverter.Key or .Value.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
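The second item turns on the difference between an abstract class and its concrete nested subclasses: once discovery hides plugins that cannot be constructed, only the concrete variants stay visible. Below is a minimal sketch of that kind of check using plain reflection. The names `PluginVisibilitySketch`, `Converter`, `NeedsArgument` and `isInstantiable` are hypothetical stand-ins, and the rule itself (concrete class with a public no-arg constructor) is an assumption for illustration, not the logic Kafka Connect actually applies.

```java
import java.lang.reflect.Modifier;

class PluginVisibilitySketch {
    // Hypothetical stand-in for an abstract plugin with concrete Key/Value variants.
    static abstract class Converter {
        public static class Key extends Converter { public Key() { } }
        public static class Value extends Converter { public Value() { } }
    }

    // Hypothetical plugin whose only constructor requires an argument.
    public static class NeedsArgument {
        public NeedsArgument(String arg) { }
    }

    // Assumption: a class is "visible" only if it is concrete and
    // exposes a public no-argument constructor.
    static boolean isInstantiable(Class<?> klass) {
        if (klass.isInterface() || Modifier.isAbstract(klass.getModifiers())) {
            return false;
        }
        try {
            klass.getConstructor(); // looks up public constructors only
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isInstantiable(Converter.class));       // false: abstract
        System.out.println(isInstantiable(Converter.Key.class));   // true: concrete, no-arg
        System.out.println(isInstantiable(Converter.Value.class)); // true: concrete, no-arg
        System.out.println(isInstantiable(NeedsArgument.class));   // false: no no-arg constructor
    }
}
```

Under this assumed rule, a test that looks up the abstract type would stop finding it, while lookups of the `Key` or `Value` variants would still succeed.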
[GitHub] [kafka] dajac commented on pull request #13467: KAFKA-14863: Hide plugins with invalid constructors during plugin discovery
dajac commented on PR #13467: URL: https://github.com/apache/kafka/pull/13467#issuecomment-1574652673 @gharris1727 @C0urante Recent builds have failures related to plugin loading. Example: https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-13804/1/tests. Could they be related to this PR? It seems that the last build of this PR also has them.
[GitHub] [kafka] kirktrue commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer
kirktrue commented on code in PR #13591: URL: https://github.com/apache/kafka/pull/13591#discussion_r1215084272

## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ##
@@ -609,14 +686,15 @@ public synchronized void handleCompletedBatch(ProducerBatch batch, ProduceRespon
     }

     public synchronized void transitionToUninitialized(RuntimeException exception) {
-        transitionTo(State.UNINITIALIZED);
+        transitionTo(State.UNINITIALIZED, exception, InvalidStateDetectionStrategy.BACKGROUND);
         if (pendingTransition != null) {
             pendingTransition.result.fail(exception);
         }
         lastError = null;
     }

-    public synchronized void maybeTransitionToErrorState(RuntimeException exception) {
+    public synchronized void maybeTransitionToErrorState(RuntimeException exception,

Review Comment:
Correct. However, I went ahead and made the parameter explicit to avoid confusion.
[jira] [Created] (KAFKA-15054) Add configs and logic to decide if rack aware assignment should be enabled
Hao Li created KAFKA-15054: -- Summary: Add configs and logic to decide if rack aware assignment should be enabled Key: KAFKA-15054 URL: https://issues.apache.org/jira/browse/KAFKA-15054 Project: Kafka Issue Type: Sub-task Reporter: Hao Li -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15053) Regression for security.protocol validation starting from 3.3.0
[ https://issues.apache.org/jira/browse/KAFKA-15053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bo Gao updated KAFKA-15053:
---
Description:
[This|https://issues.apache.org/jira/browse/KAFKA-13793] Jira issue introduced validations on multiple configs. As a consequence, config {{security.protocol}} now only allows upper case values such as PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL. Before this change, lower case values such as sasl_ssl and ssl were also supported; there is even case-insensitive logic inside [SecurityProtocol|https://github.com/apache/kafka/blob/146a6976aed0d9f90c70b6f21dca8b887cc34e71/clients/src/main/java/org/apache/kafka/common/security/auth/SecurityProtocol.java#L70-L73] to handle the lower case values.

I think we should treat this as a regression, since lower case values have not been supported since 3.3.0. In 3.3.0 and later, using the lower case value sasl_ssl produces an error like this:

{{Invalid value sasl_ssl for configuration security.protocol: String must be one of: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL}}

> Regression for security.protocol validation starting from 3.3.0
> ---
>
> Key: KAFKA-15053
> URL: https://issues.apache.org/jira/browse/KAFKA-15053
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 3.3.0
> Reporter: Bo Gao
> Priority: Major
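The regression described above comes down to two checks that disagree about case. Here is a minimal sketch, assuming a validator that compares the raw string against the upper case names and a lookup that upper-cases its input first. The enum and all three method names are hypothetical stand-ins, not Kafka's `SecurityProtocol` or config validation code, and the "lenient" variant is only one possible remedy, not necessarily the fix adopted upstream.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

class SecurityProtocolCaseSketch {
    // Hypothetical stand-in for the protocol names listed in the error message.
    enum Protocol { PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL }

    static final List<String> VALID_NAMES =
        Arrays.asList("PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL");

    // Case-insensitive lookup: normalise to upper case before resolving the constant.
    static Protocol forName(String name) {
        return Protocol.valueOf(name.toUpperCase(Locale.ROOT));
    }

    // Case-sensitive validation: rejects anything not spelled exactly as listed.
    static boolean passesStrictValidation(String value) {
        return VALID_NAMES.contains(value);
    }

    // Assumed remedy for illustration: normalise the value before validating it.
    static boolean passesLenientValidation(String value) {
        return VALID_NAMES.contains(value.toUpperCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        System.out.println(forName("sasl_ssl"));                 // SASL_SSL
        System.out.println(passesStrictValidation("sasl_ssl"));  // false
        System.out.println(passesLenientValidation("sasl_ssl")); // true
    }
}
```

In this model the lookup would happily accept `sasl_ssl`, but the strict validator runs first and rejects it, which matches the symptom reported in the issue.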
[jira] [Created] (KAFKA-15053) Regression for security.protocol validation starting from 3.3.0
Bo Gao created KAFKA-15053:
--
Summary: Regression for security.protocol validation starting from 3.3.0
Key: KAFKA-15053
URL: https://issues.apache.org/jira/browse/KAFKA-15053
Project: Kafka
Issue Type: Bug
Components: clients
Affects Versions: 3.3.0
Reporter: Bo Gao

This Jira issue introduced validations on multiple configs. As a consequence, config {{security.protocol}} now only allows upper case values such as PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL. Before this change, lower case values such as sasl_ssl and ssl were also supported; there is even case-insensitive logic inside [SecurityProtocol|https://github.com/apache/kafka/blob/146a6976aed0d9f90c70b6f21dca8b887cc34e71/clients/src/main/java/org/apache/kafka/common/security/auth/SecurityProtocol.java#L70-L73] to handle the lower case values.

I think we should treat this as a regression, since lower case values have not been supported since 3.3.0. In 3.3.0 and later, using the lower case value sasl_ssl produces an error like this:

{{Invalid value sasl_ssl for configuration security.protocol: String must be one of: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL}}
[GitHub] [kafka] jolshan commented on a diff in pull request #13795: KAFKA-14462; [17/N] Add CoordinatorRuntime
jolshan commented on code in PR #13795: URL: https://github.com/apache/kafka/pull/13795#discussion_r1214957871

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ##
@@ -0,0 +1,959 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.runtime;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
+import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.deferred.DeferredEvent;
+import org.apache.kafka.deferred.DeferredEventQueue;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.slf4j.Logger;
+
+import java.util.HashSet;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * The CoordinatorRuntime provides a framework to implement coordinators such as the group coordinator
+ * or the transaction coordinator.
+ *
+ * The runtime framework maps each underlying partitions (e.g. __consumer_offsets) that that broker is a
+ * leader of to a coordinator replicated state machine. A replicated state machine holds the hard and soft
+ * state of all the objects (e.g. groups or offsets) assigned to the partition. The hard state is stored in
+ * timeline datastructures backed by a SnapshotRegistry. The runtime supports two type of operations
+ * on state machines: (1) Writes and (2) Reads.
+ *
+ * (1) A write operation, aka a request, can read the full and potentially **uncommitted** state from state
+ * machine to handle the operation. A write operation typically generates a response and a list of
+ * records. The records are applies to the state machine and persisted to the partition. The response
+ * is parked until the records are committed and delivered when they are.
+ *
+ * (2) A read operation, aka a request, can only read the committed state from the state machine to handle
+ * the operation. A read operation typically generates a response that is immediately completed.
+ *
+ * The runtime framework expose an asynchronous, future based, API to the world. All the operations
+ * are executed by an CoordinatorEventProcessor. The processor guarantees that operations for a
+ * single partition or state machine are not processed concurrently.
+ *
+ * @param The type of the state machine.
+ * @param The type of the record.
+ */
+class CoordinatorRuntime, U> {
+
+    /**
+     * Builder to create a CoordinatorRuntime.
+     *
+     * @param The type of the state machine.
+     * @param The type of the record.
+     */
+    public static class Builder, U> {
+        private LogContext logContext;
+        private CoordinatorEventProcessor eventProcessor;
+        private PartitionWriter partitionWriter;
+        private CoordinatorLoader loader;
+        private CoordinatorBuilderSupplier coordinatorBuilderSupplier;
+
+        public Builder withLogContext(LogContext logContext) {
+            this.logContext = logContext;
+            return this;
+        }
+
+        public Builder withEventProcessor(CoordinatorEventProcessor eventProcessor) {
+            this.eventProcessor = eventProcessor;
+            return this;
+        }
+
+        public Builder withPartitionWriter(PartitionWriter partitionWriter) {
+            this.partitionWriter = partitionWriter;
+            return this;
+        }
+
+        public Builder withLoader(CoordinatorLoader loader) {
+            this.loader = loader;
+            return this;
+        }
+
+        public Builder withCoordinatorStateMachineSupplier(CoordinatorBuilderSupplier coordinatorBuilderSupplier) {
+            this.coordinatorBuilderSupplier = coordinatorBuilderSupplier;
+            return this;
+        }
+
+        public CoordinatorRuntime build() {
+            if (logContext == null)
+                logContext = new LogContext();
+            if (eventProcessor == null)
+                throw new
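The write and read semantics described in the quoted Javadoc can be illustrated with a small toy model. This is only a sketch under stated assumptions: `ToyCoordinatorRuntime`, `write`, `read` and `onCommitted` are hypothetical names, records are plain strings, and `synchronized` loosely stands in for the guarantee that operations on one state machine are not processed concurrently. It does not reflect the actual `CoordinatorRuntime` API.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical toy model of the described semantics; not the real runtime.
class ToyCoordinatorRuntime {
    private static final class ParkedResponse {
        final int lastOffset;
        final String response;
        final CompletableFuture<String> future;

        ParkedResponse(int lastOffset, String response, CompletableFuture<String> future) {
            this.lastOffset = lastOffset;
            this.response = response;
            this.future = future;
        }
    }

    private final List<String> records = new ArrayList<>();       // applied records, committed or not
    private final List<ParkedResponse> parked = new ArrayList<>(); // responses waiting for a commit
    private int committedCount = 0;                                // length of the committed prefix

    // Write: applies the record right away, then parks the response until it is committed.
    synchronized CompletableFuture<String> write(String record) {
        records.add(record);
        CompletableFuture<String> future = new CompletableFuture<>();
        parked.add(new ParkedResponse(records.size() - 1, "wrote " + record, future));
        return future;
    }

    // Read: sees only the committed prefix and completes immediately.
    synchronized CompletableFuture<List<String>> read() {
        return CompletableFuture.completedFuture(
            new ArrayList<>(records.subList(0, committedCount)));
    }

    // Called when the committed prefix grows; delivers the responses it covers.
    synchronized void onCommitted(int newCommittedCount) {
        committedCount = Math.max(committedCount, Math.min(newCommittedCount, records.size()));
        Iterator<ParkedResponse> it = parked.iterator();
        while (it.hasNext()) {
            ParkedResponse p = it.next();
            if (p.lastOffset < committedCount) {
                p.future.complete(p.response);
                it.remove();
            }
        }
    }
}
```

In this model a caller of `write("a")` gets a future that stays incomplete, and `read()` returns an empty list, until `onCommitted(1)` is invoked. After that the write future completes and the read returns the record.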
[GitHub] [kafka] jolshan commented on a diff in pull request #13795: KAFKA-14462; [17/N] Add CoordinatorRuntime
jolshan commented on code in PR #13795: URL: https://github.com/apache/kafka/pull/13795#discussion_r1214930728 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -0,0 +1,959 @@
[GitHub] [kafka] dimitarndimitrov opened a new pull request, #13804: KAFKA-15052 Fix the flaky QuorumControllerTest.testBalancePartitionLeaders
dimitarndimitrov opened a new pull request, #13804: URL: https://github.com/apache/kafka/pull/13804

In this test the broker session timeout is configured aggressively low (1 second) so that fencing can happen without much waiting. Then, in the final portion of the test, when brokers should not be fenced, heartbeats are sent roughly twice per session timeout window. However, the first time that is done there is other code between sending the heartbeat and taking the timestamp, and in local tests that code can take up to 0.5 seconds (half of the session timeout). That can result in all brokers being fenced again, which fails the test.

This change sends a heartbeat just when a timestamp is taken, which in local tests reduces flaky failures from 4 out of 50 to 0 out of 50.
- In local tests, increasing the session timeout from 1 second to 2 seconds also reduced the flaky failures to 0 out of 50, but it consistently increased the test running time by 1 second (which seems expected).

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
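The timing argument above can be checked with a little arithmetic on a simulated clock. The sketch below is a simplified model, not the test or controller code: `HeartbeatTimingSketch`, `SessionTracker` and `fencedBeforeNextHeartbeat` are hypothetical names, and the rule that a broker is fenced once a full session timeout has elapsed since its last heartbeat is an assumption made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

class HeartbeatTimingSketch {
    static final long SESSION_TIMEOUT_MS = 1000;                      // aggressively low, as in the test
    static final long HEARTBEAT_INTERVAL_MS = SESSION_TIMEOUT_MS / 2; // roughly two heartbeats per window

    // Hypothetical, simplified session tracking driven by explicit timestamps.
    static final class SessionTracker {
        private final Map<Integer, Long> lastHeartbeatMs = new HashMap<>();

        void heartbeat(int brokerId, long nowMs) {
            lastHeartbeatMs.put(brokerId, nowMs);
        }

        // Assumption: fenced once a full session timeout has passed without a heartbeat.
        boolean isFenced(int brokerId, long nowMs) {
            Long last = lastHeartbeatMs.get(brokerId);
            return last == null || nowMs - last >= SESSION_TIMEOUT_MS;
        }
    }

    // Models one round: a heartbeat at t=0, unrelated work for delayMs, then a timestamp
    // from which the next heartbeat is scheduled. Returns whether the broker is already
    // fenced when that next heartbeat is due.
    static boolean fencedBeforeNextHeartbeat(long delayMs, boolean heartbeatAtTimestamp) {
        SessionTracker tracker = new SessionTracker();
        int brokerId = 1;
        tracker.heartbeat(brokerId, 0L);
        long timestampMs = delayMs;
        if (heartbeatAtTimestamp) {
            tracker.heartbeat(brokerId, timestampMs); // the change: heartbeat when the timestamp is taken
        }
        long nextHeartbeatMs = timestampMs + HEARTBEAT_INTERVAL_MS;
        return tracker.isFenced(brokerId, nextHeartbeatMs);
    }

    public static void main(String[] args) {
        System.out.println(fencedBeforeNextHeartbeat(600, false)); // true: 1100 ms since last heartbeat
        System.out.println(fencedBeforeNextHeartbeat(600, true));  // false: only 500 ms since last heartbeat
    }
}
```

In this model a delay close to half the session timeout, added to the half-timeout heartbeat interval, uses up the whole session window, whereas sending the heartbeat at the moment the timestamp is taken keeps the gap at one heartbeat interval regardless of the delay.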
[jira] [Commented] (KAFKA-15052) Fix flaky test QuorumControllerTest.testBalancePartitionLeaders()
[ https://issues.apache.org/jira/browse/KAFKA-15052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17728878#comment-17728878 ] Justine Olshan commented on KAFKA-15052: Thanks for filing this. I've been seeing it often. > Fix flaky test QuorumControllerTest.testBalancePartitionLeaders() > - > > Key: KAFKA-15052 > URL: https://issues.apache.org/jira/browse/KAFKA-15052 > Project: Kafka > Issue Type: Test >Reporter: Dimitar Dimitrov >Assignee: Dimitar Dimitrov >Priority: Major > > Test failed at > [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1892/tests/] > as well as in various local runs. > The test creates a topic, fences a broker, notes partition imbalance due to > another broker taking over the partition the fenced broker lost, re-registers > and unfences the fenced broker, sends {{AlterPartition}} for the lost > partition adding the now unfenced broker back to its ISR, then waits for the > partition imbalance to disappear. > The local failures seem to happen when the brokers (including the ones that > never get fenced by the test) accidentally get fenced by losing their session > due to reaching the (aggressively low for test purposes) session timeout. > The Cloudbees failure quoted above also seems to indicate that this happened: > {code:java} > ...[truncated 738209 chars]... > 23. (org.apache.kafka.controller.QuorumController:768) > [2023-06-02 18:17:22,202] DEBUG [QuorumController id=0] Scheduling write > event for maybeBalancePartitionLeaders because scheduled (DEFERRED), > checkIntervalNs (OptionalLong[10]) and isImbalanced (true) > (org.apache.kafka.controller.QuorumController:1401) > [2023-06-02 18:17:22,202] INFO [QuorumController id=0] Fencing broker 2 > because its session has timed out. 
> (org.apache.kafka.controller.ReplicationControlManager:1459) > [2023-06-02 18:17:22,203] DEBUG [QuorumController id=0] handleBrokerFenced: > changing partition(s): foo-0, foo-1, foo-2 > (org.apache.kafka.controller.ReplicationControlManager:1750) > [2023-06-02 18:17:22,203] DEBUG [QuorumController id=0] partition change for > foo-0 with topic ID 033_QSX7TfitL4SDzoeR4w: leader: 2 -> -1, leaderEpoch: 2 > -> 3, partitionEpoch: 2 -> 3 > (org.apache.kafka.controller.ReplicationControlManager:157) > [2023-06-02 18:17:22,204] DEBUG [QuorumController id=0] partition change for > foo-1 with topic ID 033_QSX7TfitL4SDzoeR4w: isr: [2, 3] -> [3], leaderEpoch: > 3 -> 4, partitionEpoch: 4 -> 5 > (org.apache.kafka.controller.ReplicationControlManager:157) > [2023-06-02 18:17:22,204] DEBUG [QuorumController id=0] partition change for > foo-2 with topic ID 033_QSX7TfitL4SDzoeR4w: leader: 2 -> -1, leaderEpoch: 2 > -> 3, partitionEpoch: 2 -> 3 > (org.apache.kafka.controller.ReplicationControlManager:157) > [2023-06-02 18:17:22,205] DEBUG append(batch=LocalRecordBatch(leaderEpoch=1, > appendTimestamp=240, > records=[ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, > topicId=033_QSX7TfitL4SDzoeR4w, isr=null, leader=-1, replicas=null, > removingReplicas=null, addingReplicas=null, leaderRecoveryState=-1) at > version 0), ApiMessageAndVersion(PartitionChangeRecord(partitionId=1, > topicId=033_QSX7TfitL4SDzoeR4w, isr=[3], leader=3, replicas=null, > removingReplicas=null, addingReplicas=null, leaderRecoveryState=-1) at > version 0), ApiMessageAndVersion(PartitionChangeRecord(partitionId=2, > topicId=033_QSX7TfitL4SDzoeR4w, isr=null, leader=-1, replicas=null, > removingReplicas=null, addingReplicas=null, leaderRecoveryState=-1) at > version 0), ApiMessageAndVersion(BrokerRegistrationChangeRecord(brokerId=2, > brokerEpoch=3, fenced=1, inControlledShutdown=0) at version 0)]), > prevOffset=27) (org.apache.kafka.metalog.LocalLogManager$SharedLogData:253) > [2023-06-02 18:17:22,205] 
DEBUG [QuorumController id=0] Creating in-memory > snapshot 27 (org.apache.kafka.timeline.SnapshotRegistry:197) > [2023-06-02 18:17:22,205] DEBUG [LocalLogManager 0] Node 0: running log > check. (org.apache.kafka.metalog.LocalLogManager:512) > [2023-06-02 18:17:22,205] DEBUG [QuorumController id=0] Read-write operation > maybeFenceReplicas(451616131) will be completed when the log reaches offset > 27. (org.apache.kafka.controller.QuorumController:768) > [2023-06-02 18:17:22,208] INFO [QuorumController id=0] Fencing broker 3 > because its session has timed out. > (org.apache.kafka.controller.ReplicationControlManager:1459) > [2023-06-02 18:17:22,209] DEBUG [QuorumController id=0] handleBrokerFenced: > changing partition(s): foo-1 > (org.apache.kafka.controller.ReplicationControlManager:1750) > [2023-06-02 18:17:22,209] DEBUG [QuorumController id=0] partition change for > foo-1 with topic ID 033_QSX7TfitL4SDzoeR4w: leader: 3 -> -1,
[jira] [Updated] (KAFKA-15052) Fix flaky test QuorumControllerTest.testBalancePartitionLeaders()
[ https://issues.apache.org/jira/browse/KAFKA-15052?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dimitar Dimitrov updated KAFKA-15052: - Description: Test failed at [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1892/tests/] as well as in various local runs. The test creates a topic, fences a broker, notes partition imbalance due to another broker taking over the partition the fenced broker lost, re-registers and unfences the fenced broker, sends {{AlterPartition}} for the lost partition adding the now unfenced broker back to its ISR, then waits for the partition imbalance to disappear. The local failures seem to happen when the brokers (including the ones that never get fenced by the test) accidentally get fenced by losing their session due to reaching the (aggressively low for test purposes) session timeout. The Cloudbees failure quoted above also seems to indicate that this happened: {code:java} ...[truncated 738209 chars]... 23. (org.apache.kafka.controller.QuorumController:768) [2023-06-02 18:17:22,202] DEBUG [QuorumController id=0] Scheduling write event for maybeBalancePartitionLeaders because scheduled (DEFERRED), checkIntervalNs (OptionalLong[10]) and isImbalanced (true) (org.apache.kafka.controller.QuorumController:1401) [2023-06-02 18:17:22,202] INFO [QuorumController id=0] Fencing broker 2 because its session has timed out. 
(org.apache.kafka.controller.ReplicationControlManager:1459) [2023-06-02 18:17:22,203] DEBUG [QuorumController id=0] handleBrokerFenced: changing partition(s): foo-0, foo-1, foo-2 (org.apache.kafka.controller.ReplicationControlManager:1750) [2023-06-02 18:17:22,203] DEBUG [QuorumController id=0] partition change for foo-0 with topic ID 033_QSX7TfitL4SDzoeR4w: leader: 2 -> -1, leaderEpoch: 2 -> 3, partitionEpoch: 2 -> 3 (org.apache.kafka.controller.ReplicationControlManager:157) [2023-06-02 18:17:22,204] DEBUG [QuorumController id=0] partition change for foo-1 with topic ID 033_QSX7TfitL4SDzoeR4w: isr: [2, 3] -> [3], leaderEpoch: 3 -> 4, partitionEpoch: 4 -> 5 (org.apache.kafka.controller.ReplicationControlManager:157) [2023-06-02 18:17:22,204] DEBUG [QuorumController id=0] partition change for foo-2 with topic ID 033_QSX7TfitL4SDzoeR4w: leader: 2 -> -1, leaderEpoch: 2 -> 3, partitionEpoch: 2 -> 3 (org.apache.kafka.controller.ReplicationControlManager:157) [2023-06-02 18:17:22,205] DEBUG append(batch=LocalRecordBatch(leaderEpoch=1, appendTimestamp=240, records=[ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, topicId=033_QSX7TfitL4SDzoeR4w, isr=null, leader=-1, replicas=null, removingReplicas=null, addingReplicas=null, leaderRecoveryState=-1) at version 0), ApiMessageAndVersion(PartitionChangeRecord(partitionId=1, topicId=033_QSX7TfitL4SDzoeR4w, isr=[3], leader=3, replicas=null, removingReplicas=null, addingReplicas=null, leaderRecoveryState=-1) at version 0), ApiMessageAndVersion(PartitionChangeRecord(partitionId=2, topicId=033_QSX7TfitL4SDzoeR4w, isr=null, leader=-1, replicas=null, removingReplicas=null, addingReplicas=null, leaderRecoveryState=-1) at version 0), ApiMessageAndVersion(BrokerRegistrationChangeRecord(brokerId=2, brokerEpoch=3, fenced=1, inControlledShutdown=0) at version 0)]), prevOffset=27) (org.apache.kafka.metalog.LocalLogManager$SharedLogData:253) [2023-06-02 18:17:22,205] DEBUG [QuorumController id=0] Creating in-memory snapshot 27 
(org.apache.kafka.timeline.SnapshotRegistry:197) [2023-06-02 18:17:22,205] DEBUG [LocalLogManager 0] Node 0: running log check. (org.apache.kafka.metalog.LocalLogManager:512) [2023-06-02 18:17:22,205] DEBUG [QuorumController id=0] Read-write operation maybeFenceReplicas(451616131) will be completed when the log reaches offset 27. (org.apache.kafka.controller.QuorumController:768) [2023-06-02 18:17:22,208] INFO [QuorumController id=0] Fencing broker 3 because its session has timed out. (org.apache.kafka.controller.ReplicationControlManager:1459) [2023-06-02 18:17:22,209] DEBUG [QuorumController id=0] handleBrokerFenced: changing partition(s): foo-1 (org.apache.kafka.controller.ReplicationControlManager:1750) [2023-06-02 18:17:22,209] DEBUG [QuorumController id=0] partition change for foo-1 with topic ID 033_QSX7TfitL4SDzoeR4w: leader: 3 -> -1, leaderEpoch: 4 -> 5, partitionEpoch: 5 -> 6 (org.apache.kafka.controller.ReplicationControlManager:157){code} was: Test failed at [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1892/tests/] as well as in various local runs. The test creates a topic, fences a broker, notes partition imbalance due to another broker taking over the partition the fenced broker lost, re-registers and unfences the fenced broker, sends `AlterPartition` for the lost partition adding the now unfenced broker back to its ISR, then waits for the partition imbalance to disappear. The local
[jira] [Created] (KAFKA-15052) Fix flaky test QuorumControllerTest.testBalancePartitionLeaders()
Dimitar Dimitrov created KAFKA-15052: Summary: Fix flaky test QuorumControllerTest.testBalancePartitionLeaders() Key: KAFKA-15052 URL: https://issues.apache.org/jira/browse/KAFKA-15052 Project: Kafka Issue Type: Test Reporter: Dimitar Dimitrov Assignee: Dimitar Dimitrov Test failed at [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1892/tests/] as well as in various local runs. The test creates a topic, fences a broker, notes partition imbalance due to another broker taking over the partition the fenced broker lost, re-registers and unfences the fenced broker, sends `AlterPartition` for the lost partition adding the now unfenced broker back to its ISR, then waits for the partition imbalance to disappear. The local failures seem to happen when the brokers (including the ones that never get fenced by the test) accidentally get fenced by losing their session due to reaching the (aggressively low for test purposes) session timeout. The Cloudbees failure quoted above also seems to indicate that this happened: {code:java} ...[truncated 738209 chars]... 23. (org.apache.kafka.controller.QuorumController:768) [2023-06-02 18:17:22,202] DEBUG [QuorumController id=0] Scheduling write event for maybeBalancePartitionLeaders because scheduled (DEFERRED), checkIntervalNs (OptionalLong[10]) and isImbalanced (true) (org.apache.kafka.controller.QuorumController:1401) [2023-06-02 18:17:22,202] INFO [QuorumController id=0] Fencing broker 2 because its session has timed out. 
(org.apache.kafka.controller.ReplicationControlManager:1459) [2023-06-02 18:17:22,203] DEBUG [QuorumController id=0] handleBrokerFenced: changing partition(s): foo-0, foo-1, foo-2 (org.apache.kafka.controller.ReplicationControlManager:1750) [2023-06-02 18:17:22,203] DEBUG [QuorumController id=0] partition change for foo-0 with topic ID 033_QSX7TfitL4SDzoeR4w: leader: 2 -> -1, leaderEpoch: 2 -> 3, partitionEpoch: 2 -> 3 (org.apache.kafka.controller.ReplicationControlManager:157) [2023-06-02 18:17:22,204] DEBUG [QuorumController id=0] partition change for foo-1 with topic ID 033_QSX7TfitL4SDzoeR4w: isr: [2, 3] -> [3], leaderEpoch: 3 -> 4, partitionEpoch: 4 -> 5 (org.apache.kafka.controller.ReplicationControlManager:157) [2023-06-02 18:17:22,204] DEBUG [QuorumController id=0] partition change for foo-2 with topic ID 033_QSX7TfitL4SDzoeR4w: leader: 2 -> -1, leaderEpoch: 2 -> 3, partitionEpoch: 2 -> 3 (org.apache.kafka.controller.ReplicationControlManager:157) [2023-06-02 18:17:22,205] DEBUG append(batch=LocalRecordBatch(leaderEpoch=1, appendTimestamp=240, records=[ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, topicId=033_QSX7TfitL4SDzoeR4w, isr=null, leader=-1, replicas=null, removingReplicas=null, addingReplicas=null, leaderRecoveryState=-1) at version 0), ApiMessageAndVersion(PartitionChangeRecord(partitionId=1, topicId=033_QSX7TfitL4SDzoeR4w, isr=[3], leader=3, replicas=null, removingReplicas=null, addingReplicas=null, leaderRecoveryState=-1) at version 0), ApiMessageAndVersion(PartitionChangeRecord(partitionId=2, topicId=033_QSX7TfitL4SDzoeR4w, isr=null, leader=-1, replicas=null, removingReplicas=null, addingReplicas=null, leaderRecoveryState=-1) at version 0), ApiMessageAndVersion(BrokerRegistrationChangeRecord(brokerId=2, brokerEpoch=3, fenced=1, inControlledShutdown=0) at version 0)]), prevOffset=27) (org.apache.kafka.metalog.LocalLogManager$SharedLogData:253) [2023-06-02 18:17:22,205] DEBUG [QuorumController id=0] Creating in-memory snapshot 27 
(org.apache.kafka.timeline.SnapshotRegistry:197) [2023-06-02 18:17:22,205] DEBUG [LocalLogManager 0] Node 0: running log check. (org.apache.kafka.metalog.LocalLogManager:512) [2023-06-02 18:17:22,205] DEBUG [QuorumController id=0] Read-write operation maybeFenceReplicas(451616131) will be completed when the log reaches offset 27. (org.apache.kafka.controller.QuorumController:768) [2023-06-02 18:17:22,208] INFO [QuorumController id=0] Fencing broker 3 because its session has timed out. (org.apache.kafka.controller.ReplicationControlManager:1459) [2023-06-02 18:17:22,209] DEBUG [QuorumController id=0] handleBrokerFenced: changing partition(s): foo-1 (org.apache.kafka.controller.ReplicationControlManager:1750) [2023-06-02 18:17:22,209] DEBUG [QuorumController id=0] partition change for foo-1 with topic ID 033_QSX7TfitL4SDzoeR4w: leader: 3 -> -1, leaderEpoch: 4 -> 5, partitionEpoch: 5 -> 6 (org.apache.kafka.controller.ReplicationControlManager:157){code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
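The failure mode described in this report (an aggressively low session timeout fencing brokers the test never meant to fence) can be illustrated with a small, self-contained sketch. `SessionTracker`, `heartbeat` and `expiredBrokers` are hypothetical names chosen for illustration; this is not the controller's actual heartbeat code, and the timeout value is made up.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified session tracker; not the QuorumController implementation.
final class SessionTracker {
    private final long sessionTimeoutMs;
    private final Map<Integer, Long> lastHeartbeatMs = new HashMap<>();

    SessionTracker(long sessionTimeoutMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
    }

    // Record a heartbeat from a broker at the given (test-controlled) time.
    void heartbeat(int brokerId, long nowMs) {
        lastHeartbeatMs.put(brokerId, nowMs);
    }

    // Brokers whose last heartbeat is at least one session timeout old, in ascending id order.
    List<Integer> expiredBrokers(long nowMs) {
        List<Integer> expired = new ArrayList<>();
        for (Map.Entry<Integer, Long> entry : lastHeartbeatMs.entrySet()) {
            if (nowMs - entry.getValue() >= sessionTimeoutMs) {
                expired.add(entry.getKey());
            }
        }
        Collections.sort(expired);
        return expired;
    }
}
```

If the test thread stalls for longer than the timeout between heartbeats, every broker that did not heartbeat in that window expires at once, which would be consistent with brokers 2 and 3 both being fenced in the log above.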
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13675: KAFKA-14462; [14/N] Add PartitionWriter
jeffkbkim commented on code in PR #13675: URL: https://github.com/apache/kafka/pull/13675#discussion_r1214829515 ## core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala: ## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.coordinator.group + +import kafka.cluster.PartitionListener +import kafka.server.ReplicaManager +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.errors.RecordTooLargeException +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.record.{CompressionType, MemoryRecords, TimestampType} +import org.apache.kafka.common.record.Record.EMPTY_HEADERS +import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse +import org.apache.kafka.common.utils.Time +import org.apache.kafka.coordinator.group.runtime.PartitionWriter +import org.apache.kafka.storage.internals.log.AppendOrigin + +import java.nio.ByteBuffer +import java.util +import scala.collection.Map + +private[group] class ListenerAdaptor( + val listener: PartitionWriter.Listener +) extends PartitionListener { + override def onHighWatermarkUpdated( +tp: TopicPartition, +offset: Long + ): Unit = { +listener.onHighWatermarkUpdated(tp, offset) + } + + override def equals(that: Any): Boolean = that match { +case other: ListenerAdaptor => listener.equals(other.listener) +case _ => false + } + + override def hashCode(): Int = { +listener.hashCode() + } + + override def toString: String = { +s"ListenerAdaptor(listener=$listener)" + } +} + +class CoordinatorPartitionWriter[T]( + replicaManager: ReplicaManager, + serializer: PartitionWriter.Serializer[T], + compressionType: CompressionType, + time: Time +) extends PartitionWriter[T] { + + /** + * Register a PartitionWriter.Listener. + * + * @param tp The partition to register the listener to. + * @param listener The listener. + */ + override def registerListener( +tp: TopicPartition, +listener: PartitionWriter.Listener + ): Unit = { +replicaManager.maybeAddListener(tp, new ListenerAdaptor(listener)) + } + + /** + * Deregister a PartitionWriter.Listener. + * + * @param tp The partition to deregister the listener from. + * @param listener The listener. 
+ */ + override def deregisterListener( +tp: TopicPartition, +listener: PartitionWriter.Listener + ): Unit = { +replicaManager.removeListener(tp, new ListenerAdaptor(listener)) + } + + /** + * Write records to the partitions. Records are written in one batch so + * atomicity is guaranteed. + * + * @param tp The partition to write records to. + * @param records The list of records. The records are written in a single batch. + * @return The log end offset right after the written records. + * @throws KafkaException Any KafkaException caught during the write operation. + */ + override def append( +tp: TopicPartition, +records: util.List[T] + ): Long = { +if (records.isEmpty) throw new IllegalStateException("records must be non-empty.") + +replicaManager.getLogConfig(tp) match { + case Some(logConfig) => +val magic = logConfig.recordVersion.value +val maxBatchSize = logConfig.maxMessageSize +val currentTimeMs = time.milliseconds() + +val recordsBuilder = MemoryRecords.builder( + ByteBuffer.allocate(math.min(16384, maxBatchSize)), + magic, + compressionType, + TimestampType.CREATE_TIME, + 0L, + maxBatchSize +) + +records.forEach { record => + val keyBytes = serializer.serializeKey(record) + val valueBytes = serializer.serializeValue(record) + + if (recordsBuilder.hasRoomFor(currentTimeMs, keyBytes, valueBytes, EMPTY_HEADERS)) recordsBuilder.append( +currentTimeMs, +keyBytes, +valueBytes, +EMPTY_HEADERS Review Comment: do we not need anything in the header because these are only internal topics? what metadata do other record types contain in the header, client info? ##
[GitHub] [kafka] jolshan commented on a diff in pull request #13795: KAFKA-14462; [17/N] Add CoordinatorRuntime
jolshan commented on code in PR #13795: URL: https://github.com/apache/kafka/pull/13795#discussion_r1214835504 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -0,0 +1,959 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.runtime; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.CoordinatorLoadInProgressException; +import org.apache.kafka.common.errors.NotCoordinatorException; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.deferred.DeferredEvent; +import org.apache.kafka.deferred.DeferredEventQueue; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.slf4j.Logger; + +import java.util.HashSet; +import java.util.OptionalLong; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; + +/** + * The CoordinatorRuntime provides a framework to implement coordinators such as the group coordinator + * or the transaction coordinator. 
+ * + * The runtime framework maps each underlying partitions (e.g. __consumer_offsets) that that broker is a + * leader of to a coordinator replicated state machine. A replicated state machine holds the hard and soft + * state of all the objects (e.g. groups or offsets) assigned to the partition. The hard state is stored in + * timeline datastructures backed by a SnapshotRegistry. The runtime supports two type of operations + * on state machines: (1) Writes and (2) Reads. + * + * (1) A write operation, aka a request, can read the full and potentially **uncommitted** state from state + * machine to handle the operation. A write operation typically generates a response and a list of + * records. The records are applies to the state machine and persisted to the partition. The response + * is parked until the records are committed and delivered when they are. + * + * (2) A read operation, aka a request, can only read the committed state from the state machine to handle + * the operation. A read operation typically generates a response that is immediately completed. + * + * The runtime framework expose an asynchronous, future based, API to the world. All the operations Review Comment: nit: exposes -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)
wcarlson5 commented on code in PR #13756: URL: https://github.com/apache/kafka/pull/13756#discussion_r1214823173 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java: ## @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.internals.Change; +import org.apache.kafka.streams.kstream.internals.FullChangeSerde; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.ValueAndTimestamp; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static java.util.Objects.requireNonNull; + +public class RocksDBTimeOrderedKeyValueBuffer extends WrappedStateStore implements TimeOrderedKeyValueBuffer { + +private final Duration gracePeriod; Review Comment: ah yeah, that makes sense -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)
wcarlson5 commented on code in PR #13756: URL: https://github.com/apache/kafka/pull/13756#discussion_r1214821877 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbTimeOrderedKeyValueBytesStoreSupplier.java: ## @@ -38,13 +35,12 @@ public RocksDBTimeOrderedKeyValueSegmentedBytesStore get() { name, metricsScope(), retentionPeriod, -Math.max(retentionPeriod / 2, 60_000L), -withIndex +Math.max(retentionPeriod / 2, 60_000L) ); } public String metricsScope() { -return "rocksdb-session"; +return "rocksdb-buffer"; Review Comment: Sure that is fine with me
[GitHub] [kafka] dajac commented on a diff in pull request #13765: KAFKA-15021; Skip leader epoch bump on ISR shrink
dajac commented on code in PR #13765: URL: https://github.com/apache/kafka/pull/13765#discussion_r1214750188 ## core/src/main/scala/kafka/cluster/Partition.scala: ## @@ -1091,8 +1091,10 @@ class Partition(val topicPartition: TopicPartition, // Note here we are using the "maximal", see explanation above val replicaState = replica.stateSnapshot if (replicaState.logEndOffsetMetadata.messageOffset < newHighWatermark.messageOffset && -(replicaState.isCaughtUp(leaderLogEndOffset.messageOffset, currentTimeMs, replicaLagTimeMaxMs) - || partitionState.maximalIsr.contains(replica.brokerId))) { + ((replicaState.isCaughtUp(leaderLogEndOffset.messageOffset, currentTimeMs, replicaLagTimeMaxMs) && +isReplicaIsrEligible(replica.brokerId)) || Review Comment: Is it worth extracting this condition into a helper method (e.g. isIsrEligibleAndCaughtUp)? That would simplify the condition. ## core/src/test/scala/unit/kafka/cluster/PartitionTest.scala: ## @@ -1456,6 +1456,105 @@ class PartitionTest extends AbstractPartitionTest { assertEquals(alterPartitionListener.failures.get, 1) } + @ParameterizedTest + @ValueSource(strings = Array("fenced", "shutdown", "unfenced")) + def testHWMIncreasesWithFencedOrShutdownFollower(brokerState: String): Unit = { Review Comment: nit: s/HWM/HighWatermark? ## core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala: ## @@ -357,6 +363,51 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { } } + @ParameterizedTest + @ValueSource(strings = Array("zk", "kraft")) + def testSendToPartitionWithFollowerShutdown(quorum: String): Unit = { Review Comment: nit: `*ShouldNotTimeout`? It would be great to capture the issue in the test name or to add a comment about it. ## metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java: ## @@ -260,11 +265,17 @@ private void tryElection(PartitionChangeRecord record) { * NO_LEADER_CHANGE, a leader epoch bump will automatically occur. That takes care of * case 1.
In this function, we check for cases 2 and 3, and handle them by manually * setting record.leader to the current leader. + * + * In MV before 3.6 there was a bug (KAFKA-15021) in the brokers' replica manager + * that required that the leader epoch be bump whenever the ISR shrank. In MV 3.6 this leader + * bump is not required when the ISR shrinks. */ void triggerLeaderEpochBumpIfNeeded(PartitionChangeRecord record) { Review Comment: For my understanding, do we bump the leader epoch when the ISR is expanded? My understanding is that we don't. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
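The helper-method suggestion in the first review comment above could look roughly like the sketch below. It is plain Java rather than the Scala of `Partition.scala`; `ReplicaView`, `HighWatermarkCondition` and `holds` are hypothetical names, and the sketch assumes the second operand of the `||` (cut off in the quoted diff) is still the maximal-ISR membership check from the removed lines.

```java
import java.util.Set;

// Hypothetical stand-in for the per-replica state read in Partition.scala.
final class ReplicaView {
    final int brokerId;
    final long logEndOffset;
    final boolean caughtUp;
    final boolean isrEligible;

    ReplicaView(int brokerId, long logEndOffset, boolean caughtUp, boolean isrEligible) {
        this.brokerId = brokerId;
        this.logEndOffset = logEndOffset;
        this.caughtUp = caughtUp;
        this.isrEligible = isrEligible;
    }
}

final class HighWatermarkCondition {
    private HighWatermarkCondition() {}

    // The extracted helper: both sub-conditions must hold.
    static boolean isIsrEligibleAndCaughtUp(ReplicaView replica) {
        return replica.caughtUp && replica.isrEligible;
    }

    // Shape of the quoted condition once the helper is used (second operand is an assumption).
    static boolean holds(ReplicaView replica, long newHighWatermark, Set<Integer> maximalIsr) {
        return replica.logEndOffset < newHighWatermark
            && (isIsrEligibleAndCaughtUp(replica) || maximalIsr.contains(replica.brokerId));
    }
}
```

With the helper in place the outer condition reads as "behind the candidate high watermark, and either eligible-and-caught-up or in the maximal ISR", which is arguably easier to scan than the nested parentheses.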
[GitHub] [kafka] C0urante commented on a diff in pull request #13433: KAFKA-12694, KAFKA-3910: Add cyclic schema support, fix default struct values
C0urante commented on code in PR #13433: URL: https://github.com/apache/kafka/pull/13433#discussion_r1214775964 ## connect/api/src/test/java/org/apache/kafka/connect/data/SchemaBuilderTest.java: ## @@ -36,6 +38,255 @@ public class SchemaBuilderTest { private static final String DOC = "doc"; private static final Map NO_PARAMS = null; +@Test +public void testDefaultValueStructSchema() { +SchemaBuilder builder = SchemaBuilder.struct() +.field("f1", Schema.BOOLEAN_SCHEMA); + +Struct defaultValue = new Struct(builder.build()); // the Struct receives a schema, not a builder +defaultValue.put("f1", true); + +builder.defaultValue(defaultValue) +.build(); +} + +@Test +public void testDefaultValueStructSchemaBuilder() { +SchemaBuilder builder = SchemaBuilder.struct() +.field("f1", Schema.BOOLEAN_SCHEMA); + +Struct defaultValue = new Struct(builder); +defaultValue.put("f1", true); + +builder.defaultValue(defaultValue).build(); Review Comment: > I think we should stick to one way of setting up struct defaults, so the end result of building a schema can be consistent The more I think of it, the more I agree with this. The inconsistencies aren't great, and allowing a default value to be used even though its schema is not equal to the schema it's defined on seems like a great chance for a footgun that burns people later. I think we should revert the changes in this PR that cause the `SchemaBuilderTest::testDefaultValueStructSchema` test case to pass and actively assert that that test case throws an exception instead. We can also add a note to the `SchemaBuilder::defaultValue` Javadocs on how to use `Struct` instances as default values. Thoughts?
[GitHub] [kafka] cmccabe merged pull request #13799: KAFKA-15048: Improve handling of unexpected quorum controller errors
cmccabe merged PR #13799: URL: https://github.com/apache/kafka/pull/13799
[GitHub] [kafka] C0urante commented on pull request #13284: KAFKA-14718: Fix flaky DedicatedMirrorIntegrationTest
C0urante commented on PR #13284: URL: https://github.com/apache/kafka/pull/13284#issuecomment-1574216434 @divijvaidya No worries about the delay--looks like we're both guilty on that front. I took a look at this and the issue is that the `MirrorCheckpointConnector` and `MirrorSourceConnector` instances are generating empty lists of task configs, which makes sense since the former never generates tasks during these tests and the latter only generates tasks once a to-be-mirrored topic on the upstream cluster is created, which currently takes place only after our startup check has finished. Correct me if I'm wrong: the idea here is to add granularity to test failure messages so that we can differentiate between MM2 startup issues (caused by lagginess or connector/task failures) and issues with the actual replication logic performed by MM2. If that's the case, do you think it might make sense to do the following? - Alter the "await startup" logic to fail immediately if it detects any failed connectors or tasks - Alter the "await startup" logic to operate on a per-connector-type basis, and selectively await the startup of different connector types at different points in the test (e.g., we could await the startup of the heartbeat connector in any place where we currently call `waitForMirrorMakersToStart`, but only await the startup of the source connector and its tasks once we've created a to-be-replicated topic on the upstream cluster)
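The first bullet above (fail immediately on a failed connector or task instead of waiting out the timeout) could be sketched as follows. This is plain Java with no Connect dependency; `StartupWaiter`, `State` and `awaitRunning` are hypothetical names, not helpers that exist in the MM2 integration tests, and the state supplier stands in for whatever status lookup the test would actually use.

```java
import java.util.List;
import java.util.function.Supplier;

// Hypothetical polling helper; names and messages are illustrative only.
final class StartupWaiter {
    enum State { UNASSIGNED, RUNNING, FAILED }

    private StartupWaiter() {}

    // Polls until every reported state is RUNNING. A FAILED state aborts at once with a
    // distinct message, so startup failures and slow startups are reported differently.
    // An empty list passes vacuously, so callers should include the connector's own state.
    static void awaitRunning(Supplier<List<State>> states, long timeoutMs, long pollMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (true) {
            List<State> current = states.get();
            if (current.contains(State.FAILED)) {
                throw new IllegalStateException("startup failed: " + current);
            }
            if (current.stream().allMatch(s -> s == State.RUNNING)) {
                return;
            }
            if (System.currentTimeMillis() >= deadline) {
                throw new IllegalStateException("startup timed out: " + current);
            }
            try {
                Thread.sleep(pollMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for startup", e);
            }
        }
    }
}
```

The second bullet would then amount to calling such a helper once per connector type, with a supplier scoped to that connector, at the point in the test where its tasks can actually exist.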
[GitHub] [kafka] dajac merged pull request #13794: KAFKA-14462; [16/N] Add CoordinatorLoader and CoordinatorPlayback interfaces
dajac merged PR #13794: URL: https://github.com/apache/kafka/pull/13794
[GitHub] [kafka] C0urante commented on a diff in pull request #13771: MINOR: Refactor DelegatingClassLoader to emit immutable PluginScanResult
C0urante commented on code in PR #13771: URL: https://github.com/apache/kafka/pull/13771#discussion_r1214691525 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java: ## @@ -490,6 +504,168 @@ public void testPluginUrlsWithRelativeSymlinkForwards() throws Exception { assertUrls(expectedUrls, PluginUtils.pluginUrls(pluginPath)); } +@Test +public void testNonCollidingAliases() { +SortedSet> sinkConnectors = new TreeSet<>(); +sinkConnectors.add(new PluginDesc<>(MockSinkConnector.class, null, MockSinkConnector.class.getClassLoader())); +SortedSet> sourceConnectors = new TreeSet<>(); +sourceConnectors.add(new PluginDesc<>(MockSourceConnector.class, null, MockSourceConnector.class.getClassLoader())); +SortedSet> converters = new TreeSet<>(); +converters.add(new PluginDesc<>(CollidingConverter.class, null, CollidingConverter.class.getClassLoader())); +PluginScanResult result = new PluginScanResult( +sinkConnectors, +sourceConnectors, +converters, +Collections.emptySortedSet(), +Collections.emptySortedSet(), +Collections.emptySortedSet(), +Collections.emptySortedSet(), +Collections.emptySortedSet(), +Collections.emptySortedSet() +); +Map aliases = PluginUtils.computeAliases(result); +assertFalse(aliases.containsKey("Mock")); +assertEquals(MockSinkConnector.class.getName(), aliases.get("MockSinkConnector")); +assertEquals(MockSinkConnector.class.getName(), aliases.get("MockSink")); +assertEquals(MockSourceConnector.class.getName(), aliases.get("MockSourceConnector")); +assertEquals(MockSourceConnector.class.getName(), aliases.get("MockSource")); +assertEquals(CollidingConverter.class.getName(), aliases.get("CollidingConverter")); +assertEquals(CollidingConverter.class.getName(), aliases.get("Colliding")); +} + + +@Test +public void testMultiVersionAlias() { +SortedSet> sinkConnectors = new TreeSet<>(); +// distinct versions don't cause an alias collision (the class name is the same) +sinkConnectors.add(new 
PluginDesc<>(MockSinkConnector.class, null, MockSinkConnector.class.getClassLoader())); +sinkConnectors.add(new PluginDesc<>(MockSinkConnector.class, "1.0", MockSinkConnector.class.getClassLoader())); +assertEquals(2, sinkConnectors.size()); +PluginScanResult result = new PluginScanResult( +sinkConnectors, +Collections.emptySortedSet(), +Collections.emptySortedSet(), +Collections.emptySortedSet(), +Collections.emptySortedSet(), +Collections.emptySortedSet(), +Collections.emptySortedSet(), +Collections.emptySortedSet(), +Collections.emptySortedSet() +); +Map aliases = PluginUtils.computeAliases(result); +assertEquals(MockSinkConnector.class.getName(), aliases.get("MockSinkConnector")); +assertEquals(MockSinkConnector.class.getName(), aliases.get("MockSink")); +} + +@Test +public void testCollidingPrunedAlias() { +SortedSet> converters = new TreeSet<>(); +converters.add(new PluginDesc<>(CollidingConverter.class, null, CollidingConverter.class.getClassLoader())); +SortedSet> headerConverters = new TreeSet<>(); +headerConverters.add(new PluginDesc<>(CollidingHeaderConverter.class, null, CollidingHeaderConverter.class.getClassLoader())); +PluginScanResult result = new PluginScanResult( +Collections.emptySortedSet(), +Collections.emptySortedSet(), +converters, +headerConverters, +Collections.emptySortedSet(), +Collections.emptySortedSet(), +Collections.emptySortedSet(), +Collections.emptySortedSet(), +Collections.emptySortedSet() +); +Map aliases = PluginUtils.computeAliases(result); +assertEquals(CollidingConverter.class.getName(), aliases.get("CollidingConverter")); +assertEquals(CollidingHeaderConverter.class.getName(), aliases.get("CollidingHeaderConverter")); +assertFalse(aliases.containsKey("Colliding")); Review Comment: Same thought RE comparing whole `Map` objects: ```suggestion Map actualAliases = PluginUtils.computeAliases(result); Map expectedAliases = new HashMap<>(); expectedAliases.put("CollidingConverter", CollidingConverter.class.getName()); 
expectedAliases.put("CollidingHeaderConverter", CollidingHeaderConverter.class.getName()); assertEquals(expectedAliases, actualAliases); ``` ##
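The reasoning behind the suggestion above, comparing the whole alias `Map` rather than probing individual keys, can be shown without any Connect classes. In this sketch `AliasAssertions` and its two methods are hypothetical, and the plain `Map<String, String>` stands in for whatever `PluginUtils.computeAliases` returns.

```java
import java.util.Map;

// Hypothetical helper contrasting per-entry checks with whole-map equality.
final class AliasAssertions {
    private AliasAssertions() {}

    // Per-entry check: true as long as every expected alias is present,
    // even if the actual map also contains aliases nobody expected.
    static boolean containsExpected(Map<String, String> actual, Map<String, String> expected) {
        return actual.entrySet().containsAll(expected.entrySet());
    }

    // Whole-map check: additionally fails on any unexpected extra alias.
    static boolean matchesExactly(Map<String, String> actual, Map<String, String> expected) {
        return expected.equals(actual);
    }
}
```

A stray alias such as `Colliding` slips past the per-entry check unless the test remembers to assert its absence explicitly, whereas the whole-map comparison rejects it without an extra assertion.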
[GitHub] [kafka] C0urante commented on a diff in pull request #13383: KAFKA-14059 Replace PowerMock with Mockito in WorkerSourceTaskTest
C0urante commented on code in PR #13383: URL: https://github.com/apache/kafka/pull/13383#discussion_r1214665544 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java: ## @@ -832,202 +776,165 @@ private CountDownLatch expectPolls(int minimum, final AtomicInteger count) throw // Note that we stub these to allow any number of calls because the thread will continue to // run. The count passed in + latch returned just makes sure we get *at least* that number of // calls -EasyMock.expect(sourceTask.poll()) -.andStubAnswer(() -> { -count.incrementAndGet(); -latch.countDown(); -Thread.sleep(10); -return RECORDS; -}); +doAnswer((Answer>) invocation -> { +count.incrementAndGet(); +latch.countDown(); +Thread.sleep(10); +return RECORDS; +}).when(sourceTask).poll(); + // Fallout of the poll() call -expectSendRecordAnyTimes(); +expectSendRecord(); return latch; } private CountDownLatch expectPolls(int count) throws InterruptedException { return expectPolls(count, new AtomicInteger()); } -@SuppressWarnings("unchecked") -private void expectSendRecordSyncFailure(Throwable error) { -expectConvertHeadersAndKeyValue(false); -expectApplyTransformationChain(false); - -EasyMock.expect( -producer.send(EasyMock.anyObject(ProducerRecord.class), - EasyMock.anyObject(org.apache.kafka.clients.producer.Callback.class))) -.andThrow(error); +private void expectSendRecord() throws InterruptedException { +expectSendRecordTaskCommitRecordSucceed(); } -private Capture> expectSendRecordAnyTimes() throws InterruptedException { -return expectSendRecordTaskCommitRecordSucceed(true); +// +private void expectSendRecordProducerCallbackFail() throws InterruptedException { +expectSendRecord(TOPIC, false, false, true, emptyHeaders()); } -private Capture> expectSendRecordOnce() throws InterruptedException { -return expectSendRecordTaskCommitRecordSucceed(false); +private void expectSendRecordTaskCommitRecordSucceed() throws InterruptedException { +expectSendRecord(TOPIC, 
true, true, true, emptyHeaders()); } -private Capture> expectSendRecordProducerCallbackFail() throws InterruptedException { -return expectSendRecord(TOPIC, false, false, false, true, emptyHeaders()); -} - -private Capture> expectSendRecordTaskCommitRecordSucceed(boolean anyTimes) throws InterruptedException { -return expectSendRecord(TOPIC, anyTimes, true, true, true, emptyHeaders()); -} - -private Capture> expectSendRecordTaskCommitRecordFail(boolean anyTimes) throws InterruptedException { -return expectSendRecord(TOPIC, anyTimes, true, false, true, emptyHeaders()); -} - -private Capture> expectSendRecord( -String topic, -boolean anyTimes, -boolean sendSuccess, -boolean commitSuccess, -boolean isMockedConverters, -Headers headers +private void expectSendRecord( +String topic, +boolean sendSuccess, +boolean commitSuccess, +boolean isMockedConverters, +Headers headers ) throws InterruptedException { if (isMockedConverters) { -expectConvertHeadersAndKeyValue(topic, anyTimes, headers); +expectConvertHeadersAndKeyValue(topic, headers); } -expectApplyTransformationChain(anyTimes); - -Capture> sent = EasyMock.newCapture(); - -// 1. Converted data passed to the producer, which will need callbacks invoked for flush to work -IExpectationSetters> expect = EasyMock.expect( -producer.send(EasyMock.capture(sent), -EasyMock.capture(producerCallbacks))); -IAnswer> expectResponse = () -> { -synchronized (producerCallbacks) { -for (org.apache.kafka.clients.producer.Callback cb : producerCallbacks.getValues()) { -if (sendSuccess) { -cb.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0, 0, -0L, 0, 0), null); -} else { -cb.onCompletion(null, new TopicAuthorizationException("foo")); -} -} -producerCallbacks.reset(); -} -return sendFuture; -}; -if (anyTimes) -expect.andStubAnswer(expectResponse); -else -expect.andAnswer(expectResponse); +expectApplyTransformationChain(); if (sendSuccess) { // 2. As a result of a successful
[GitHub] [kafka] cmccabe commented on a diff in pull request #13799: KAFKA-15048: Improve handling of unexpected quorum controller errors
cmccabe commented on code in PR #13799: URL: https://github.com/apache/kafka/pull/13799#discussion_r1214678682 ## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ## @@ -2024,13 +2029,8 @@ public CompletableFuture electLeaders( public CompletableFuture finalizedFeatures( ControllerRequestContext context ) { -// It's possible that we call ApiVersionRequest before consuming the log since ApiVersionRequest is sent when -// initialize NetworkClient, we should not return an error since it would stop the NetworkClient from working correctly. -if (lastCommittedOffset == -1) { Review Comment: yes, this was previously a bug.
[GitHub] [kafka] cmccabe commented on a diff in pull request #13799: KAFKA-15048: Improve handling of unexpected quorum controller errors
cmccabe commented on code in PR #13799:
URL: https://github.com/apache/kafka/pull/13799#discussion_r1214677896

## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ##
@@ -425,25 +430,24 @@ public void accept(ConfigResource configResource) {
     public static final String CONTROLLER_THREAD_SUFFIX = "QuorumControllerEventHandler";
-    private static final String ACTIVE_CONTROLLER_EXCEPTION_TEXT_PREFIX =
-        "The active controller appears to be node ";
-
-    private NotControllerException newNotControllerException() {
-        OptionalInt latestController = raftClient.leaderAndEpoch().leaderId();
-        if (latestController.isPresent()) {
-            return new NotControllerException(ACTIVE_CONTROLLER_EXCEPTION_TEXT_PREFIX +
-                latestController.getAsInt() + ".");
-        } else {
-            return new NotControllerException("No controller appears to be active.");
-        }
+    private OptionalInt latestController() {
+        return raftClient.leaderAndEpoch().leaderId();
     }
-    private NotControllerException newPreMigrationException() {
-        OptionalInt latestController = raftClient.leaderAndEpoch().leaderId();
-        if (latestController.isPresent()) {
-            return new NotControllerException("The controller is in pre-migration mode.");
+    /**
+     * @return The offset that we should perform read operations at.
+     */
+    private long currentReadOffset() {
+        if (isActiveController()) {
+            // The active controller keeps an in-memory snapshot at the last committed offset,
+            // which we want to read from when performing read operations. This will avoid
+            // reading uncommitted data.
+            return lastCommittedOffset;
         } else {
-            return new NotControllerException("No controller appears to be active.");
+            // Standby controllers never have uncommitted data in memory. Therefore, we return
+            // Long.MAX_VALUE, a special value which means "always read the latest from every
+            // data structure."
+            return Long.MAX_VALUE;

Review Comment:
Good point. Fixed.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
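The read-offset rule described by the comments in the diff above can be sketched in isolation. This is a minimal illustration only: the class `ReadOffsetSketch` and its parameters are hypothetical stand-ins, not the actual `QuorumController` fields.

```java
/** Hypothetical stand-in for the controller state; not the real QuorumController. */
final class ReadOffsetSketch {
    /** Sentinel meaning "always read the latest from every data structure". */
    static final long LATEST = Long.MAX_VALUE;

    /**
     * Picks the offset to read at. Both parameters are assumptions made for
     * this sketch; the real method reads them from controller state.
     */
    static long currentReadOffset(boolean isActiveController, long lastCommittedOffset) {
        // Active controller: read at the last committed offset so that
        // uncommitted data is never observed.
        // Standby controller: nothing uncommitted is held in memory, so
        // reading the latest state is safe.
        return isActiveController ? lastCommittedOffset : LATEST;
    }

    public static void main(String[] args) {
        System.out.println(currentReadOffset(true, 42L));
        System.out.println(currentReadOffset(false, 42L) == LATEST);
    }
}
```

Using a sentinel keeps every read path on a single code path: callers pass the returned offset through unchanged and never branch on the controller's role themselves.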
[GitHub] [kafka] vcrfxia commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)
vcrfxia commented on code in PR #13756:
URL: https://github.com/apache/kafka/pull/13756#discussion_r1214669497

## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java: ##
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.internals.Change;
+import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static java.util.Objects.requireNonNull;
+
+public class RocksDBTimeOrderedKeyValueBuffer extends WrappedStateStore implements TimeOrderedKeyValueBuffer {
+
+    private final Duration gracePeriod;
+    private long bufferSize;
+    private long minTimestamp;
+    private int numRec;
+    private Serde keySerde;
+    private FullChangeSerde valueSerde;

Review Comment:
> unfortunately with how the evictions work that just pushes the complexity into the in memory implementation

Which in-memory implementation was causing trouble? Currently `BufferValue.serialize()` serializes three extra integers into the value, all of which are unnecessary if this store use case will never populate values for either `oldValue` or `priorValue`. That might not be a lot in some use cases, but for small message sizes the overhead could be meaningful.

That said, we can always address this in a follow-up PR, but I am curious to know where the challenge lies, since I think it'd be nice to serialize the value directly instead of going through `Change` serializers if we can avoid it.
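To make the overhead argument above concrete, here is a rough sketch comparing a value written behind three integer fields with the same value written directly. The layout below (three `int` length prefixes, with `-1` marking an absent field) is an assumption for illustration; the comment only states that three extra integers are written, and the actual `BufferValue` format may differ. `SerializationOverheadSketch` is a hypothetical name.

```java
import java.nio.ByteBuffer;

/** Hypothetical illustration of per-record overhead; not the real BufferValue layout. */
final class SerializationOverheadSketch {

    /** Assumed layout: three int length prefixes (prior, old, new), -1 meaning "absent". */
    static byte[] withLengthPrefixes(byte[] newValue) {
        ByteBuffer buffer = ByteBuffer.allocate(3 * Integer.BYTES + newValue.length);
        buffer.putInt(-1);              // priorValue is never populated in this use case
        buffer.putInt(-1);              // oldValue is never populated in this use case
        buffer.putInt(newValue.length); // only the new value carries data
        buffer.put(newValue);
        return buffer.array();
    }

    /** Direct serialization: the value bytes and nothing else. */
    static byte[] direct(byte[] newValue) {
        return newValue.clone();
    }

    public static void main(String[] args) {
        byte[] small = new byte[8];
        System.out.println("prefixed: " + withLengthPrefixes(small).length + " bytes");
        System.out.println("direct:   " + direct(small).length + " bytes");
    }
}
```

Under this assumed layout the fixed cost is 12 bytes per record, which is more than the payload itself for an 8-byte value and negligible for a multi-kilobyte one.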
[GitHub] [kafka] vcrfxia commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)
vcrfxia commented on code in PR #13756:
URL: https://github.com/apache/kafka/pull/13756#discussion_r121442

## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueSegmentedBytesStore.java: ##
@@ -34,10 +34,9 @@ public class RocksDBTimeOrderedKeyValueSegmentedBytesStore extends AbstractRocks
     RocksDBTimeOrderedKeyValueSegmentedBytesStore(final String name,
                                                   final String metricsScope,
                                                   final long retention,
-                                                  final long segmentInterval,
-                                                  final boolean withIndex) {
+                                                  final long segmentInterval) {
         super(name, metricsScope, retention, segmentInterval, new TimeFirstWindowKeySchema(),
-            Optional.ofNullable(withIndex ? new KeyFirstWindowKeySchema() : null));
+            Optional.ofNullable(null));

Review Comment:
nit: `Optional.empty()`

## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java: ##
@@ -78,18 +78,21 @@ public void init(final StateStoreContext context, final StateStore root) {
     @Override
     public void evictWhile(final Supplier predicate, final Consumer> callback) {
-        KeyValue keyValue = null;
+        KeyValue keyValue;
         if (predicate.get()) {
             final KeyValueIterator iterator = wrapped()
                 .fetchAll(0, wrapped().observedStreamTime - gracePeriod.toMillis());
             if (iterator.hasNext()) {
                 keyValue = iterator.next();
-            }
-            if (keyValue == null) {
-                minTimestamp = Long.MAX_VALUE;
+            } else {
+                if (numRecords() == 0) {
+                    minTimestamp = Long.MAX_VALUE;
+                }
+                iterator.close();

Review Comment:
Could we use a try-with-resources block instead, so we don't need to remember to close the iterator in every possible place before returning? That way the iterator is also not leaked if an exception occurs.

## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbTimeOrderedKeyValueBytesStoreSupplier.java: ##
@@ -38,13 +35,12 @@ public RocksDBTimeOrderedKeyValueSegmentedBytesStore get() {
             name,
             metricsScope(),
             retentionPeriod,
-            Math.max(retentionPeriod / 2, 60_000L),
-            withIndex
+            Math.max(retentionPeriod / 2, 60_000L)
         );
     }
     public String metricsScope() {
-        return "rocksdb-session";
+        return "rocksdb-buffer";

Review Comment:
Do we want this store to specifically be used as a buffer, or should it support non-buffer use cases for RocksDB time-ordered stores as well? If we want it to be more extensible in the future, we can simply use `rocksdb` here (similar to `RocksDbKeyValueBytesStoreSupplier`). We'd also want to remove the `-buffer` suffix from the store name.

## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java: ##
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.internals.Change;
+import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import
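The try-with-resources suggestion in the review comments above can be illustrated with a small self-contained sketch. `TrackingIterator` is a hypothetical stand-in for a closeable store iterator such as `KeyValueIterator`; it is not the Kafka Streams type, and `firstOrNull` is not the PR's `evictWhile`.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

final class TryWithResourcesSketch {

    /** Hypothetical closeable iterator that records whether close() was called. */
    static final class TrackingIterator implements Iterator<String>, AutoCloseable {
        private final Iterator<String> delegate;
        boolean closed = false;

        TrackingIterator(List<String> items) {
            this.delegate = items.iterator();
        }

        @Override
        public boolean hasNext() {
            return delegate.hasNext();
        }

        @Override
        public String next() {
            return delegate.next();
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    /** Returns the first element, or null if there is none; the iterator is closed on every path. */
    static String firstOrNull(TrackingIterator iterator) {
        // The resource is closed when the block exits, whether by a normal
        // return or by an exception thrown from hasNext()/next().
        try (TrackingIterator it = iterator) {
            return it.hasNext() ? it.next() : null;
        }
    }

    public static void main(String[] args) {
        TrackingIterator nonEmpty = new TrackingIterator(Arrays.asList("k1", "k2"));
        System.out.println(firstOrNull(nonEmpty) + " closed=" + nonEmpty.closed);

        TrackingIterator empty = new TrackingIterator(Collections.<String>emptyList());
        System.out.println(firstOrNull(empty) + " closed=" + empty.closed);
    }
}
```

The design point is that no branch has to call `close()` explicitly, so adding a new early return later cannot reintroduce a leak.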
[GitHub] [kafka] jolshan commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)
jolshan commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1214664368

## storage/src/main/java/org/apache/kafka/storage/internals/log/VerificationStateEntry.java: ##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+/**
+ * This class represents the verification state of a specific producer-id.
+ * It contains a verificationState object that is used to uniquely identify the transaction we want to verify.
+ * After verifying, we retain this object until we append to the log. This prevents any race conditions where the transaction
+ * may end via a control marker before we write to the log. This mechanism is used to prevent hanging transactions.
+ * We remove the verification state whenever we write data to the transaction or write an end marker for the transaction.
+ * Any lingering entries that are never verified are removed via the producer state entry cleanup mechanism.
+ */
+public class VerificationStateEntry {
+
+    private final long producerId;
+    private long timestamp;
+    private Object verificationState;

Review Comment:
I've changed to verificationGuard
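The Javadoc in the diff above describes retaining an object between verification and append so that a transaction ending in between can be detected. A minimal sketch of that idea, assuming the guard is compared by identity, is shown below. `VerificationGuardSketch` and its methods are hypothetical and do not reproduce the PR's actual classes or call sites.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical illustration of an identity-based verification guard. */
final class VerificationGuardSketch {
    private final Map<Long, Object> guards = new HashMap<>();

    /** At verification time: returns the current guard for the producer id, creating one if absent. */
    Object maybeCreateGuard(long producerId) {
        return guards.computeIfAbsent(producerId, id -> new Object());
    }

    /** When data or an end marker is written for the transaction: drop the guard. */
    void clearGuard(long producerId) {
        guards.remove(producerId);
    }

    /**
     * Right before append: proceed only if the guard obtained at verification
     * time is still the one on record. Identity comparison is intentional,
     * since each guard is a fresh object unique to one verification.
     */
    boolean canAppend(long producerId, Object guard) {
        return guard != null && guards.get(producerId) == guard;
    }

    public static void main(String[] args) {
        VerificationGuardSketch state = new VerificationGuardSketch();
        Object guard = state.maybeCreateGuard(7L);
        System.out.println(state.canAppend(7L, guard));
        state.clearGuard(7L); // e.g. a control marker ended the transaction
        System.out.println(state.canAppend(7L, guard));
    }
}
```

A stale guard fails the check even after a new verification starts for the same producer id, because the replacement guard is a different object.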
[GitHub] [kafka] C0urante commented on a diff in pull request #13383: KAFKA-14059 Replace PowerMock with Mockito in WorkerSourceTaskTest
C0urante commented on code in PR #13383:
URL: https://github.com/apache/kafka/pull/13383#discussion_r1214654703

## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java: ##
@@ -391,37 +369,26 @@ public void testFailureInPoll() throws Exception {
         taskFuture.get();
         assertPollMetrics(0);
-        PowerMock.verifyAll();
+        verifyCleanStartup();
+        verifyOffsetFlush(true);
+        verify(statusListener).onFailure(taskId, exception);
+        verify(sourceTask).stop();

Review Comment:
Ah, never mind. This is covered by changes to `verifyOffsetFlush`.
[GitHub] [kafka] C0urante commented on a diff in pull request #13383: KAFKA-14059 Replace PowerMock with Mockito in WorkerSourceTaskTest
C0urante commented on code in PR #13383: URL: https://github.com/apache/kafka/pull/13383#discussion_r1214602836 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java: ## @@ -315,157 +322,130 @@ public void testPause() throws Exception { taskFuture.get(); -PowerMock.verifyAll(); +verifyCleanStartup(); +verifyTaskGetTopic(count.get()); +verifyOffsetFlush(true); +verifyTopicCreation(TOPIC); +verify(statusListener).onPause(taskId); +verify(statusListener).onShutdown(taskId); +verify(sourceTask).stop(); +verify(offsetWriter).offset(PARTITION, OFFSET); +verifyClose(); } @Test public void testPollsInBackground() throws Exception { createWorkerTask(); -expectCleanStartup(); - final CountDownLatch pollLatch = expectPolls(10); -// In this test, we don't flush, so nothing goes any further than the offset writer expectTopicCreation(TOPIC); - -sourceTask.stop(); -EasyMock.expectLastCall(); - -offsetWriter.offset(PARTITION, OFFSET); -PowerMock.expectLastCall(); -expectOffsetFlush(true); - -statusListener.onShutdown(taskId); -EasyMock.expectLastCall(); - -expectClose(); - -PowerMock.replayAll(); +expectOffsetFlush(); workerTask.initialize(TASK_CONFIG); Future taskFuture = executor.submit(workerTask); -assertTrue(awaitLatch(pollLatch)); +ConcurrencyUtils.awaitLatch(pollLatch, POLL_TIMEOUT_MSG); workerTask.stop(); assertTrue(workerTask.awaitStop(1000)); taskFuture.get(); assertPollMetrics(10); - -PowerMock.verifyAll(); +verifyCleanStartup(); +verifyOffsetFlush(true); +verify(offsetWriter).offset(PARTITION, OFFSET); +verify(statusListener).onShutdown(taskId); +verifyClose(); } @Test public void testFailureInPoll() throws Exception { createWorkerTask(); -expectCleanStartup(); - final CountDownLatch pollLatch = new CountDownLatch(1); final RuntimeException exception = new RuntimeException(); -EasyMock.expect(sourceTask.poll()).andAnswer(() -> { +when(sourceTask.poll()).thenAnswer(invocation -> { pollLatch.countDown(); throw exception; }); 
-statusListener.onFailure(taskId, exception); -EasyMock.expectLastCall(); - -sourceTask.stop(); -EasyMock.expectLastCall(); expectEmptyOffsetFlush(); -expectClose(); - -PowerMock.replayAll(); - workerTask.initialize(TASK_CONFIG); Future taskFuture = executor.submit(workerTask); -assertTrue(awaitLatch(pollLatch)); +ConcurrencyUtils.awaitLatch(pollLatch, POLL_TIMEOUT_MSG); //Failure in poll should trigger automatic stop of the task assertTrue(workerTask.awaitStop(1000)); -assertShouldSkipCommit(); taskFuture.get(); assertPollMetrics(0); -PowerMock.verifyAll(); +verifyCleanStartup(); +verify(statusListener).onFailure(taskId, exception); +verify(sourceTask).stop(); +assertShouldSkipCommit(); +verifyOffsetFlush(true); +verifyClose(); } @Test public void testFailureInPollAfterCancel() throws Exception { createWorkerTask(); -expectCleanStartup(); - final CountDownLatch pollLatch = new CountDownLatch(1); final CountDownLatch workerCancelLatch = new CountDownLatch(1); final RuntimeException exception = new RuntimeException(); -EasyMock.expect(sourceTask.poll()).andAnswer(() -> { +when(sourceTask.poll()).thenAnswer(invocation -> { pollLatch.countDown(); -assertTrue(awaitLatch(workerCancelLatch)); +ConcurrencyUtils.awaitLatch(workerCancelLatch, POLL_TIMEOUT_MSG); throw exception; }); -offsetReader.close(); -PowerMock.expectLastCall(); - -producer.close(Duration.ZERO); -PowerMock.expectLastCall(); - -sourceTask.stop(); -EasyMock.expectLastCall(); - -expectClose(); - -PowerMock.replayAll(); - workerTask.initialize(TASK_CONFIG); Future taskFuture = executor.submit(workerTask); -assertTrue(awaitLatch(pollLatch)); +ConcurrencyUtils.awaitLatch(pollLatch, POLL_TIMEOUT_MSG); workerTask.cancel(); workerCancelLatch.countDown(); assertTrue(workerTask.awaitStop(1000)); taskFuture.get(); assertPollMetrics(0); -PowerMock.verifyAll(); +verifyCleanStartup(); +verify(offsetReader, atLeastOnce()).close(); +verify(producer).close(Duration.ZERO); +verify(sourceTask).stop(); +
[GitHub] [kafka] jolshan commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)
jolshan commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1214639262

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -671,6 +671,7 @@ class ReplicaManager(val config: KafkaConfig,
       val sTime = time.milliseconds
       val transactionalProducerIds = mutable.HashSet[Long]()
+      var verificationState: Object = null

Review Comment:
I've fixed this and added a test.
[GitHub] [kafka] gharris1727 commented on pull request #13771: MINOR: Refactor DelegatingClassLoader to emit immutable PluginScanResult
gharris1727 commented on PR #13771:
URL: https://github.com/apache/kafka/pull/13771#issuecomment-1574069538

Here's an example of the new log message, pulled from the test run:

```
[2023-06-02 10:15:57,932] WARN Ambiguous alias 'Colliding' refers to multiple distinct plugins [org.apache.kafka.connect.runtime.isolation.PluginUtilsTest$CollidingHeaderConverter, org.apache.kafka.connect.runtime.isolation.PluginUtilsTest$CollidingConverter]: ignoring (org.apache.kafka.connect.runtime.isolation.PluginUtils:378)
```
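The behavior behind that log message, where an alias claimed by more than one distinct plugin class is ignored, can be sketched roughly as follows. This is an illustration under assumptions: `AliasCollisionSketch`, its input shape (class name mapped to a precomputed alias), and the sample class names are hypothetical, and the real `PluginUtils` logic for deriving aliases is not reproduced here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical illustration of dropping ambiguous plugin aliases. */
final class AliasCollisionSketch {

    /**
     * Takes a map of class name to alias and returns alias to class name,
     * keeping only aliases that refer to exactly one class.
     */
    static Map<String, String> unambiguousAliases(Map<String, String> aliasByClassName) {
        // Group class names under each alias.
        Map<String, Set<String>> classesByAlias = new TreeMap<>();
        for (Map.Entry<String, String> entry : aliasByClassName.entrySet()) {
            classesByAlias
                .computeIfAbsent(entry.getValue(), alias -> new TreeSet<>())
                .add(entry.getKey());
        }

        Map<String, String> result = new TreeMap<>();
        for (Map.Entry<String, Set<String>> entry : classesByAlias.entrySet()) {
            if (entry.getValue().size() == 1) {
                result.put(entry.getKey(), entry.getValue().iterator().next());
            } else {
                // Mirrors the wording of the log line above; printed to stderr in this sketch.
                System.err.println("Ambiguous alias '" + entry.getKey()
                    + "' refers to multiple distinct plugins " + entry.getValue() + ": ignoring");
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> input = new HashMap<>();
        input.put("example.CollidingConverter", "Colliding");
        input.put("example.CollidingHeaderConverter", "Colliding");
        input.put("example.JsonConverter", "Json");
        System.out.println(unambiguousAliases(input));
    }
}
```

Ignoring the alias entirely, rather than picking one of the candidates, keeps alias resolution deterministic regardless of plugin scan order.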
[GitHub] [kafka] jeqo opened a new pull request, #13803: KAFKA-15051: add missing GET plugin/config endpoint
jeqo opened a new pull request, #13803:
URL: https://github.com/apache/kafka/pull/13803

(no comment)
[jira] [Created] (KAFKA-15051) docs: add missing connector plugin endpoint to documentation
Jorge Esteban Quilcate Otoya created KAFKA-15051:

Summary: docs: add missing connector plugin endpoint to documentation
Key: KAFKA-15051
URL: https://issues.apache.org/jira/browse/KAFKA-15051
Project: Kafka
Issue Type: Task
Components: docs, documentation
Reporter: Jorge Esteban Quilcate Otoya

GET /plugin/config endpoint added in [KIP-769|https://cwiki.apache.org/confluence/display/KAFKA/KIP-769%3A+Connect+APIs+to+list+all+connector+plugins+and+retrieve+their+configuration+definitions] is not included in the connect documentation page: https://kafka.apache.org/documentation/#connect_rest

--
This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] divijvaidya commented on a diff in pull request #13760: KAFKA-8982: Add retry of fetching metadata to Admin.deleteRecords
divijvaidya commented on code in PR #13760: URL: https://github.com/apache/kafka/pull/13760#discussion_r1214582937 ## clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandler.java: ## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.admin.internals; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.kafka.clients.admin.DeletedRecords; +import org.apache.kafka.clients.admin.RecordsToDelete; +import org.apache.kafka.clients.admin.internals.AdminApiFuture.SimpleAdminApiFuture; +import org.apache.kafka.clients.admin.internals.AdminApiHandler.Batched; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.message.DeleteRecordsRequestData; +import org.apache.kafka.common.message.DeleteRecordsResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AbstractResponse; +import org.apache.kafka.common.requests.DeleteRecordsRequest; +import org.apache.kafka.common.requests.DeleteRecordsResponse; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +public final class DeleteRecordsHandler extends Batched { + +private final Map recordsToDelete; +private final Logger log; +private final AdminApiLookupStrategy lookupStrategy; + +public DeleteRecordsHandler( +Map recordsToDelete, +LogContext logContext +) { +this.recordsToDelete = recordsToDelete; +this.log = logContext.logger(DeleteRecordsHandler.class); +this.lookupStrategy = new PartitionLeaderStrategy(logContext); +} + +@Override +public String apiName() { +return "deleteRecords"; +} + +@Override +public AdminApiLookupStrategy lookupStrategy() { +return this.lookupStrategy; +} + +public static SimpleAdminApiFuture newFuture( +Collection topicPartitions +) { +return AdminApiFuture.forKeys(new HashSet<>(topicPartitions)); +} + +@Override +public DeleteRecordsRequest.Builder buildBatchedRequest(int brokerId, Set keys) { +Map 
deletionsForTopic = new HashMap<>(); +for (Map.Entry entry: recordsToDelete.entrySet()) { +TopicPartition topicPartition = entry.getKey(); +DeleteRecordsRequestData.DeleteRecordsTopic deleteRecords = deletionsForTopic.get(topicPartition.topic()); +if (deleteRecords == null) { +deleteRecords = new DeleteRecordsRequestData.DeleteRecordsTopic() +.setName(topicPartition.topic()); +deletionsForTopic.put(topicPartition.topic(), deleteRecords); +} +deleteRecords.partitions().add(new DeleteRecordsRequestData.DeleteRecordsPartition() +.setPartitionIndex(topicPartition.partition()) +.setOffset(entry.getValue().beforeOffset())); + +System.out.println("Partitions: " + deleteRecords.partitions()); +} + +DeleteRecordsRequestData data = new DeleteRecordsRequestData() +.setTopics(new ArrayList<>(deletionsForTopic.values())); +return new DeleteRecordsRequest.Builder(data); +} + + +@Override +public ApiResult handleResponse( +Node broker, +Set keys, +AbstractResponse abstractResponse +) { +DeleteRecordsResponse response = (DeleteRecordsResponse) abstractResponse; +Map completed = new HashMap<>(); +Map failed = new HashMap<>(); +List unmapped = new ArrayList<>(); +Set retriable = new HashSet<>(); + +for (DeleteRecordsResponseData.DeleteRecordsTopicResult topicResult: response.data().topics()) { +for (DeleteRecordsResponseData.DeleteRecordsPartitionResult partitionResult : topicResult.partitions()) { +Errors error =
[GitHub] [kafka] C0urante commented on a diff in pull request #13750: MINOR: Handle the config topic read timeout edge case in DistributedHerder's stopConnector method
C0urante commented on code in PR #13750:
URL: https://github.com/apache/kafka/pull/13750#discussion_r1214589790

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java: ##
@@ -1104,7 +1104,9 @@ public void stopConnector(final String connName, final Callback callback)
                     writeTaskConfigs(connName, Collections.emptyList());
                     configBackingStore.putTargetState(connName, TargetState.STOPPED);
                     // Force a read of the new target state for the connector
-                    refreshConfigSnapshot(workerSyncTimeoutMs);
+                    if (!refreshConfigSnapshot(workerSyncTimeoutMs)) {
+                        throw new ConnectException("Failed to read to end of config topic");

Review Comment:
Agreed, better to log a warning.

I was also toying with the idea that we could shift the responsibility of refreshing the view of the config topic from operations that write to the topic to operations that read from the topic. For example, if we need to check that a connector exists before pausing/resuming/restarting it, then that operation fails if we can't update our view of the topic.

I don't think this is necessary to do in this PR; it can be a follow-up. It's also questionable whether we should even bother, since there are still race conditions that can come from target state updates that originate from non-leader workers. But it's at least food for thought.
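The "log a warning instead of throwing" option agreed on above can be sketched as follows. This is a hedged illustration only: `RefreshOutcomeSketch` and `refreshOrWarn` are hypothetical names, the `BooleanSupplier` stands in for a call like `refreshConfigSnapshot(workerSyncTimeoutMs)`, and the logger is `java.util.logging` rather than whatever the herder actually uses.

```java
import java.util.function.BooleanSupplier;
import java.util.logging.Logger;

/** Hypothetical illustration of warning on a failed refresh instead of failing the request. */
final class RefreshOutcomeSketch {
    private static final Logger LOG = Logger.getLogger("RefreshOutcomeSketch");

    /**
     * Runs the refresh and reports whether it succeeded. A timeout is logged
     * as a warning; no exception is thrown, so a write that has already been
     * made to the config topic is not reported to the caller as a failure.
     */
    static boolean refreshOrWarn(BooleanSupplier refreshConfigSnapshot) {
        boolean refreshed = refreshConfigSnapshot.getAsBoolean();
        if (!refreshed) {
            LOG.warning("Failed to read to end of config topic; "
                + "the local view may be stale until the next successful refresh");
        }
        return refreshed;
    }

    public static void main(String[] args) {
        System.out.println(refreshOrWarn(() -> true));
        System.out.println(refreshOrWarn(() -> false));
    }
}
```

Returning the outcome rather than discarding it leaves room for the follow-up idea in the comment: an operation that reads the config topic could call the same helper and decide for itself whether a stale view is acceptable.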
[GitHub] [kafka] dajac commented on pull request #13675: KAFKA-14462; [14/N] Add PartitionWriter
dajac commented on PR #13675:
URL: https://github.com/apache/kafka/pull/13675#issuecomment-1574019307

Yeah, I have noticed all the failed tests. I will fix them.
[GitHub] [kafka] philipnee commented on pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given
philipnee commented on PR #13678:
URL: https://github.com/apache/kafka/pull/13678#issuecomment-1574015844

The failures seem unrelated:

```
Build / JDK 8 and Scala 2.12 / testDescribeReportOverriddenConfigs(String).quorum=kraft – kafka.admin.TopicCommandIntegrationTest 10s
Build / JDK 8 and Scala 2.12 / testDelayedConfigurationOperations() – org.apache.kafka.controller.QuorumControllerTest <1s
Build / JDK 11 and Scala 2.13 / testSyncTopicConfigs() – org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest 1m 36s
Build / JDK 11 and Scala 2.13 / testBalancePartitionLeaders() – org.apache.kafka.controller.QuorumControllerTest 12s
Build / JDK 11 and Scala 2.13 / testBalancePartitionLeaders() – org.apache.kafka.controller.QuorumControllerTest 13s
Fixed 68
```
[GitHub] [kafka] jolshan commented on pull request #13675: KAFKA-14462; [14/N] Add PartitionWriter
jolshan commented on PR #13675:
URL: https://github.com/apache/kafka/pull/13675#issuecomment-1574008136

Hey David, whenever we update `appendRecords` we have to modify a ton of tests that mock the method (basically adding a ton of `any()` matchers, since the mocker doesn't get default args). There are 283 tests failing now.

See here for an example of a PR that updates them: https://github.com/apache/kafka/pull/13391/files
[GitHub] [kafka] jolshan commented on a diff in pull request #13675: KAFKA-14462; [14/N] Add PartitionWriter
jolshan commented on code in PR #13675:
URL: https://github.com/apache/kafka/pull/13675#discussion_r1214570874

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -1019,13 +1029,14 @@ class ReplicaManager(val config: KafkaConfig,
   }
   /**
-   * Append the messages to the local replica logs
+   * Append the messages to the local replica logs. ReplicaManager#appendRecords should usually be
+   * used instead of this method.

Review Comment:
lgtm :)
[GitHub] [kafka] jolshan commented on a diff in pull request #13675: KAFKA-14462; [14/N] Add PartitionWriter
jolshan commented on code in PR #13675: URL: https://github.com/apache/kafka/pull/13675#discussion_r1214569080 ## core/src/main/scala/kafka/coordinator/group/PartitionWriterImpl.scala: ## @@ -131,26 +132,22 @@ class PartitionWriterImpl[T]( s"in append to partition $tp which exceeds the maximum configured size of $maxBatchSize.") } -val appendResults = replicaManager.appendToLocalLog( +var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty +replicaManager.appendRecords( + timeout = 0L, + requiredAcks = 1, internalTopicsAllowed = true, origin = AppendOrigin.COORDINATOR, entriesPerPartition = Map(tp -> recordsBuilder.build()), - requiredAcks = 1, - requestLocal = RequestLocal.NoCaching + responseCallback = results => appendResults = results, + actionQueueAdd = item => item() // Immediately complete the action queue item. ) val partitionResult = appendResults.getOrElse(tp, - throw new IllegalStateException("Append status %s should only have one partition %s" -.format(appendResults, tp))) - -// Complete delayed operations. -replicaManager.maybeCompletePurgatories( - tp, - partitionResult.info.leaderHwChange -) + throw new IllegalStateException(s"Append status $appendResults should only have one partition $tp")) Review Comment: nit: this error message confused me because I thought that it was saying there was more than one tp and that's why it failed. I think maybe the message should say something like s"Append status $appendResults should have partition $tp" since the fact that it doesn't would be the reason to throw the error. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
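The nit above asks for an error message that says the expected partition is missing, rather than one that hints at too many partitions. A minimal sketch of that wording in Java (the quoted code is Scala); `AppendResults` and `requireResult` are hypothetical names used only for illustration, and a plain `Map` with `String` keys stands in for the Kafka types.

```java
import java.util.Collections;
import java.util.Map;

final class AppendResults {
    private AppendResults() { }

    // Return the result for the expected partition, or fail with a message
    // that names the missing partition (hypothetical helper, not Kafka API).
    static <K, V> V requireResult(Map<K, V> results, K partition) {
        V result = results.get(partition);
        if (result == null) {
            throw new IllegalStateException(
                "Append status " + results + " should have partition " + partition);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Long> results = Collections.singletonMap("foo-0", 5L);
        System.out.println(requireResult(results, "foo-0"));
    }
}
```

With this wording, the exception text reads as "the partition was expected but absent", which matches the only condition under which it is thrown.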
[GitHub] [kafka] mumrah commented on a diff in pull request #13799: KAFKA-15048: Improve handling of unexpected quorum controller errors
mumrah commented on code in PR #13799: URL: https://github.com/apache/kafka/pull/13799#discussion_r1214563481 ## metadata/src/main/java/org/apache/kafka/controller/errors/ControllerExceptions.java: ## @@ -38,4 +47,104 @@ public static boolean isTimeoutException(Throwable exception) { if (!(exception instanceof TimeoutException)) return false; return true; } + +/** + * Check if an exception is a NotController exception. + * + * @param exception The exception to check. + * @return True if the exception is a NotController exception. + */ +public static boolean isNotControllerException(Throwable exception) { +if (exception == null) return false; +if (exception instanceof ExecutionException) { +exception = exception.getCause(); +if (exception == null) return false; +} +if (!(exception instanceof NotControllerException)) return false; +return true; +} + +/** + * Create a new exception indicating that the controller is in pre-migration mode, so the + * operation cannot be completed. + * + * @param latestController The current controller. + * @return The new NotControllerException. + */ +public static NotControllerException newPreMigrationException(OptionalInt latestController) { Review Comment: I'm guessing the argument here is the controller ID? Can you rename it to reflect that? (same for other methods in this class) ## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ## @@ -2024,13 +2029,8 @@ public CompletableFuture electLeaders( public CompletableFuture finalizedFeatures( ControllerRequestContext context ) { -// It's possible that we call ApiVersionRequest before consuming the log since ApiVersionRequest is sent when -// initialize NetworkClient, we should not return an error since it would stop the NetworkClient from working correctly. -if (lastCommittedOffset == -1) { Review Comment: Was this a bug previously? 
If another node sent an ApiVersionRequest prior to loading the log, we would have not returned any MetadataVersion (whereas with this patch, we would). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
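The `isNotControllerException` helper quoted in the diff above unwraps one level of `ExecutionException` before testing the exception type. A self-contained sketch of that pattern follows; `StandInNotControllerException` and `ExceptionChecks` are hypothetical stand-ins so the snippet compiles without Kafka on the classpath, and it is not the code from the PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical stand-in for Kafka's NotControllerException.
class StandInNotControllerException extends RuntimeException {
    StandInNotControllerException(String message) { super(message); }
}

final class ExceptionChecks {
    private ExceptionChecks() { }

    // Unwrap at most one ExecutionException layer, then test the type.
    static boolean isNotController(Throwable exception) {
        if (exception instanceof ExecutionException) {
            exception = exception.getCause();
        }
        // instanceof evaluates to false for null, so no separate null check is needed.
        return exception instanceof StandInNotControllerException;
    }

    public static void main(String[] args) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        future.completeExceptionally(new StandInNotControllerException("not the active controller"));
        try {
            future.get(); // throws ExecutionException wrapping the original cause
        } catch (ExecutionException | InterruptedException e) {
            System.out.println(isNotController(e));
        }
    }
}
```

The unwrapping matters because callers that block on a future see the wrapper type, while callers that use callbacks usually see the cause directly.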
[GitHub] [kafka] jsancio commented on pull request #13765: KAFKA-15021; Skip leader epoch bump on ISR shrink
jsancio commented on PR #13765: URL: https://github.com/apache/kafka/pull/13765#issuecomment-1573978281

> 1. We change leader epoch when a replica is removed and shrinks the ISR (without a leader-re-election). Is that correct? If yes, then should we also remove the logic to increment the epoch in such situations to keep the definition of leader epoch consistent?

Yes. That is what the change to `PartitionChangeBuilder` does. Can you point me to the code you are referring to?

> 2. Is a similar change required for Zk code path?

We need to keep the old behavior in ZK deployments because the ZK controller doesn't implement KIP-841, which is required for this fix to work.
[GitHub] [kafka] jolshan commented on a diff in pull request #13675: KAFKA-14462; [14/N] Add PartitionWriter
jolshan commented on code in PR #13675: URL: https://github.com/apache/kafka/pull/13675#discussion_r1214564305 ## core/src/main/scala/kafka/coordinator/group/PartitionWriterImpl.scala: ## @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.coordinator.group + +import kafka.cluster.PartitionListener +import kafka.server.{ReplicaManager, RequestLocal} +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.errors.RecordTooLargeException +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.record.{CompressionType, MemoryRecords, TimestampType} +import org.apache.kafka.common.record.Record.EMPTY_HEADERS +import org.apache.kafka.common.utils.Time +import org.apache.kafka.coordinator.group.runtime.PartitionWriter +import org.apache.kafka.storage.internals.log.AppendOrigin + +import java.nio.ByteBuffer +import java.util +import scala.collection.Map + +private[group] class ListenerAdaptor( + val listener: PartitionWriter.Listener +) extends PartitionListener { + override def onHighWatermarkUpdated( +tp: TopicPartition, +offset: Long + ): Unit = { +listener.onHighWatermarkUpdated(tp, offset) + } + + override def equals(that: Any): Boolean = that match { +case other: ListenerAdaptor => listener.equals(other.listener) +case _ => false + } + + override def hashCode(): Int = { +listener.hashCode() + } + + override def toString: String = { +s"ListenerAdaptor(listener=$listener)" + } +} + +class PartitionWriterImpl[T]( + replicaManager: ReplicaManager, + serializer: PartitionWriter.Serializer[T], + compressionType: CompressionType, + time: Time +) extends PartitionWriter[T] { + + override def registerListener( +tp: TopicPartition, +listener: PartitionWriter.Listener + ): Unit = { +replicaManager.maybeAddListener(tp, new ListenerAdaptor(listener)) + } + + override def deregisterListener( +tp: TopicPartition, +listener: PartitionWriter.Listener + ): Unit = { +replicaManager.removeListener(tp, new ListenerAdaptor(listener)) + } + + override def append( +tp: TopicPartition, +records: util.List[T] + ): Long = { +if (records.isEmpty) { + throw new IllegalStateException("records must be non-empty.") +} + +replicaManager.getLogConfig(tp) match { + case 
Some(logConfig) => +val magic = logConfig.recordVersion.value +val maxBatchSize = logConfig.maxMessageSize +val currentTimeMs = time.milliseconds() + +val recordsBuilder = MemoryRecords.builder( + ByteBuffer.allocate(math.min(16384, maxBatchSize)), + magic, + compressionType, + TimestampType.CREATE_TIME, + 0L, + maxBatchSize +) + +records.forEach { record => + val keyBytes = serializer.serializeKey(record) + val valueBytes = serializer.serializeValue(record) + + if (recordsBuilder.hasRoomFor(currentTimeMs, keyBytes, valueBytes, EMPTY_HEADERS)) { +recordsBuilder.append( + currentTimeMs, + keyBytes, + valueBytes, + EMPTY_HEADERS +) + } else { +throw new RecordTooLargeException(s"Message batch size is ${recordsBuilder.estimatedSizeInBytes()} bytes " + + s"in append to partition $tp which exceeds the maximum configured size of $maxBatchSize.") + } +} + +val appendResults = replicaManager.appendToLocalLog( Review Comment: Sounds good. Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on pull request #13530: KAFKA-14858: Handle exceptions thrown from Connector::taskConfigs in Connect's standalone mode
C0urante commented on PR #13530: URL: https://github.com/apache/kafka/pull/13530#issuecomment-1573973942 Hmm... so I just did some local testing with this change and the results seem a little inconsistent. Although every expected case reported the error successfully (via the REST API if the `Connector::taskConfigs` method was called synchronously, and via worker logs no matter what), we also don't mark the connector failed in the REST API. Do you think it's worth it to mark connectors FAILED in the REST API if we've encountered an error while interacting with them, and will not (automatically) retry that interaction? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)
jolshan commented on code in PR #13787: URL: https://github.com/apache/kafka/pull/13787#discussion_r1214561645 ## storage/src/main/java/org/apache/kafka/storage/internals/log/VerificationStateEntry.java: ## @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.log; + +/** + * This class represents the verification state of a specific producer-id. + * It contains a verificationState object that is used to uniquely identify the transaction we want to verify. + * After verifying, we retain this object until we append to the log. This prevents any race conditions where the transaction + * may end via a control marker before we write to the log. This mechanism is used to prevent hanging transactions. + * We remove the verification state whenever we write data to the transaction or write an end marker for the transaction. + * Any lingering entries that are never verified are removed via the producer state entry cleanup mechanism. + */ +public class VerificationStateEntry { + +private final long producerId; Review Comment: We can remove it -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)
jolshan commented on code in PR #13787: URL: https://github.com/apache/kafka/pull/13787#discussion_r1214561227 ## storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java: ## @@ -184,6 +186,27 @@ private void clearProducerIds() { producerIdCount = 0; } +/** + * Maybe create the VerificationStateEntry for a given producer ID. Return it if it exists, otherwise return null. + */ +public VerificationStateEntry verificationStateEntry(long producerId, boolean createIfAbsent) { +return verificationStates.computeIfAbsent(producerId, pid -> { +if (createIfAbsent) +return new VerificationStateEntry(pid, time.milliseconds()); +else { +log.warn("The given producer ID did not have an entry in the producer state manager, so it's state will be returned as null"); Review Comment: Yes. But in this case, we will fail the verification. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
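The `verificationStateEntry` method in the diff above relies on a property of `Map.computeIfAbsent`: when the mapping function returns null, no mapping is recorded and the call itself returns null. A small sketch of that behaviour, assuming a `ConcurrentHashMap` as the backing map (the diff does not show the map's type); `VerificationStates` and its nested `Entry` are hypothetical stand-ins for the Kafka classes.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class VerificationStates {
    // Hypothetical stand-in for VerificationStateEntry.
    static final class Entry {
        final long producerId;
        final long createdAtMs;

        Entry(long producerId, long createdAtMs) {
            this.producerId = producerId;
            this.createdAtMs = createdAtMs;
        }
    }

    private final Map<Long, Entry> states = new ConcurrentHashMap<>();

    // Returns the existing entry, creates one if requested, or returns null.
    Entry entryFor(long producerId, boolean createIfAbsent) {
        return states.computeIfAbsent(producerId, pid ->
            createIfAbsent ? new Entry(pid, System.currentTimeMillis()) : null);
    }

    int size() {
        return states.size();
    }

    public static void main(String[] args) {
        VerificationStates states = new VerificationStates();
        System.out.println(states.entryFor(42L, false) == null); // nothing stored yet
        System.out.println(states.entryFor(42L, true) != null);  // created on demand
        System.out.println(states.entryFor(42L, false) != null); // now found without creating
    }
}
```

A caller that passes `createIfAbsent = false` therefore has to handle a null return, which is the case the reply above describes as failing the verification.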
[GitHub] [kafka] mumrah opened a new pull request, #13802: MINOR: Improve KRaftMigrationZkWriter test coverage
mumrah opened a new pull request, #13802: URL: https://github.com/apache/kafka/pull/13802 This patch adds complete test coverage for the snapshot methods in KRaftMigrationZkWriter. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)
jolshan commented on code in PR #13787: URL: https://github.com/apache/kafka/pull/13787#discussion_r1214559856 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -683,6 +704,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, validateAndAssignOffsets = false, leaderEpoch = -1, requestLocal = None, + verificationState = null, Review Comment: Nope. We don't verify followers. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya commented on pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
divijvaidya commented on PR #13561: URL: https://github.com/apache/kafka/pull/13561#issuecomment-1573968832 Hey @satishd Are you planning to address the open comments such as https://github.com/apache/kafka/pull/13561/files#r1166767890 before I do another pass of code review? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya commented on pull request #13765: KAFKA-15021; Skip leader epoch bump on ISR shrink
divijvaidya commented on PR #13765: URL: https://github.com/apache/kafka/pull/13765#issuecomment-1573957094

> the old code was increasing the leader epoch when the ISR shrinks but not when the ISR expands

Thank you, I didn't realise that. Next,

1. We change leader epoch when a replica is removed and shrinks the ISR (without a leader-re-election). Is that correct? If yes, then should we also remove the logic to increment the epoch in such situations to keep the definition of leader epoch consistent?
2. Is a similar change required for Zk code path?
[GitHub] [kafka] abbccdda closed pull request #9624: KAFKA-10655: wrap and catch exception for appendAsLeader failure
abbccdda closed pull request #9624: KAFKA-10655: wrap and catch exception for appendAsLeader failure URL: https://github.com/apache/kafka/pull/9624 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-15012) JsonConverter fails when there are leading Zeros in a field
[ https://issues.apache.org/jira/browse/KAFKA-15012?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chris Egerton resolved KAFKA-15012.
-----------------------------------
Fix Version/s: 3.6.0
Resolution: Fixed

> JsonConverter fails when there are leading Zeros in a field
> -----------------------------------------------------------
>
> Key: KAFKA-15012
> URL: https://issues.apache.org/jira/browse/KAFKA-15012
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Affects Versions: 3.4.0, 3.3.2
> Reporter: Ranjan Rao
> Assignee: Yash Mayya
> Priority: Major
> Fix For: 3.6.0
> Attachments: enable_ALLOW_LEADING_ZEROS_FOR_NUMBERS_in_jackson_object_mapper_.patch
>
>
> When there are leading zeros in a field in the Kafka record, a sink connector
> using JsonConverter fails with the below exception
>
> {code:java}
> org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
>     at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
>     at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:494)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:474)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
>     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
>     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:237)
>     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>     at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
>     at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:324)
>     at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.convertKey(WorkerSinkTask.java:531)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:494)
>     at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
>     at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
>     ... 13 more
> Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Invalid numeric value: Leading zeroes not allowed
>  at [Source: (byte[])"00080153032837"; line: 1, column: 2]
> Caused by: com.fasterxml.jackson.core.JsonParseException: Invalid numeric value: Leading zeroes not allowed
>  at [Source: (byte[])"00080153032837"; line: 1, column: 2]
>     at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1840)
>     at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:712)
>     at com.fasterxml.jackson.core.base.ParserMinimalBase.reportInvalidNumber(ParserMinimalBase.java:551)
>     at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._verifyNoLeadingZeroes(UTF8StreamJsonParser.java:1520)
>     at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._parsePosNumber(UTF8StreamJsonParser.java:1372)
>     at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:855)
>     at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:754)
>     at com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4247)
>     at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2734)
>     at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:64)
>     at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:322)
>     at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
>     at
[GitHub] [kafka] C0urante merged pull request #13800: KAFKA-15012: Allow leading zeros in numeric fields while deserializing JSON messages using the JsonConverter
C0urante merged PR #13800: URL: https://github.com/apache/kafka/pull/13800 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
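The failure reported in KAFKA-15012 follows from the JSON grammar itself: RFC 8259 only allows the integer part of a number to begin with `0` when it is exactly `0`, so a strict parser rejects `00080153032837`. The sketch below checks a token against that grammar using only the Java standard library; `JsonNumberGrammar` and `isStrictJsonNumber` are illustrative names and are not part of Kafka or Jackson, and this is not how the merged fix is implemented.

```java
import java.util.regex.Pattern;

final class JsonNumberGrammar {
    private JsonNumberGrammar() { }

    // RFC 8259: number = [ "-" ] int [ frac ] [ exp ],
    // where int is either "0" or a digit 1-9 followed by more digits.
    private static final Pattern STRICT_NUMBER =
        Pattern.compile("-?(0|[1-9][0-9]*)(\\.[0-9]+)?([eE][+-]?[0-9]+)?");

    static boolean isStrictJsonNumber(String token) {
        return STRICT_NUMBER.matcher(token).matches();
    }

    public static void main(String[] args) {
        System.out.println(isStrictJsonNumber("80153032837"));    // valid JSON number
        System.out.println(isStrictJsonNumber("00080153032837")); // leading zeros, rejected
    }
}
```

The patch attached to the ticket is named after Jackson's `ALLOW_LEADING_ZEROS_FOR_NUMBERS` setting, which relaxes this rule in the parser rather than pre-validating the input.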
[GitHub] [kafka] splett2 commented on pull request #13765: KAFKA-15021; Skip leader epoch bump on ISR shrink
splett2 commented on PR #13765: URL: https://github.com/apache/kafka/pull/13765#issuecomment-1573900120 @jsancio That makes sense. Sounds good to me in that case. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-14863) Plugins which do not have a valid no-args constructor are visible in the REST API
[ https://issues.apache.org/jira/browse/KAFKA-14863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chris Egerton resolved KAFKA-14863.
-----------------------------------
Fix Version/s: 3.6.0
Resolution: Fixed

> Plugins which do not have a valid no-args constructor are visible in the REST API
> ---------------------------------------------------------------------------------
>
> Key: KAFKA-14863
> URL: https://issues.apache.org/jira/browse/KAFKA-14863
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Reporter: Greg Harris
> Assignee: Greg Harris
> Priority: Minor
> Fix For: 3.6.0
>
>
> Currently, the Connect plugin discovery mechanisms only assert that a no-args
> constructor is present when necessary. In particular, this assertion happens
> for Connectors when the framework needs to evaluate the connector's version
> method.
> It also happens for ConnectorConfigOverridePolicy, ConnectRestExtension, and
> ConfigProvider plugins, which are loaded via the ServiceLoader. The
> ServiceLoader constructs instances of plugins with their no-args constructor
> during discovery, so these plugins are discovered even if they are not
> Versioned.
> This has the effect that unusable plugins which are missing a default
> constructor appear in the REST API, but cannot be instantiated or
> used. To make the ServiceLoader and Reflections discovery mechanisms behave
> more similarly, this assertion should be applied to all plugins, and a log
> message emitted when plugins do not follow the constructor requirements.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
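The constructor requirement described in KAFKA-14863 can be checked with plain reflection. The sketch below assumes that "valid" means a concrete class with a public no-args constructor; that reading, and the names `PluginChecks`, `GoodPlugin`, and `NeedsArgsPlugin`, are assumptions made for illustration and are not taken from the Connect code base.

```java
import java.lang.reflect.Modifier;

final class PluginChecks {
    private PluginChecks() { }

    // True if the class is concrete and exposes a public no-args constructor.
    static boolean hasUsableNoArgsConstructor(Class<?> candidate) {
        if (candidate.isInterface() || Modifier.isAbstract(candidate.getModifiers())) {
            return false; // cannot be instantiated at all
        }
        try {
            candidate.getConstructor(); // only finds public constructors
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    // Hypothetical plugin classes used only for this illustration.
    public static class GoodPlugin {
        public GoodPlugin() { }
    }

    public static class NeedsArgsPlugin {
        public NeedsArgsPlugin(String name) { }
    }

    public static void main(String[] args) {
        System.out.println(hasUsableNoArgsConstructor(GoodPlugin.class));
        System.out.println(hasUsableNoArgsConstructor(NeedsArgsPlugin.class));
    }
}
```

Running the same check for every discovered class, regardless of which mechanism found it, is one way to get the consistent behaviour the ticket asks for; a class that fails the check would be logged and left out of the listing.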
[GitHub] [kafka] C0urante merged pull request #13467: KAFKA-14863: Hide plugins with invalid constructors during plugin discovery
C0urante merged PR #13467: URL: https://github.com/apache/kafka/pull/13467 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-15019) Improve handling of broker heartbeat timeouts
[ https://issues.apache.org/jira/browse/KAFKA-15019?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mickael Maison resolved KAFKA-15019. Fix Version/s: 3.5.0 Resolution: Fixed > Improve handling of broker heartbeat timeouts > - > > Key: KAFKA-15019 > URL: https://issues.apache.org/jira/browse/KAFKA-15019 > Project: Kafka > Issue Type: Bug >Reporter: Colin McCabe >Assignee: Colin McCabe >Priority: Major > Fix For: 3.5.0 > > > Improve handling of overload situations in the KRaft controller -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15004) Topic config changes are not synced during zk to kraft migration (dual-write)
[ https://issues.apache.org/jira/browse/KAFKA-15004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mickael Maison reassigned KAFKA-15004: -- Fix Version/s: 3.5.0 Assignee: Akhilesh Chaganti Resolution: Fixed > Topic config changes are not synced during zk to kraft migration (dual-write) > - > > Key: KAFKA-15004 > URL: https://issues.apache.org/jira/browse/KAFKA-15004 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.5.0 >Reporter: Akhilesh Chaganti >Assignee: Akhilesh Chaganti >Priority: Blocker > Fix For: 3.5.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15019) Improve handling of broker heartbeat timeouts
[ https://issues.apache.org/jira/browse/KAFKA-15019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17728783#comment-17728783 ]

Mickael Maison commented on KAFKA-15019:
----------------------------------------

Since the PR is merged, I'm marking this as resolved in 3.5.0. If there is follow-up work left to do, feel free to reopen (and adjust Fix Version/s) or create a new ticket.

> Improve handling of broker heartbeat timeouts
> ---------------------------------------------
>
> Key: KAFKA-15019
> URL: https://issues.apache.org/jira/browse/KAFKA-15019
> Project: Kafka
> Issue Type: Bug
> Reporter: Colin McCabe
> Assignee: Colin McCabe
> Priority: Major
>
> Improve handling of overload situations in the KRaft controller
[jira] [Commented] (KAFKA-15004) Topic config changes are not synced during zk to kraft migration (dual-write)
[ https://issues.apache.org/jira/browse/KAFKA-15004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17728782#comment-17728782 ]

Mickael Maison commented on KAFKA-15004:
----------------------------------------

Since the PR is merged, I'm marking this as resolved in 3.5.0. If there is follow-up work left to do, feel free to reopen (and adjust Fix Version/s) or create a new ticket.

> Topic config changes are not synced during zk to kraft migration (dual-write)
> -----------------------------------------------------------------------------
>
> Key: KAFKA-15004
> URL: https://issues.apache.org/jira/browse/KAFKA-15004
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 3.5.0
> Reporter: Akhilesh Chaganti
> Priority: Blocker
[jira] [Commented] (KAFKA-15003) TopicIdReplicaAssignment is not updated in migration (dual-write) when partitions are changed for topic
[ https://issues.apache.org/jira/browse/KAFKA-15003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17728780#comment-17728780 ]

Mickael Maison commented on KAFKA-15003:
----------------------------------------

Since the PR is merged, I'm marking this as resolved in 3.5.0. If there is follow-up work left to do, feel free to reopen (and adjust Fix Version/s) or create a new ticket.

> TopicIdReplicaAssignment is not updated in migration (dual-write) when partitions are changed for topic
> --------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-15003
> URL: https://issues.apache.org/jira/browse/KAFKA-15003
> Project: Kafka
> Issue Type: Bug
> Components: kraft
> Affects Versions: 3.5.0
> Reporter: Akhilesh Chaganti
> Assignee: Akhilesh Chaganti
> Priority: Blocker
[jira] [Resolved] (KAFKA-14996) The KRaft controller should properly handle overly large user operations
[ https://issues.apache.org/jira/browse/KAFKA-14996?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mickael Maison resolved KAFKA-14996. Fix Version/s: 3.5.0 Assignee: Colin McCabe (was: Edoardo Comar) Resolution: Fixed > The KRaft controller should properly handle overly large user operations > > > Key: KAFKA-14996 > URL: https://issues.apache.org/jira/browse/KAFKA-14996 > Project: Kafka > Issue Type: Bug > Components: controller >Affects Versions: 3.5.0 >Reporter: Edoardo Comar >Assignee: Colin McCabe >Priority: Blocker > Fix For: 3.5.0 > > > If an attempt is made to create a topic with > num partitions >= QuorumController.MAX_RECORDS_PER_BATCH (1) > the client receives an UnknownServerException - it could rather receive a > better error. > The controller logs > {{2023-05-12 19:25:10,018] WARN [QuorumController id=1] createTopics: failed > with unknown server exception IllegalStateException at epoch 2 in 21956 us. > Renouncing leadership and reverting to the last committed offset 174. > (org.apache.kafka.controller.QuorumController)}} > {{java.lang.IllegalStateException: Attempted to atomically commit 10001 > records, but maxRecordsPerBatch is 1}} > {{ at > org.apache.kafka.controller.QuorumController.appendRecords(QuorumController.java:812)}} > {{ at > org.apache.kafka.controller.QuorumController$ControllerWriteEvent.run(QuorumController.java:719)}} > {{ at > org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)}} > {{ at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)}} > {{ at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)}} > {{ at java.base/java.lang.Thread.run(Thread.java:829)}} > {{[}} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14996) The KRaft controller should properly handle overly large user operations
[ https://issues.apache.org/jira/browse/KAFKA-14996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17728779#comment-17728779 ] Mickael Maison commented on KAFKA-14996: Since the PR is merged I'm marking this as resolved in 3.5.0 If there is follow up work left to do, feel free to reopen (and adjust Fix Version/s) or create a new ticket. > The KRaft controller should properly handle overly large user operations > > > Key: KAFKA-14996 > URL: https://issues.apache.org/jira/browse/KAFKA-14996 > Project: Kafka > Issue Type: Bug > Components: controller >Affects Versions: 3.5.0 >Reporter: Edoardo Comar >Assignee: Edoardo Comar >Priority: Blocker > > If an attempt is made to create a topic with > num partitions >= QuorumController.MAX_RECORDS_PER_BATCH (1) > the client receives an UnknownServerException - it could rather receive a > better error. > The controller logs > {{2023-05-12 19:25:10,018] WARN [QuorumController id=1] createTopics: failed > with unknown server exception IllegalStateException at epoch 2 in 21956 us. > Renouncing leadership and reverting to the last committed offset 174. > (org.apache.kafka.controller.QuorumController)}} > {{java.lang.IllegalStateException: Attempted to atomically commit 10001 > records, but maxRecordsPerBatch is 1}} > {{ at > org.apache.kafka.controller.QuorumController.appendRecords(QuorumController.java:812)}} > {{ at > org.apache.kafka.controller.QuorumController$ControllerWriteEvent.run(QuorumController.java:719)}} > {{ at > org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)}} > {{ at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)}} > {{ at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)}} > {{ at java.base/java.lang.Thread.run(Thread.java:829)}} > {{[}} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15010) KRaft Controller doesn't reconcile with Zookeeper metadata upon becoming new controller while in dual write mode.
[ https://issues.apache.org/jira/browse/KAFKA-15010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17728773#comment-17728773 ] Mickael Maison commented on KAFKA-15010: Since the PR is merged I'm marking this as resolved. If there is follow up work left to do, feel free to reopen (and adjust Fix Version/s) or create a new ticket. > KRaft Controller doesn't reconcile with Zookeeper metadata upon becoming new > controller while in dual write mode. > - > > Key: KAFKA-15010 > URL: https://issues.apache.org/jira/browse/KAFKA-15010 > Project: Kafka > Issue Type: Bug > Components: kraft >Affects Versions: 3.5.0 >Reporter: Akhilesh Chaganti >Assignee: David Arthur >Priority: Blocker > Fix For: 3.5.0 > > > When a KRaft controller fails over, the existing migration driver (in dual > write mode) can fail in between Zookeeper writes and may leave Zookeeper with > incomplete and inconsistent data. So when a new controller becomes active > (and by extension a new migration driver becomes active), the first thing we > should do is load the in-memory snapshot and use it to write metadata to > Zookeeper to have a steady state. We currently do not do this and it may > leave Zookeeper in an inconsistent state. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-15010) KRaft Controller doesn't reconcile with Zookeeper metadata upon becoming new controller while in dual write mode.
[ https://issues.apache.org/jira/browse/KAFKA-15010?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mickael Maison resolved KAFKA-15010. Resolution: Fixed > KRaft Controller doesn't reconcile with Zookeeper metadata upon becoming new > controller while in dual write mode. > - > > Key: KAFKA-15010 > URL: https://issues.apache.org/jira/browse/KAFKA-15010 > Project: Kafka > Issue Type: Bug > Components: kraft >Affects Versions: 3.5.0 >Reporter: Akhilesh Chaganti >Assignee: David Arthur >Priority: Blocker > Fix For: 3.5.0 > > > When a KRaft controller fails over, the existing migration driver (in dual > write mode) can fail in between Zookeeper writes and may leave Zookeeper with > incomplete and inconsistent data. So when a new controller becomes active > (and by extension a new migration driver becomes active), the first thing we > should do is load the in-memory snapshot and use it to write metadata to > Zookeeper to have a steady state. We currently do not do this and it may > leave Zookeeper in an inconsistent state. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] C0urante commented on pull request #13771: MINOR: Refactor DelegatingClassLoader to emit immutable PluginScanResult
C0urante commented on PR #13771: URL: https://github.com/apache/kafka/pull/13771#issuecomment-1573852526 Thanks Greg! I think the tradeoffs here are acceptable. Can we add some testing coverage for `PluginUtils::computeAliases`? Once that's done this should be good to merge.
[GitHub] [kafka] C0urante commented on a diff in pull request #13771: MINOR: Refactor DelegatingClassLoader to emit immutable PluginScanResult
C0urante commented on code in PR #13771: URL: https://github.com/apache/kafka/pull/13771#discussion_r1214470057 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java: ## @@ -356,25 +357,16 @@ public static String prunedName(PluginDesc plugin) { * Verify whether a given plugin's alias matches another alias in a collection of plugins. * * @param alias the plugin descriptor to test for alias matching. - * @param plugins the collection of plugins to test against. + * @param aliases the collection of plugins to test against. * @param the plugin type. * @return false if a match was found in the collection, otherwise true. */ public static boolean isAliasUnique( PluginDesc alias, -Collection> plugins +Map aliases ) { -boolean matched = false; -for (PluginDesc plugin : plugins) { -if (simpleName(alias).equals(simpleName(plugin)) -|| prunedName(alias).equals(prunedName(plugin))) { -if (matched) { -return false; -} -matched = true; -} -} -return true; +// TODO: Mark alias collision and disable ambiguous aliases completely. Review Comment: > I don't think that anyone should be depending on these cross-plugin ambiguous aliases, because if they were, they would be subject to spurious ClassNotFoundExceptions whenever the non-deterministic iteration order caused a plugin of the wrong type to be loaded. Plugin scanning and alias computation is currently deterministic (the various `Set>` objects that we use are all instances of the `TreeSet` type), so users are not subject to this risk while running a fixed version of Kafka Connect with a fixed plugin path (and fixed contents of that plugin path). That said, I think the changes you've made are sufficient. > An alternative is to separate these aliases into per-plugin-type namespaces, but I don't think that is necessary at this time. Yeah, this would be nice to have but it's not a blocker. 
One case where it could come in handy is with plugins that implement multiple plugin interfaces (which is actually fairly common with `Converter` and `HeaderConverter` instances); you may have multiple `JsonConverter` plugins on your worker, but if only one of them implements the `Converter` interface and another implements the `Converter` and `HeaderConverter` interface, it's hard to argue that you shouldn't be able to use an alias when using the latter as a header converter. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
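The alias discussion above can be illustrated with a minimal sketch. It assumes plugins are identified only by fully qualified class name and that an alias is usable only when exactly one plugin maps to it; `AliasSketch`, `simpleName`, and `uniqueAliases` are hypothetical names for illustration and are not the `PluginUtils` API. A `TreeSet` and `TreeMap` keep iteration order deterministic, in the spirit of the review comment.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical sketch, not the Kafka Connect implementation.
final class AliasSketch {

    // Strip the package prefix, e.g. "a.b.JsonConverter" -> "JsonConverter".
    static String simpleName(String className) {
        return className.substring(className.lastIndexOf('.') + 1);
    }

    // Return alias -> class name, keeping only aliases claimed by exactly one class.
    static Map<String, String> uniqueAliases(Collection<String> classNames) {
        Map<String, List<String>> candidates = new TreeMap<>();
        // Sorted, de-duplicated iteration keeps the result deterministic.
        for (String className : new TreeSet<>(classNames)) {
            candidates.computeIfAbsent(simpleName(className), k -> new ArrayList<>())
                      .add(className);
        }
        Map<String, String> result = new TreeMap<>();
        for (Map.Entry<String, List<String>> entry : candidates.entrySet()) {
            if (entry.getValue().size() == 1) {
                // Ambiguous aliases are dropped entirely rather than resolved arbitrarily.
                result.put(entry.getKey(), entry.getValue().get(0));
            }
        }
        return result;
    }
}
```

With two classes named `JsonConverter` in different packages, neither receives the alias. The per-plugin-type namespaces mentioned in the review would correspond to running this computation once per plugin interface.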
[GitHub] [kafka] C0urante commented on pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records
C0urante commented on PR #13801: URL: https://github.com/apache/kafka/pull/13801#issuecomment-1573784468 @vamossagar12 Thanks for the PR. I'd like to wait until Yash has had a chance to review before taking a look, so I've removed myself as a reviewer. Feel free to add me back once he (or someone else familiar with Connect) has had a chance to review.
[GitHub] [kafka] jsancio commented on pull request #13765: KAFKA-15021; Skip leader epoch bump on ISR shrink
jsancio commented on PR #13765: URL: https://github.com/apache/kafka/pull/13765#issuecomment-1573750489 > Actually, one thing I was wondering about this change since I am not that familiar with what the metadata version gates - does the change in the PR allow leader epochs to go backwards? The important observation is that this PR doesn't change the semantics of replaying `PartitionRecord` and `PartitionChangeRecord` with respect to the leader epoch bump. When replaying `PartitionChangeRecord`, the state machines (controller and broker) will increase the leader epoch if the field `Leader` is set. This holds true before and after this PR. What this PR changes is when the controller sets the `Leader` field in `PartitionChangeRecord`. I should also point out that the MV check is not required for correctness. It is there for performance, so that the PRODUCE request doesn't time out and the Kafka producer doesn't have to retry the PRODUCE request.
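The replay rule described in the comment above can be sketched as follows. This is a simplified model and not the controller code: `PartitionStateSketch` and its fields are hypothetical, and an empty `OptionalInt` stands in for a change record whose `Leader` field is not set.

```java
import java.util.List;
import java.util.OptionalInt;

// Hypothetical, simplified model of replaying a partition change.
final class PartitionStateSketch {
    final int leader;
    final int leaderEpoch;
    final List<Integer> isr;

    PartitionStateSketch(int leader, int leaderEpoch, List<Integer> isr) {
        this.leader = leader;
        this.leaderEpoch = leaderEpoch;
        this.isr = isr;
    }

    // The leader epoch is bumped only when the change carries a leader;
    // an ISR-only change leaves the leader and its epoch untouched.
    PartitionStateSketch replay(OptionalInt newLeader, List<Integer> newIsr) {
        if (newLeader.isPresent()) {
            return new PartitionStateSketch(newLeader.getAsInt(), leaderEpoch + 1, newIsr);
        }
        return new PartitionStateSketch(leader, leaderEpoch, newIsr);
    }
}
```

Under this model the epoch can only stay the same or increase on replay, which is why the question about epochs going backwards comes down to when the controller chooses to set the leader field, not to how records are replayed.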
[GitHub] [kafka] erikvanoosten commented on a diff in pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given
erikvanoosten commented on code in PR #13678: URL: https://github.com/apache/kafka/pull/13678#discussion_r1214314299 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ## @@ -105,6 +105,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator { private final boolean autoCommitEnabled; private final int autoCommitIntervalMs; private final ConsumerInterceptors interceptors; +// package private for testing +final AtomicInteger inFlightAsyncCommits; private final AtomicInteger pendingAsyncCommits; Review Comment: @dajac I took the description of `pendingAsyncCommits` from your comment https://github.com/apache/kafka/pull/13678#discussion_r1191927989; I hope I did that correctly. Changed in: https://github.com/apache/kafka/pull/13678/commits/2bfedf0ec27f9d386d9cddb2ba24d53e533ab51f
[jira] [Commented] (KAFKA-14497) LastStableOffset is advanced prematurely when a log is reopened.
[ https://issues.apache.org/jira/browse/KAFKA-14497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17728723#comment-17728723 ] Daniel Urban commented on KAFKA-14497: -- AFAIU, the information about the replicated state of a transaction is not stored in the snapshot at all. I think the data stored in the snapshot file needs to be extended with the extra information whether the completed transaction is replicated. By the time ProducerStateManager#completeTxn is called (which puts the transaction into ProducerStateManager.unreplicatedTxns), the producer entry is already cleared (ProducerAppendInfo#appendEndTxnMarker - currentTxnFirstOffset is empty, indicating that there is no pending transaction). If a snapshot is created at this point, and then the snapshot is loaded, there is no way to differentiate between replicated and unreplicated transactions. Instead, ProducerAppendInfo#appendEndTxnMarker should also set a flag showing that while the transaction is complete, it might still be unreplicated. Then, when ProducerStateManager#removeUnreplicatedTransactions is called, the flag in the producer entry can be cleared. This way the snapshot would contain the full data, and we could also recover the state of unreplicatedTxns. [~hachikuji] wdyt about this approach? If it seems okay, I can take a look into this and submit a PR. > LastStableOffset is advanced prematurely when a log is reopened. > > > Key: KAFKA-14497 > URL: https://issues.apache.org/jira/browse/KAFKA-14497 > Project: Kafka > Issue Type: Bug >Reporter: Vincent Jiang >Priority: Major > > In below test case, last stable offset of log is advanced prematurely after > reopen: > # producer #1 appends transaction records to leader. offsets = [0, 1, 2, 3] > # producer #2 appends transactional records to leader. offsets = [4, 5, 6, > 7] > # all records are replicated to followers and high watermark advanced to 8. > # at this point, lastStableOffset = 0. 
(first offset of an open transaction) > # producer #1 aborts the transaction by writing an abort marker at offset 8. > ProducerStateManager.unreplicatedTxns contains the aborted transaction > (firstOffset=0, lastOffset=8) > # then the log is closed and reopened. > # after reopen, log.lastStableOffset is initialized to 4. This is because > ProducerStateManager.unreplicatedTxns is empty after reopening log. > > We should rebuild ProducerStateManager.unreplicatedTxns when reloading a log, > so that lastStableOffset remains unchanged before and after reopen. -- This message was sent by Atlassian Jira (v8.20.10#820010)
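The scenario in the ticket can be reproduced with a small model. This is a sketch under simplifying assumptions and not the `ProducerStateManager` implementation: `LsoSketch` and its methods are hypothetical, transactions are keyed by their first offset, and the last stable offset is taken as the minimum of the high watermark, the first offset of any ongoing transaction, and the first offset of any completed but unreplicated transaction.

```java
import java.util.TreeMap;

// Hypothetical, simplified model of last-stable-offset tracking.
final class LsoSketch {
    // first offset -> producer id, for transactions still open
    private final TreeMap<Long, Long> ongoingTxns = new TreeMap<>();
    // first offset -> offset of the end marker, for completed but unreplicated transactions
    private final TreeMap<Long, Long> unreplicatedTxns = new TreeMap<>();

    void beginTxn(long producerId, long firstOffset) {
        ongoingTxns.put(firstOffset, producerId);
    }

    // Called when a commit or abort marker is appended at markerOffset.
    void completeTxn(long firstOffset, long markerOffset) {
        ongoingTxns.remove(firstOffset);
        unreplicatedTxns.put(firstOffset, markerOffset);
    }

    // Drop completed transactions whose marker is below the high watermark.
    void onHighWatermarkUpdated(long highWatermark) {
        unreplicatedTxns.values().removeIf(marker -> marker < highWatermark);
    }

    // Models a reopen that restores ongoing transactions but not unreplicated ones.
    void reopenWithoutUnreplicatedState() {
        unreplicatedTxns.clear();
    }

    long lastStableOffset(long highWatermark) {
        long lso = highWatermark;
        if (!ongoingTxns.isEmpty()) {
            lso = Math.min(lso, ongoingTxns.firstKey());
        }
        if (!unreplicatedTxns.isEmpty()) {
            lso = Math.min(lso, unreplicatedTxns.firstKey());
        }
        return lso;
    }
}
```

Replaying the steps from the ticket against this model gives a last stable offset of 0 before the reopen and 4 after it, because the aborted transaction's entry is lost. Persisting a flag in the snapshot, as proposed in the comment, would let the reopen path rebuild that entry.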
[GitHub] [kafka] vamossagar12 commented on pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records
vamossagar12 commented on PR #13801: URL: https://github.com/apache/kafka/pull/13801#issuecomment-1573587479 > @vamossagar12 looks like there are checkstyle failures Oh, I didn’t check that before tagging. Will check.
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records
vamossagar12 commented on code in PR #13801: URL: https://github.com/apache/kafka/pull/13801#discussion_r1214258128 ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java: ## @@ -279,30 +280,47 @@ public Future set(Map values, Callback callb throw new IllegalStateException("At least one non-null offset store must be provided"); } +boolean containsTombstones = values.entrySet() +.stream() +.anyMatch(offset -> offset.getValue() == null); + return primaryStore.set(values, (primaryWriteError, ignored) -> { if (secondaryStore != null) { if (primaryWriteError != null) { log.trace("Skipping offsets write to secondary store because primary write has failed", primaryWriteError); +try (LoggingContext context = loggingContext()) { +callback.onCompletion(primaryWriteError, ignored); +} } else { try { // Invoke OffsetBackingStore::set but ignore the resulting future; we don't block on writes to this -// backing store. +// backing store. The only exception to this is when a batch consisting of tombstone records fails +// to be written to secondary store and has been successfully written to the primary store. In this case +// an error would be propagated back as in such cases, a deleted source partition +// would be reported as present because the 2 stores are not in sync. 
secondaryStore.set(values, (secondaryWriteError, ignored2) -> { try (LoggingContext context = loggingContext()) { -if (secondaryWriteError != null) { +if (secondaryWriteError != null && containsTombstones) { +log.warn("Failed to write offsets with tombstone records to secondary backing store", secondaryWriteError); +callback.onCompletion(secondaryWriteError, ignored); +return; +} else if (secondaryWriteError != null) { log.warn("Failed to write offsets to secondary backing store", secondaryWriteError); } else { log.debug("Successfully flushed offsets to secondary backing store"); } +//primaryWriteError is null at this point, and we don't care about secondaryWriteError +callback.onCompletion(null, ignored); Review Comment: Yeah. Good catch. That doesn’t need to happen in this case. Will remove it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] yashmayya commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records
yashmayya commented on code in PR #13801: URL: https://github.com/apache/kafka/pull/13801#discussion_r1214236252 ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java: ## @@ -279,30 +280,47 @@ public Future set(Map values, Callback callb throw new IllegalStateException("At least one non-null offset store must be provided"); } +boolean containsTombstones = values.entrySet() +.stream() +.anyMatch(offset -> offset.getValue() == null); + return primaryStore.set(values, (primaryWriteError, ignored) -> { if (secondaryStore != null) { if (primaryWriteError != null) { log.trace("Skipping offsets write to secondary store because primary write has failed", primaryWriteError); +try (LoggingContext context = loggingContext()) { +callback.onCompletion(primaryWriteError, ignored); +} } else { try { // Invoke OffsetBackingStore::set but ignore the resulting future; we don't block on writes to this -// backing store. +// backing store. The only exception to this is when a batch consisting of tombstone records fails +// to be written to secondary store and has been successfully written to the primary store. In this case +// an error would be propagated back as in such cases, a deleted source partition +// would be reported as present because the 2 stores are not in sync. 
secondaryStore.set(values, (secondaryWriteError, ignored2) -> { try (LoggingContext context = loggingContext()) { -if (secondaryWriteError != null) { +if (secondaryWriteError != null && containsTombstones) { +log.warn("Failed to write offsets with tombstone records to secondary backing store", secondaryWriteError); +callback.onCompletion(secondaryWriteError, ignored); +return; +} else if (secondaryWriteError != null) { log.warn("Failed to write offsets to secondary backing store", secondaryWriteError); } else { log.debug("Successfully flushed offsets to secondary backing store"); } +//primaryWriteError is null at this point, and we don't care about secondaryWriteError +callback.onCompletion(null, ignored); Review Comment: Isn't this going to cause us to always block on writes to the secondary store which is something we want to avoid? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] yashmayya commented on pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records
yashmayya commented on PR #13801: URL: https://github.com/apache/kafka/pull/13801#issuecomment-1573553370 @vamossagar12 looks like there are checkstyle failures
[GitHub] [kafka] yashmayya commented on a diff in pull request #13726: KAFKA-13109: SourceTask#poll not enforcing retries in case of RetriableException
yashmayya commented on code in PR #13726: URL: https://github.com/apache/kafka/pull/13726#discussion_r1214227660 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java: ## @@ -483,6 +494,37 @@ public void testFailureInPoll() throws Exception { assertPollMetrics(0); } +@Test +public void testRetriableExceptionInPoll() throws Exception { + +final ErrorHandlingMetrics errorHandlingMetrics = mock(ErrorHandlingMetrics.class); +final RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(30, 15, ALL, SYSTEM, errorHandlingMetrics); Review Comment: That's probably due to https://github.com/apache/kafka/pull/13726#discussion_r1214152577 (i.e. poll being called repeatedly in the task execution loop)? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] yashmayya commented on a diff in pull request #13726: KAFKA-13109: SourceTask#poll not enforcing retries in case of RetriableException
yashmayya commented on code in PR #13726: URL: https://github.com/apache/kafka/pull/13726#discussion_r1214227126 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java: ## @@ -458,13 +458,8 @@ boolean sendRecords() { } protected List poll() throws InterruptedException { -try { -return task.poll(); -} catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) { -log.warn("{} failed to poll records from SourceTask. Will retry operation.", this, e); -// Do nothing. Let the framework poll whenever it's ready. -return null; -} +retryWithToleranceOperator.reset(); +return retryWithToleranceOperator.execute(task::poll, Stage.TASK_POLL, this.getClass()); Review Comment: `total-record-failures` is defined as the number of "record processing failures" in a task - https://github.com/apache/kafka/blob/7c3a2846d46f21f2737483eeb7c04e4eee4c2b5f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java#L366-L367. I don't think a failed call to `SourceTask::poll` counts as a record processing failure? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
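The change under discussion replaces "swallow the retriable exception and return null" with "retry the poll". A minimal sketch of that idea follows, assuming a fixed attempt budget instead of the timeout and backoff settings that `RetryWithToleranceOperator` takes; `PollRetrySketch`, `RetriableSketchException`, and `executeWithRetry` are hypothetical names and not Connect APIs.

```java
import java.util.function.Supplier;

// Hypothetical sketch of retrying an operation on a retriable failure.
final class PollRetrySketch {

    // Stand-in for a retriable exception type.
    static final class RetriableSketchException extends RuntimeException {
        RetriableSketchException(String message) {
            super(message);
        }
    }

    // Invoke the operation up to maxAttempts times, retrying only on
    // RetriableSketchException; any other exception propagates immediately.
    static <T> T executeWithRetry(Supplier<T> operation, int maxAttempts) {
        RetriableSketchException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return operation.get();
            } catch (RetriableSketchException e) {
                last = e; // remember the failure and try again
            }
        }
        throw new IllegalStateException("Retries exhausted after " + maxAttempts + " attempts", last);
    }
}
```

The sketch deliberately records no metrics. Whether a failed poll should count toward `total-record-failures` is the open question in this thread, and the sketch takes no position on it.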
[GitHub] [kafka] vamossagar12 commented on pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records
vamossagar12 commented on PR #13801: URL: https://github.com/apache/kafka/pull/13801#issuecomment-1573519452 I will review the tests once they run. Still tagging the 2 reviewers.
[GitHub] [kafka] vamossagar12 opened a new pull request, #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records
vamossagar12 opened a new pull request, #13801: URL: https://github.com/apache/kafka/pull/13801 Description from the ticket: When exactly-once support is enabled for source connectors, source offsets can potentially be written to two different offsets topics: a topic specific to the connector, and the global offsets topic (which was used for all connectors prior to KIP-618 / version 3.3.0). Precedence is given to offsets in the per-connector offsets topic, but if none are found for a given partition, then the global offsets topic is used as a fallback. When committing offsets, a transaction is used to ensure that source records and source offsets are written to the Kafka cluster targeted by the source connector. This transaction only includes the connector-specific offsets topic. Writes to the global offsets topic take place after writes to the connector-specific offsets topic have completed successfully, and if they fail, a warning message is logged, but no other action is taken. Normally, this ensures that, for offsets committed by exactly-once-supported source connectors, the per-connector offsets topic is at least as up-to-date as the global offsets topic, and sometimes even ahead. However, for tombstone offsets, we lose that guarantee. If a tombstone offset is successfully written to the per-connector offsets topic, but cannot be written to the global offsets topic, then the global offsets topic will still contain that source offset, but the per-connector topic will not. Due to the fallback-on-global logic used by the worker, if a task requests offsets for one of the tombstoned partitions, the worker will provide it with the offsets present in the global offsets topic, instead of indicating to the task that no offsets can be found. This PR explicitly fails the offset flush for cases when the dual write fails on secondary store. 
Note that one of the approaches proposed on the JIRA ticket was to explicitly write tombstone records to the secondary store before attempting to write to the primary store. This PR doesn't do so for 2 reasons: 1. Tombstone events are rare, as per the ticket, and the probability of them failing on the secondary store while succeeding on the primary store is even lower. 2. There is no guarantee that an explicit write of tombstone records to the secondary store would succeed, so we might still end up in the same situation as above. Considering these things, this PR errs on the side of correctness for these rare events.
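The stale-offset problem described in this PR can be shown with a small in-memory model. It is a sketch under stated assumptions and not the `ConnectorOffsetBackingStore` code: `OffsetFallbackSketch` is hypothetical, both stores are plain maps, a `null` offset represents a tombstone, and the `secondaryWriteFails` flag simulates a failed write to the global offsets topic.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of the per-connector (primary) and global (secondary) stores.
final class OffsetFallbackSketch {
    private final Map<String, String> perConnector = new HashMap<>();
    private final Map<String, String> global = new HashMap<>();

    private static void apply(Map<String, String> store, String partition, String offset) {
        if (offset == null) {
            store.remove(partition); // tombstone: delete the entry
        } else {
            store.put(partition, offset);
        }
    }

    // Write to the primary store, then to the secondary store.
    // Returns false when the flush should be reported as failed.
    boolean write(String partition, String offset, boolean secondaryWriteFails) {
        apply(perConnector, partition, offset);
        if (secondaryWriteFails) {
            // A failed secondary write is tolerated for regular offsets,
            // but not for tombstones, because the stale entry would resurface on read.
            return offset != null;
        }
        apply(global, partition, offset);
        return true;
    }

    // Prefer the per-connector store and fall back to the global store.
    String read(String partition) {
        String offset = perConnector.get(partition);
        return offset != null ? offset : global.get(partition);
    }
}
```

After a tombstone succeeds on the primary store and fails on the secondary store, `read` still returns the old offset through the fallback path. That is the inconsistency the PR surfaces by failing the flush.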
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13726: KAFKA-13109: SourceTask#poll not enforcing retries in case of RetriableException
vamossagar12 commented on code in PR #13726: URL: https://github.com/apache/kafka/pull/13726#discussion_r1214203205 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java: ## @@ -483,6 +494,37 @@ public void testFailureInPoll() throws Exception { assertPollMetrics(0); } +@Test +public void testRetriableExceptionInPoll() throws Exception { + +final ErrorHandlingMetrics errorHandlingMetrics = mock(ErrorHandlingMetrics.class); +final RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(30, 15, ALL, SYSTEM, errorHandlingMetrics); Review Comment: I am using the existing mechanisms to run the test using CountDownLatch. I could get enough invocations, but I will think about it.
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13726: KAFKA-13109: SourceTask#poll not enforcing retries in case of RetriableException
vamossagar12 commented on code in PR #13726: URL: https://github.com/apache/kafka/pull/13726#discussion_r1214199833 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java: ## @@ -458,13 +458,8 @@ boolean sendRecords() { } protected List poll() throws InterruptedException { -try { -return task.poll(); -} catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) { -log.warn("{} failed to poll records from SourceTask. Will retry operation.", this, e); -// Do nothing. Let the framework poll whenever it's ready. -return null; -} +retryWithToleranceOperator.reset(); +return retryWithToleranceOperator.execute(task::poll, Stage.TASK_POLL, this.getClass()); Review Comment: Yeah but I didn't think I really needed to handle that case and the existing methods should be taking care of it going by the example above with converters. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13726: KAFKA-13109: SourceTask#poll not enforcing retries in case of RetriableException
vamossagar12 commented on code in PR #13726: URL: https://github.com/apache/kafka/pull/13726#discussion_r1214198459 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java: ## @@ -458,13 +458,8 @@ boolean sendRecords() { } protected List poll() throws InterruptedException { -try { -return task.poll(); -} catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) { -log.warn("{} failed to poll records from SourceTask. Will retry operation.", this, e); -// Do nothing. Let the framework poll whenever it's ready. -return null; -} +retryWithToleranceOperator.reset(); +return retryWithToleranceOperator.execute(task::poll, Stage.TASK_POLL, this.getClass()); Review Comment: That's a good point. I assumed it would be taken care of, because even the transformations don't make any such checks [here](https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L482)
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13726: KAFKA-13109: SourceTask#poll not enforcing retries in case of RetriableException
vamossagar12 commented on code in PR #13726: URL: https://github.com/apache/kafka/pull/13726#discussion_r1214197359 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java: ## @@ -458,13 +458,8 @@ boolean sendRecords() { } protected List poll() throws InterruptedException { -try { -return task.poll(); -} catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) { -log.warn("{} failed to poll records from SourceTask. Will retry operation.", this, e); -// Do nothing. Let the framework poll whenever it's ready. -return null; -} +retryWithToleranceOperator.reset(); +return retryWithToleranceOperator.execute(task::poll, Stage.TASK_POLL, this.getClass()); Review Comment: I think only `total-records-skipped` is a problem, since it is defined as `Total number of records skipped by this task.` `total-record-failures` is defined as the total number of failures seen, not the total number of records that have failed. I would assume it is easy to update the number of records in `poll()`; why do you say we need to make a number of updates?
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13726: KAFKA-13109: SourceTask#poll not enforcing retries in case of RetriableException
vamossagar12 commented on code in PR #13726: URL: https://github.com/apache/kafka/pull/13726#discussion_r1214192966 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java: ## @@ -458,13 +458,8 @@ boolean sendRecords() { } protected List poll() throws InterruptedException { -try { -return task.poll(); -} catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) { -log.warn("{} failed to poll records from SourceTask. Will retry operation.", this, e); -// Do nothing. Let the framework poll whenever it's ready. -return null; -} +retryWithToleranceOperator.reset(); +return retryWithToleranceOperator.execute(task::poll, Stage.TASK_POLL, this.getClass()); Review Comment: Oh, I just went by what the KIP says in the Proposed Approach section. I assume this change would need an amendment to the KIP.
[GitHub] [kafka] oliveigah commented on a diff in pull request #13680: MINOR: Add "versions" tag to recently added ReplicaState field on Fetch Request
oliveigah commented on code in PR #13680: URL: https://github.com/apache/kafka/pull/13680#discussion_r1214190810 ## clients/src/main/resources/common/message/FetchRequest.json: ## @@ -61,7 +61,7 @@ "about": "The clusterId if known. This is used to validate metadata fetches prior to broker registration." }, { "name": "ReplicaId", "type": "int32", "versions": "0-14", "default": "-1", "entityType": "brokerId", "about": "The broker ID of the follower, of -1 if this request is from a consumer." }, -{ "name": "ReplicaState", "type": "ReplicaState", "taggedVersions":"15+", "tag": 1, "fields": [ +{ "name": "ReplicaState", "type": "ReplicaState", "versions": "15+", "taggedVersions":"15+", "tag": 1, "fields": [ Review Comment: Typo fixed! Thanks for the explanation @dajac.
[GitHub] [kafka] dajac commented on a diff in pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given
dajac commented on code in PR #13678: URL: https://github.com/apache/kafka/pull/13678#discussion_r1214154053 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ## @@ -105,6 +105,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator { private final boolean autoCommitEnabled; private final int autoCommitIntervalMs; private final ConsumerInterceptors interceptors; +// package private for testing +final AtomicInteger inFlightAsyncCommits; private final AtomicInteger pendingAsyncCommits; Review Comment: Could we add a comment for these two variables? I think that we were all confused by them while looking at this PR so adding clarifying comments would make sense here.
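As a rough illustration of what a counter like `inFlightAsyncCommits` could be tracking, here is a minimal sketch, assuming the counter is incremented when an asynchronous commit is sent and decremented when it completes. `AsyncCommitTracker`, `track`, and `awaitAll` are hypothetical names, not part of `ConsumerCoordinator`, and the exact distinction between the two counters in the PR is not shown in the quoted diff.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

/** Hypothetical helper: counts async commits that were sent but have not completed yet. */
class AsyncCommitTracker {
    private final AtomicInteger inFlight = new AtomicInteger();

    /** Registers an async commit; the counter drops again on success or failure. */
    CompletableFuture<Void> track(CompletableFuture<Void> commit) {
        inFlight.incrementAndGet();
        commit.whenComplete((ignored, error) -> inFlight.decrementAndGet());
        return commit;
    }

    int inFlight() {
        return inFlight.get();
    }

    /** Waits until no async commit is in flight; returns false if the timeout expires first. */
    boolean awaitAll(long timeoutMs) {
        long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (inFlight.get() > 0) {
            if (System.nanoTime() - deadlineNanos >= 0) {
                return false;
            }
            LockSupport.parkNanos(1_000_000L); // back off for roughly 1 ms between checks
        }
        return true;
    }
}

class AsyncCommitTrackerDemo {
    public static void main(String[] args) {
        AsyncCommitTracker tracker = new AsyncCommitTracker();
        CompletableFuture<Void> commit = tracker.track(new CompletableFuture<>());
        System.out.println("in flight before completion: " + tracker.inFlight());
        commit.complete(null);
        System.out.println("drained: " + tracker.awaitAll(50));
    }
}
```

In this sketch a synchronous commit with no offsets would still call something like `awaitAll` first, which is the behaviour the PR title describes.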
[GitHub] [kafka] yashmayya commented on a diff in pull request #13726: KAFKA-13109: SourceTask#poll not enforcing retries in case of RetriableException
yashmayya commented on code in PR #13726: URL: https://github.com/apache/kafka/pull/13726#discussion_r1214147751 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java: ## @@ -483,6 +494,37 @@ public void testFailureInPoll() throws Exception { assertPollMetrics(0); } +@Test +public void testRetriableExceptionInPoll() throws Exception { + +final ErrorHandlingMetrics errorHandlingMetrics = mock(ErrorHandlingMetrics.class); +final RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(30, 15, ALL, SYSTEM, errorHandlingMetrics); Review Comment: Shouldn't we use `MockTime` here so that we can advance the time programmatically on each call to poll and ensure the retries occur as per the deadline? ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java: ## @@ -458,13 +458,8 @@ boolean sendRecords() { } protected List poll() throws InterruptedException { -try { -return task.poll(); -} catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) { -log.warn("{} failed to poll records from SourceTask. Will retry operation.", this, e); -// Do nothing. Let the framework poll whenever it's ready. -return null; -} +retryWithToleranceOperator.reset(); +return retryWithToleranceOperator.execute(task::poll, Stage.TASK_POLL, this.getClass()); Review Comment: Looks like this currently only handles `org.apache.kafka.connect.errors.RetriableException`; should we also handle `org.apache.kafka.common.errors.RetriableException`?
Looks like there's some history here - https://github.com/apache/kafka/pull/6675 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java: ## @@ -458,13 +458,8 @@ boolean sendRecords() { } protected List poll() throws InterruptedException { -try { -return task.poll(); -} catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) { -log.warn("{} failed to poll records from SourceTask. Will retry operation.", this, e); -// Do nothing. Let the framework poll whenever it's ready. -return null; -} +retryWithToleranceOperator.reset(); +return retryWithToleranceOperator.execute(task::poll, Stage.TASK_POLL, this.getClass()); Review Comment: It looks like the `RetryWithToleranceOperator` currently expects to only be used for per-record kind of operations (conversion, transformation etc.) - [here](https://github.com/apache/kafka/blob/7c3a2846d46f21f2737483eeb7c04e4eee4c2b5f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java#L217), [here](https://github.com/apache/kafka/blob/7c3a2846d46f21f2737483eeb7c04e4eee4c2b5f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java#L233) and [here](https://github.com/apache/kafka/blob/7c3a2846d46f21f2737483eeb7c04e4eee4c2b5f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java#L183) for instance. I think we'll need to make a number of updates there before it can be used in operations such as `SourceTask::poll` and `SinkTask::put`. ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java: ## @@ -458,13 +458,8 @@ boolean sendRecords() { } protected List poll() throws InterruptedException { -try { -return task.poll(); -} catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) { -log.warn("{} failed to poll records from SourceTask. 
Will retry operation.", this, e); -// Do nothing. Let the framework poll whenever it's ready. -return null; -} +retryWithToleranceOperator.reset(); +return retryWithToleranceOperator.execute(task::poll, Stage.TASK_POLL, this.getClass()); Review Comment: Should we be retrying on `RetriableException`s even if `errors.tolerance` is set to `none`? Looks like that's what is happening here? ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java: ## @@ -458,13 +458,8 @@ boolean sendRecords() { } protected List poll() throws InterruptedException { -try { -return task.poll(); -} catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) { -log.warn("{} failed to poll records from SourceTask. Will retry operation.", this, e); -// Do nothing. Let the framework poll whenever it's ready. -return null; -} +retryWithToleranceOperator.reset(); +
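The `MockTime` suggestion above can be illustrated with a minimal sketch of deadline-bounded retries driven by an injectable clock. This is not the `RetryWithToleranceOperator` implementation: `SketchClock`, `ManualClock`, `SketchRetriableException`, and `DeadlineRetrier` are hypothetical names, and reading the `30` and `15` from the quoted test as a retry timeout and a maximum delay in milliseconds is an assumption.

```java
import java.util.function.Supplier;

/** Hypothetical clock abstraction so that tests can advance time without sleeping. */
interface SketchClock {
    long nowMs();
    void sleep(long ms);
}

/** Manual clock for tests: sleeping simply moves the current time forward. */
class ManualClock implements SketchClock {
    private long nowMs = 0L;
    @Override public long nowMs() { return nowMs; }
    @Override public void sleep(long ms) { nowMs += ms; }
}

/** Hypothetical stand-in for a retriable error type. */
class SketchRetriableException extends RuntimeException {
    SketchRetriableException(String message) { super(message); }
}

/** Retries an operation with capped exponential backoff until a deadline passes. */
class DeadlineRetrier {
    private final long retryTimeoutMs;
    private final long maxDelayMs;
    private final SketchClock clock;

    DeadlineRetrier(long retryTimeoutMs, long maxDelayMs, SketchClock clock) {
        this.retryTimeoutMs = retryTimeoutMs;
        this.maxDelayMs = maxDelayMs;
        this.clock = clock;
    }

    <V> V execute(Supplier<V> operation) {
        long deadlineMs = clock.nowMs() + retryTimeoutMs;
        int attempt = 0;
        while (true) {
            try {
                return operation.get();
            } catch (SketchRetriableException e) {
                long nowMs = clock.nowMs();
                if (nowMs >= deadlineMs) {
                    throw e; // deadline exhausted: surface the last failure
                }
                attempt++;
                // Backoff doubles per attempt (2, 4, 8, ...) and is capped by maxDelayMs
                // and by the time remaining until the deadline.
                long backoffMs = Math.min(maxDelayMs, 1L << Math.min(attempt, 30));
                clock.sleep(Math.min(backoffMs, deadlineMs - nowMs));
            }
        }
    }
}

class RetrySketchDemo {
    public static void main(String[] args) {
        ManualClock clock = new ManualClock();
        DeadlineRetrier retrier = new DeadlineRetrier(30, 15, clock);
        int[] calls = {0};
        String result = retrier.execute(() -> {
            if (++calls[0] < 3) throw new SketchRetriableException("transient failure");
            return "polled";
        });
        System.out.println(result + " after " + calls[0] + " calls, clock at " + clock.nowMs() + " ms");
    }
}
```

Because the clock is injected, a test can assert exactly when the retries stop without waiting on the system clock, which is the point of the review comment.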
[GitHub] [kafka] dajac commented on a diff in pull request #13680: MINOR: Add "versions" tag to recently added ReplicaState field on Fetch Request
dajac commented on code in PR #13680: URL: https://github.com/apache/kafka/pull/13680#discussion_r1214151118 ## clients/src/main/resources/common/message/FetchRequest.json: ## @@ -61,7 +61,7 @@ "about": "The clusterId if known. This is used to validate metadata fetches prior to broker registration." }, { "name": "ReplicaId", "type": "int32", "versions": "0-14", "default": "-1", "entityType": "brokerId", "about": "The broker ID of the follower, of -1 if this request is from a consumer." }, -{ "name": "ReplicaState", "type": "ReplicaState", "taggedVersions":"15+", "tag": 1, "fields": [ +{ "name": "ReplicaState", "type": "ReplicaState", "versions": "15+", "taggedVersions":"15+", "tag": 1, "fields": [ Review Comment: While we are here, there is a small typo. A space is missing here: `"taggedVersions": "15+"`.
[GitHub] [kafka] dajac commented on a diff in pull request #13794: KAFKA-14462; [16/N] Add CoordinatorLoader and Replayable interfaces
dajac commented on code in PR #13794: URL: https://github.com/apache/kafka/pull/13794#discussion_r1214139956 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/Replayable.java: ## @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.runtime; + +/** + * The Replayable interface. + */ +public interface Replayable { Review Comment: I have renamed it to `CoordinatorPlayback`. Does it sound good to you?
[GitHub] [kafka] dajac commented on a diff in pull request #13794: KAFKA-14462; [16/N] Add CoordinatorLoader and Replayable interfaces
dajac commented on code in PR #13794: URL: https://github.com/apache/kafka/pull/13794#discussion_r1214139496 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/Replayable.java: ## @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.runtime; + +/** + * The Replayable interface. + */ Review Comment: Done.
[jira] [Commented] (KAFKA-15049) Flaky test DynamicBrokerReconfigurationTest#testAdvertisedListenerUpdate
[ https://issues.apache.org/jira/browse/KAFKA-15049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17728637#comment-17728637 ] Josep Prat commented on KAFKA-15049: I've seen this error locally after verifying 3.4.1-rc3 (not on previous RCs), but only after a test run with some other flaky test failures. I decided to reboot the machine, and the tests then passed for me. So I assumed a failed test had left a dangling process blocking the port this test needs. > Flaky test DynamicBrokerReconfigurationTest#testAdvertisedListenerUpdate > > > Key: KAFKA-15049 > URL: https://issues.apache.org/jira/browse/KAFKA-15049 > Project: Kafka > Issue Type: Bug >Reporter: Tom Bentley >Priority: Major > > While testing 3.4.1RC3 > DynamicBrokerReconfigurationTest.testAdvertisedListenerUpdate failed > repeatedly on my machine always with the following stacktrace. > {{org.opentest4j.AssertionFailedError: Unexpected exception type thrown, > expected: but was: > > at > app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) > at app//org.junit.jupiter.api.AssertThrows.assertThrows(AssertThrows.java:67) > at app//org.junit.jupiter.api.AssertThrows.assertThrows(AssertThrows.java:35) > at app//org.junit.jupiter.api.Assertions.assertThrows(Assertions.java:3083) > at > app//kafka.server.DynamicBrokerReconfigurationTest.testAdvertisedListenerUpdate(DynamicBrokerReconfigurationTest.scala:1066)}} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15050) Prompts in the quickstarts
Tom Bentley created KAFKA-15050: --- Summary: Prompts in the quickstarts Key: KAFKA-15050 URL: https://issues.apache.org/jira/browse/KAFKA-15050 Project: Kafka Issue Type: Improvement Components: documentation Reporter: Tom Bentley In the quickstart, [Steps 1-5|https://kafka.apache.org/documentation/#quickstart] use {{$}} to indicate the command prompt. When we start to use Kafka Connect in [Step 6|https://kafka.apache.org/documentation/#quickstart_kafkaconnect] we switch to {{>}}. The [Kafka Streams quickstart|https://kafka.apache.org/documentation/streams/quickstart] also uses {{>}}. I don't think there's a reason for this, but if there is one (root vs user account?) it should be explained.
[GitHub] [kafka] dajac commented on a diff in pull request #13675: KAFKA-14462; [14/N] Add PartitionWriter
dajac commented on code in PR #13675: URL: https://github.com/apache/kafka/pull/13675#discussion_r1214114184 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -1019,13 +1029,14 @@ class ReplicaManager(val config: KafkaConfig, } /** - * Append the messages to the local replica logs + * Append the messages to the local replica logs. ReplicaManager#appendRecords should usually be + * used instead of this method. Review Comment: I have refactored the implementation to use `appendRecords`. I think that it is better like this. Let me know what you think.
[GitHub] [kafka] dajac commented on a diff in pull request #13675: KAFKA-14462; [14/N] Add PartitionWriter
dajac commented on code in PR #13675: URL: https://github.com/apache/kafka/pull/13675#discussion_r1214113530 ## core/src/main/scala/kafka/coordinator/group/PartitionWriterImpl.scala: ## @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.coordinator.group + +import kafka.cluster.PartitionListener +import kafka.server.{ReplicaManager, RequestLocal} +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.errors.RecordTooLargeException +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.record.{CompressionType, MemoryRecords, TimestampType} +import org.apache.kafka.common.record.Record.EMPTY_HEADERS +import org.apache.kafka.common.utils.Time +import org.apache.kafka.coordinator.group.runtime.PartitionWriter +import org.apache.kafka.storage.internals.log.AppendOrigin + +import java.nio.ByteBuffer +import java.util +import scala.collection.Map + +private[group] class ListenerAdaptor( + val listener: PartitionWriter.Listener +) extends PartitionListener { + override def onHighWatermarkUpdated( +tp: TopicPartition, +offset: Long + ): Unit = { +listener.onHighWatermarkUpdated(tp, offset) + } + + override def equals(that: Any): Boolean = that match { +case other: ListenerAdaptor => listener.equals(other.listener) +case _ => false + } + + override def hashCode(): Int = { +listener.hashCode() + } + + override def toString: String = { +s"ListenerAdaptor(listener=$listener)" + } +} + +class PartitionWriterImpl[T]( + replicaManager: ReplicaManager, + serializer: PartitionWriter.Serializer[T], + compressionType: CompressionType, + time: Time +) extends PartitionWriter[T] { + + override def registerListener( +tp: TopicPartition, +listener: PartitionWriter.Listener + ): Unit = { +replicaManager.maybeAddListener(tp, new ListenerAdaptor(listener)) + } + + override def deregisterListener( +tp: TopicPartition, +listener: PartitionWriter.Listener + ): Unit = { +replicaManager.removeListener(tp, new ListenerAdaptor(listener)) + } + + override def append( +tp: TopicPartition, +records: util.List[T] + ): Long = { +if (records.isEmpty) { + throw new IllegalStateException("records must be non-empty.") +} + +replicaManager.getLogConfig(tp) match { + case 
Some(logConfig) => +val magic = logConfig.recordVersion.value +val maxBatchSize = logConfig.maxMessageSize +val currentTimeMs = time.milliseconds() + +val recordsBuilder = MemoryRecords.builder( + ByteBuffer.allocate(math.min(16384, maxBatchSize)), + magic, + compressionType, + TimestampType.CREATE_TIME, + 0L, + maxBatchSize +) + +records.forEach { record => + val keyBytes = serializer.serializeKey(record) + val valueBytes = serializer.serializeValue(record) + + if (recordsBuilder.hasRoomFor(currentTimeMs, keyBytes, valueBytes, EMPTY_HEADERS)) { +recordsBuilder.append( + currentTimeMs, + keyBytes, + valueBytes, + EMPTY_HEADERS +) + } else { +throw new RecordTooLargeException(s"Message batch size is ${recordsBuilder.estimatedSizeInBytes()} bytes " + + s"in append to partition $tp which exceeds the maximum configured size of $maxBatchSize.") + } +} + +val appendResults = replicaManager.appendToLocalLog( Review Comment: Let me rename the class to `CoordinatorPartitionWriter` to make it explicit that this is used for/by coordinators. Does this work for you?
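One detail in the quoted diff is easy to miss: `deregisterListener` wraps the listener in a new `ListenerAdaptor`, so removal only works because the adaptor delegates `equals` and `hashCode` to the wrapped listener. A minimal Java sketch of that pattern follows. `SketchListener`, `SketchListenerAdaptor`, and `SketchListenerRegistry` are hypothetical names, and storing adaptors in a `HashSet` is an assumption; how `ReplicaManager` keeps its listeners is not shown in the diff.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical listener interface, loosely modelled on the one in the quoted diff. */
interface SketchListener {
    void onHighWatermarkUpdated(long offset);
}

/** Wrapper whose identity is defined entirely by the listener it wraps. */
final class SketchListenerAdaptor {
    private final SketchListener listener;

    SketchListenerAdaptor(SketchListener listener) {
        this.listener = listener;
    }

    @Override
    public boolean equals(Object that) {
        return that instanceof SketchListenerAdaptor
            && listener.equals(((SketchListenerAdaptor) that).listener);
    }

    @Override
    public int hashCode() {
        return listener.hashCode();
    }
}

/** Hypothetical registry that only ever sees adaptors, never the raw listeners. */
class SketchListenerRegistry {
    private final Set<SketchListenerAdaptor> adaptors = new HashSet<>();

    boolean register(SketchListener listener) {
        return adaptors.add(new SketchListenerAdaptor(listener));
    }

    boolean deregister(SketchListener listener) {
        // A fresh adaptor is equal to the registered one because equality is delegated.
        return adaptors.remove(new SketchListenerAdaptor(listener));
    }

    int size() {
        return adaptors.size();
    }
}

class ListenerAdaptorDemo {
    public static void main(String[] args) {
        SketchListenerRegistry registry = new SketchListenerRegistry();
        SketchListener listener = offset -> { };
        registry.register(listener);
        System.out.println("removed: " + registry.deregister(listener));
    }
}
```

Without the delegated `equals` and `hashCode`, the second adaptor would compare by identity and the registered listener would never be removed.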
[GitHub] [kafka] dajac commented on a diff in pull request #13675: KAFKA-14462; [14/N] Add PartitionWriter
dajac commented on code in PR #13675: URL: https://github.com/apache/kafka/pull/13675#discussion_r1214109717 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordSerializer.java: ## @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.protocol.MessageUtil; +import org.apache.kafka.coordinator.group.runtime.PartitionWriter; + +/** + * Serializer which serializes {{@link Record}} to bytes. + */ +public class RecordSerializer implements PartitionWriter.Serializer { +@Override +public byte[] serializeKey(Record record) { +// Record does not accept a null key. +return MessageUtil.toVersionPrefixedBytes( Review Comment: Yeah. I made the interface generic in case we want to reuse it for another coordinator in the future. This implementation is tied to the Record class.
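The split described above, a generic serializer interface with one implementation bound to a concrete record type, can be sketched as follows. All names here (`SketchSerializer`, `SketchRecord`, `SketchRecordSerializer`, `versionPrefixed`) are hypothetical. The two-byte big-endian version prefix is an assumption suggested by the name `MessageUtil.toVersionPrefixedBytes` in the quoted diff, and returning `null` for a null value is likewise an assumption.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Generic contract: the writer only needs key and value bytes, whatever the record type. */
interface SketchSerializer<T> {
    byte[] serializeKey(T record);
    byte[] serializeValue(T record);
}

/** Hypothetical record type with a schema version, a mandatory key and an optional value. */
final class SketchRecord {
    final short version;
    final String key;
    final String value; // may be null

    SketchRecord(short version, String key, String value) {
        if (key == null) {
            throw new IllegalArgumentException("key must not be null");
        }
        this.version = version;
        this.key = key;
        this.value = value;
    }
}

/** Implementation tied to SketchRecord; another coordinator could supply its own. */
class SketchRecordSerializer implements SketchSerializer<SketchRecord> {
    @Override
    public byte[] serializeKey(SketchRecord record) {
        return versionPrefixed(record.version, record.key);
    }

    @Override
    public byte[] serializeValue(SketchRecord record) {
        // Assumption: a null value is passed through as null rather than encoded.
        return record.value == null ? null : versionPrefixed(record.version, record.value);
    }

    /** Writes the version as a two-byte big-endian prefix followed by the UTF-8 payload. */
    static byte[] versionPrefixed(short version, String payload) {
        byte[] body = payload.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(Short.BYTES + body.length)
            .putShort(version)
            .put(body)
            .array();
    }
}

class SerializerSketchDemo {
    public static void main(String[] args) {
        SketchSerializer<SketchRecord> serializer = new SketchRecordSerializer();
        byte[] key = serializer.serializeKey(new SketchRecord((short) 3, "group-1", null));
        System.out.println("key length: " + key.length);
    }
}
```

A writer parameterised on `SketchSerializer<T>` never needs to know about `SketchRecord`, which is what makes reuse by another coordinator possible.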
[GitHub] [kafka] urbandan commented on pull request #13796: KAFKA-14034 Idempotent producer should wait for preceding in-flight b…
urbandan commented on PR #13796: URL: https://github.com/apache/kafka/pull/13796#issuecomment-1573381258 @philipnee @ijuma @hachikuji @jolshan Could you please review this fix? Based on the git history, you have recently made changes to the related code.