[jira] [Commented] (KAFKA-17062) RemoteLogManager - RemoteStorageException causes data loss
[ https://issues.apache.org/jira/browse/KAFKA-17062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17862925#comment-17862925 ] Luke Chen commented on KAFKA-17062: --- Good question: should we create a new [RemoteLogSegmentId|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L850] when a failed segment is retried? [~satishd] [~ckamal], we'd like to hear your thoughts on it. > RemoteLogManager - RemoteStorageException causes data loss > -- > > Key: KAFKA-17062 > URL: https://issues.apache.org/jira/browse/KAFKA-17062 > Project: Kafka > Issue Type: Bug > Components: Tiered-Storage >Affects Versions: 3.8.0, 3.7.1, 3.9.0 >Reporter: Guillaume Mallet >Assignee: Guillaume Mallet >Priority: Major > Labels: tiered-storage > > When Tiered Storage is configured, retention.bytes defines the limit for the > amount of data stored in the filesystem and in remote storage. However a > failure while offloading to remote storage can cause segments to be dropped > before the retention limit is met. > What happens > Assuming a topic configured with {{retention.bytes=4294967296}} (4GB) and a > {{local.retention.bytes=1073741824}} (1GB, equal to segment.bytes) we would > expect Kafka to keep up to 3 segments (3GB) in the remote store and 1 segment > locally (the local segment) and possibly more if the remote storage is > offline. i.e. segments in the following RemoteLogSegmentStates in the > RemoteLogMetadataManager (RLMM) : > * Segment 3 ({{{}COPY_SEGMENT_FINISHED{}}}) > * Segment 2 ({{{}COPY_SEGMENT_FINISHED{}}}) > * Segment 1 ({{{}COPY_SEGMENT_FINISHED{}}}) > Let's assume the RLMM starts failing when segment 4 rolls. 
At the first > iteration of an RLMTask we will have - > * > [{{copyLogSegmentsToRemote}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L773] > : is called first > ** RLMM becomes aware of Segment 4 and adds it to the metadata: > *** Segment 4 ({{{}COPY_SEGMENT_STARTED{}}}), > *** Segment 3 ({{{}COPY_SEGMENT_FINISHED{}}}), > *** Segment 2 ({{{}COPY_SEGMENT_FINISHED{}}}), > *** Segment 1 ({{{}COPY_SEGMENT_FINISHED{}}}) > ** An exception is raised during the copy operation > ([{{copyLogSegmentData}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java#L93] > in RemoteStorageManager) which is caught with the error message “{{Error > occurred while copying log segments of partition}}” and no further copy will > be attempted for the duration of this RLMTask. > ** At that point the Segment will never move to {{COPY_SEGMENT_FINISHED}} > but will transition to {{DELETE_SEGMENT_STARTED}} eventually before being > cleaned up when the associated segment is deleted. > * > [{{cleanupExpiredRemoteLogSegments}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1122] > is then called > ** Retention size is computed in > [{{buildRetentionSizeData}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1296] > as the sum of all the segments size regardless of their state so computed > size of the topic is 1 (local) + 4 (remote) > ** Segment 1 as being the oldest will be dropped. 
> At the second iteration after > [{{remote.log.manager.task.interval.ms}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java#L395] > (default: 30s), the same will happen. The RLMM will now have 2 x Segment 4 > in a {{COPY_SEGMENT_STARTED}} state each with a different > {{RemoteLogSegmentId}} and Segment 2 will be dropped. The same will happen to > Segment 3 after another iteration. > At that point, we now have the RLMM composed of 4 copies of Segment 4 in > {{COPY_SEGMENT_STARTED}} state. Segment 4 is marked for deletion increasing > the LSO at the same time and causing the UnifiedLog to delete the local and > remote data for Segment 4 including its metadata. > Under those circumstances Kafka can quickly delete segments that were not > meant for deletion causing a data loss. > Steps to reproduce the problem: > 1. Enable tiered storage > {code:bash} > mkdir -p /tmp/tieredStorage/kafka-tiered-storage/ > cat <> config/kraft/server.properties > remote.log.storage.system.enable=True > remote.log.storage.manager.class.name=org.apache.kafka.server.log.remote.storage.LocalTieredStorage >
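The failure mode above boils down to the retention-size computation summing every remote segment's size regardless of its copy state. The following is an illustrative sketch (class and method names are invented, not Kafka's actual `buildRetentionSizeData` code) of why each failed-and-retried copy of segment 4 adds another `COPY_SEGMENT_STARTED` entry to the total, pushing older, fully-copied segments over the `retention.bytes` limit and into deletion:

```java
import java.util.List;

public class RetentionSizeSketch {
    public enum State { COPY_SEGMENT_STARTED, COPY_SEGMENT_FINISHED }

    public record Segment(String name, long sizeBytes, State state) {}

    // Mirrors the reported behavior: size is summed over all segments,
    // ignoring whether the copy to remote storage ever finished.
    public static long computedRemoteSize(List<Segment> segments) {
        return segments.stream().mapToLong(Segment::sizeBytes).sum();
    }

    // A safer variant would count only fully copied segments toward retention.
    public static long finishedOnlySize(List<Segment> segments) {
        return segments.stream()
                .filter(s -> s.state() == State.COPY_SEGMENT_FINISHED)
                .mapToLong(Segment::sizeBytes)
                .sum();
    }

    public static void main(String[] args) {
        long gb = 1L << 30; // 1 GiB segments, as in the scenario above
        List<Segment> remote = List.of(
                new Segment("segment-1", gb, State.COPY_SEGMENT_FINISHED),
                new Segment("segment-2", gb, State.COPY_SEGMENT_FINISHED),
                new Segment("segment-3", gb, State.COPY_SEGMENT_FINISHED),
                // The copy of segment 4 failed mid-flight but still counts.
                new Segment("segment-4-attempt-1", gb, State.COPY_SEGMENT_STARTED));
        long local = gb; // one local segment
        // Computed usage is 5 GiB against a 4 GiB retention.bytes limit, so
        // segment 1 is dropped even though segment 4 was never offloaded.
        System.out.println("computed=" + (local + computedRemoteSize(remote)));
        System.out.println("finishedOnly=" + (local + finishedOnlySize(remote)));
    }
}
```

Each retry iteration appends another `COPY_SEGMENT_STARTED` entry, so the inflated total keeps growing and the oldest finished segment keeps being selected for deletion.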
Re: [PR] KAFKA-17042: The migration docs should remind users to set "broker.id.generation.enable" when adding broker.id [kafka]
frankvicky commented on PR #16491: URL: https://github.com/apache/kafka/pull/16491#issuecomment-2208106073 Hi @showuon I have adjusted the description based on your feedback, PTAL -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17042: The migration docs should remind users to set "broker.id.generation.enable" when adding broker.id [kafka]
frankvicky commented on code in PR #16491: URL: https://github.com/apache/kafka/pull/16491#discussion_r1665098473 ## docs/ops.html: ## @@ -3946,6 +3946,7 @@ Enter Migration Mode on the Brokers +broker.id: If broker.id.generation.enable is enabled (default is enabled) and broker.id is not set during migration, it will fail. Ensure broker.id is set to a non-negative integer. Review Comment: Yes, I think your version is more concise, making it easier to read. I will adopt it
[jira] [Resolved] (KAFKA-17069) Remote copy throttle metrics
[ https://issues.apache.org/jira/browse/KAFKA-17069?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen resolved KAFKA-17069. --- Fix Version/s: 3.9.0 Resolution: Fixed > Remote copy throttle metrics > - > > Key: KAFKA-17069 > URL: https://issues.apache.org/jira/browse/KAFKA-17069 > Project: Kafka > Issue Type: Sub-task >Reporter: Abhijeet Kumar >Priority: Major > Fix For: 3.9.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-17069: Remote copy throttle metrics [kafka]
showuon merged PR #16086: URL: https://github.com/apache/kafka/pull/16086
Re: [PR] KAFKA-17069: Remote copy throttle metrics [kafka]
showuon commented on PR #16086: URL: https://github.com/apache/kafka/pull/16086#issuecomment-2208096103 Failed tests are unrelated.
Re: [PR] KAFKA-17042: The migration docs should remind users to set "broker.id.generation.enable" when adding broker.id [kafka]
showuon commented on code in PR #16491: URL: https://github.com/apache/kafka/pull/16491#discussion_r1665094501 ## docs/ops.html: ## @@ -3946,6 +3946,7 @@ Enter Migration Mode on the Brokers +broker.id: If broker.id.generation.enable is enabled (default is enabled) and broker.id is not set during migration, it will fail. Ensure broker.id is set to a non-negative integer. Review Comment: Looks better! Maybe we can just say: `Ensure broker.id is set to a non-negative integer even if broker.id.generation.enable is enabled (default is enabled). ` because: 1. we don't need to mention `during migration` since this is a migration section 2. We can combine these 2 sentences. WDYT?
Re: [PR] KAFKA-16558 Implemented HeartbeatRequestState toStringBase() and added a test for it [kafka]
brenden20 commented on code in PR #16373: URL: https://github.com/apache/kafka/pull/16373#discussion_r1665083663 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -485,6 +485,18 @@ public void resetTimer() { this.heartbeatTimer.reset(heartbeatIntervalMs); } +@Override +public String toStringBase() { +return super.toStringBase() + +", heartbeatTimer=" + heartbeatTimer + Review Comment: I made this change now, just want to make sure I did it correctly
Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]
lianetm commented on code in PR #16449: URL: https://github.com/apache/kafka/pull/16449#discussion_r1665069309 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java: ## @@ -2302,6 +2303,61 @@ public void testRebalanceMetricsOnFailedRebalance() { assertEquals(-1d, getMetricValue(metrics, rebalanceMetricsManager.lastRebalanceSecondsAgo)); } +@Test +public void testLeaveGroupWhenStateIsFatal() { +MembershipManagerImpl membershipManager = createMemberInStableState(null); +when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true); +membershipManager.transitionToFatal(); +assertEquals(MemberState.FATAL, membershipManager.state()); + +subscriptionState.assignFromUser(Collections.singleton(new TopicPartition("topic", 0))); +assertEquals(1, subscriptionState.numAssignedPartitions()); Review Comment: Cool. Related although not introduced by this PR, this makes me notice that we don't have this full story covered in the tests (not testing the call to `subscriptionState.unsubscribed` when `isNotInGroup` is false). Would you mind adding the same verification you added for `notInGroup` true, to the func that covers this for `notInGroup` false? Should be the same `verify(subscriptionState).unsubscribe();` but added to `assertStaleMemberLeavesGroupAndClearsAssignment` I would say.
Re: [PR] KAFKA-15296: Allow offsets to be committed for filtered records when Exactly Once support is disabled [kafka]
vamossagar12 commented on PR #14158: URL: https://github.com/apache/kafka/pull/14158#issuecomment-2208006307 hey @C0urante , we recently hit another issue due to this. When you have some time, would it be possible for you to review this? It would be very helpful. Thanks!
[jira] [Updated] (KAFKA-17061) KafkaController takes long time to connect to newly added broker after registration on large cluster
[ https://issues.apache.org/jira/browse/KAFKA-17061?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Haruki Okada updated KAFKA-17061: - Description: h2. Environment * Kafka version: 3.3.2 * Cluster: 200~ brokers * Total num partitions: 40k * ZK-based cluster h2. Phenomenon When a broker left the cluster once due to a long STW and came back after a while, the controller took 6 seconds to connect to the broker after znode registration, which caused significant message delivery delay. {code:java} [2024-06-22 23:59:38,202] INFO [Controller id=1] Newly added brokers: 2, deleted brokers: , bounced brokers: , all live brokers: 1,... (kafka.controller.KafkaController) [2024-06-22 23:59:38,203] DEBUG [Channel manager on controller 1]: Controller 1 trying to connect to broker 2 (kafka.controller.ControllerChannelManager) [2024-06-22 23:59:38,205] INFO [RequestSendThread controllerId=1] Starting (kafka.controller.RequestSendThread) [2024-06-22 23:59:38,205] INFO [Controller id=1] New broker startup callback for 2 (kafka.controller.KafkaController) [2024-06-22 23:59:44,524] INFO [RequestSendThread controllerId=1] Controller 1 connected to broker-2:9092 (id: 2 rack: rack-2) for sending state change requests (kafka.controller.RequestSendThread) {code} h2. Analysis From the flamegraph at that time, we can see that [liveBrokerIds|https://github.com/apache/kafka/blob/3.3.2/core/src/main/scala/kafka/controller/ControllerContext.scala#L217] calculation takes significant time. !image-2024-07-02-17-24-11-861.png|width=541,height=303! was: h2. Environment * Kafka version: 3.3.2 * Cluster: 200~ brokers * Total num partitions: 40k * ZK-based cluster h2. Phenomenon When a broker left the cluster once due to a long STW and came back after a while, the controller took 6 seconds to connect to the broker after znode registration, which caused significant message delivery delay. 
{code:java} [2024-06-22 23:59:38,202] INFO [Controller id=1] Newly added brokers: 2, deleted brokers: , bounced brokers: , all live brokers: 1,... (kafka.controller.KafkaController) [2024-06-22 23:59:38,203] DEBUG [Channel manager on controller 1]: Controller 1 trying to connect to broker 2 (kafka.controller.ControllerChannelManager) [2024-06-22 23:59:38,205] INFO [RequestSendThread controllerId=1] Starting (kafka.controller.RequestSendThread) [2024-06-22 23:59:38,205] INFO [Controller id=1] New broker startup callback for 2 (kafka.controller.KafkaController) [2024-06-22 23:59:44,524] INFO [RequestSendThread controllerId=1] Controller 1 connected to broker-2:9092 (id: 2 rack: rack-2) for sending state change requests (kafka.controller.RequestSendThread) {code} h2. Analysis >From the flamegraph at that time, we can see that >[liveBrokerIds|https://github.com/apache/kafka/blob/3.3.2/core/src/main/scala/kafka/controller/ControllerContext.scala#L217] > calculation takes significant time. !image-2024-07-02-17-24-11-861.png|width=541,height=303! Since no concurrent modification against liveBrokerEpochs is expected, we can just cache the result to improve the performance. > KafkaController takes long time to connect to newly added broker after > registration on large cluster > > > Key: KAFKA-17061 > URL: https://issues.apache.org/jira/browse/KAFKA-17061 > Project: Kafka > Issue Type: Improvement >Reporter: Haruki Okada >Assignee: Haruki Okada >Priority: Major > Attachments: image-2024-07-02-17-22-06-100.png, > image-2024-07-02-17-24-11-861.png > > > h2. Environment > * Kafka version: 3.3.2 > * Cluster: 200~ brokers > * Total num partitions: 40k > * ZK-based cluster > h2. Phenomenon > When a broker left the cluster once due to the long STW and came back after a > while, the controller took 6 seconds until connecting to the broker after > znode registration, it caused significant message delivery delay. 
> {code:java} > [2024-06-22 23:59:38,202] INFO [Controller id=1] Newly added brokers: 2, > deleted brokers: , bounced brokers: , all live brokers: 1,... > (kafka.controller.KafkaController) > [2024-06-22 23:59:38,203] DEBUG [Channel manager on controller 1]: Controller > 1 trying to connect to broker 2 (kafka.controller.ControllerChannelManager) > [2024-06-22 23:59:38,205] INFO [RequestSendThread controllerId=1] Starting > (kafka.controller.RequestSendThread) > [2024-06-22 23:59:38,205] INFO [Controller id=1] New broker startup callback > for 2 (kafka.controller.KafkaController) > [2024-06-22 23:59:44,524] INFO [RequestSendThread controllerId=1] Controller > 1 connected to broker-2:9092 (id: 2 rack: rack-2) for sending state change > requests (kafka.controller.RequestSendThread) > {code} > h2. Analysis > From the flamegraph at that time,
[jira] [Commented] (KAFKA-17073) Deprecate ReplicaVerificationTool in 3.9
[ https://issues.apache.org/jira/browse/KAFKA-17073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17862899#comment-17862899 ] PoAn Yang commented on KAFKA-17073: --- Hi [~chia7712], I'm interested in this. If you're not working on it, may I take it? Thank you. > Deprecate ReplicaVerificationTool in 3.9 > > > Key: KAFKA-17073 > URL: https://issues.apache.org/jira/browse/KAFKA-17073 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Minor > Labels: need-kip > Fix For: 3.9.0 > > > see discussion > https://lists.apache.org/thread/6zz7xwps8lq2lxfo5bhyl4cggh64c5py > In short, the tool is useless and so it is good time to deprecate it in 3.9. > That enables us to remove it from 4.0
Re: [PR] KAFKA-14094: Support for first leader bootstrapping the voter set [kafka]
jsancio commented on code in PR #16518: URL: https://github.com/apache/kafka/pull/16518#discussion_r1664928188 ## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ## @@ -151,6 +151,7 @@ public final class KafkaRaftClient implements RaftClient { public static final int MAX_FETCH_WAIT_MS = 500; public static final int MAX_BATCH_SIZE_BYTES = 8 * 1024 * 1024; public static final int MAX_FETCH_SIZE_BYTES = MAX_BATCH_SIZE_BYTES; +public static final OffsetAndEpoch BOOTSTRAP_SNAPSHOT_ID = new OffsetAndEpoch(0, 0); Review Comment: Let's define this in `o.a.k.s.Snapshots` instead. ## raft/src/main/java/org/apache/kafka/raft/LeaderState.java: ## @@ -202,7 +214,46 @@ public void appendLeaderChangeMessage(long currentTimeMs) { .setVoters(voters) .setGrantingVoters(grantingVoters); -accumulator.appendLeaderChangeMessage(leaderChangeMessage, currentTimeMs); +accumulator.appendControlMessages((baseOffset, epoch, buffer) -> { +try (MemoryRecordsBuilder builder = new MemoryRecordsBuilder( +buffer, +RecordBatch.CURRENT_MAGIC_VALUE, +Compression.NONE, +TimestampType.CREATE_TIME, +baseOffset, +currentTimeMs, +RecordBatch.NO_PRODUCER_ID, +RecordBatch.NO_PRODUCER_EPOCH, +RecordBatch.NO_SEQUENCE, +false, +true, +epoch, +buffer.capacity() +) +) { +builder.appendLeaderChangeMessage( +currentTimeMs, +leaderChangeMessage +); +VoterSetOffset voterSetOffset = lastVoterSetOffset.orElse(null); +// if lastVoterOffset is -1 we know the leader hasn't written the bootstrap snapshot records to the log yet +if (voterSetOffset != null && voterSetOffset.offset() == -1) { Review Comment: Avoid the use of `null`. 
```java lastVoterSetOffset.ifPresent(voterSetOffset -> { // if lastVoterOffset is -1 we know the leader hasn't written the bootstrap snapshot records to the log yet if (voterSetOffset.offset() == -1) { ``` ## raft/src/main/java/org/apache/kafka/raft/internals/VoterSetOffset.java: ## @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft.internals; + +public class VoterSetOffset { +private final VoterSet voterSet; +private final Long offset; Review Comment: An object is not needed. Use `long` instead. It takes less space and doesn't require a dereference. ## raft/src/main/java/org/apache/kafka/raft/internals/VoterSetOffset.java: ## @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft.internals; + +public class VoterSetOffset { Review Comment: Why did you add this type instead of using `LogHistory.Entry`? ## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ## @@ -454,6 +455,7 @@ public void initialize( nodeId, nodeDirectoryId, partitionState::lastVoterSet, +partitionState::lastVoterSetOffset, Review Comment: It is unfortunate that we need to add a parameter
Re: [PR] KAFKA-16684: Remove cache in responseData [kafka]
m1a2st commented on PR #15966: URL: https://github.com/apache/kafka/pull/15966#issuecomment-2207562526 @chia7712, Thanks for your comments, I will open a new PR for this issue.
[jira] [Commented] (KAFKA-17077) the node.id is inconsistent to broker.id when "broker.id.generation.enable=true"
[ https://issues.apache.org/jira/browse/KAFKA-17077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17862892#comment-17862892 ] TengYao Chi commented on KAFKA-17077: - Hi [~chia7712] If you have not started working on it, I would like to take it > the node.id is inconsistent to broker.id when > "broker.id.generation.enable=true" > > > Key: KAFKA-17077 > URL: https://issues.apache.org/jira/browse/KAFKA-17077 > Project: Kafka > Issue Type: Bug >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Critical > > We change the broker id of `KafkaConfig` directly when > `broker.id.generation.enable=true` [0]. However, the update is NOT sync to > node.id of `KafkaConfig`. It results in following issues: > 1. we can see many "-1" in the log. for example: > {code:sh} > [2024-07-03 19:23:08,453] INFO [ExpirationReaper--1-AlterAcls]: Starting > (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) > {code} > 2. `KafkaRaftManager` will use uninitialized node.id to create > `KafkaRaftClient` in migration [1], and the error sequentially happens > [0] > https://github.com/apache/kafka/blob/27220d146c5d043da4adc3d636036bd6e7b112d2/core/src/main/scala/kafka/server/KafkaServer.scala#L261 > [1] > https://github.com/apache/kafka/blob/27220d146c5d043da4adc3d636036bd6e7b112d2/core/src/main/scala/kafka/raft/RaftManager.scala#L230
[jira] [Commented] (KAFKA-10370) WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets) when (tp, offsets) are supplied by WorkerSinkTaskContext
[ https://issues.apache.org/jira/browse/KAFKA-10370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17862891#comment-17862891 ] Greg Harris commented on KAFKA-10370: - I'm reopening this issue as we have a fresh bug report: [https://lists.apache.org/thread/93msfod4bltb7dw73trdyfh95zbldo58] this time with the Snowflake connector. While it may be possible to solve this in the task implementation, I think that we could also change the framework to make this error non-fatal, and just drop offsets for unassigned partitions. > WorkerSinkTask: IllegalStateException cased by consumer.seek(tp, offsets) > when (tp, offsets) are supplied by WorkerSinkTaskContext > -- > > Key: KAFKA-10370 > URL: https://issues.apache.org/jira/browse/KAFKA-10370 > Project: Kafka > Issue Type: New Feature > Components: connect >Affects Versions: 2.5.0 >Reporter: Ning Zhang >Priority: Major > > In > [WorkerSinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java], > when we want the consumer to consume from certain offsets, rather than from > the last committed offset, > [WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java#L63-L66] > provided a way to supply the offsets from external (e.g. implementation of > SinkTask) to rewind the consumer. > In the [poll() > method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312], > it first call > [rewind()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L615-L633] > to (1) read the offsets from WorkerSinkTaskContext, if the offsets are not > empty, (2) consumer.seek(tp, offset) to rewind the consumer. 
> As a part of [WorkerSinkTask > initialization|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L290-L307], > when the [SinkTask > starts|https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java#L83-L88], > we can supply the specific offsets by +"context.offset(supplied_offsets);+" > in start() method, so that when the consumer does the first poll, it should > rewind to the specific offsets in rewind() method. However in practice, we > saw the following IllegalStateException when running consumer.seek(tp, > offsets); > {code:java} > [2020-08-07 23:53:55,752] INFO WorkerSinkTask{id=MirrorSinkConnector-0} > Rewind test-1 to offset 3 > (org.apache.kafka.connect.runtime.WorkerSinkTask:648) > [2020-08-07 23:53:55,752] INFO [Consumer > clientId=connector-consumer-MirrorSinkConnector-0, > groupId=connect-MirrorSinkConnector] Seeking to offset 3 for partition test-1 > (org.apache.kafka.clients.consumer.KafkaConsumer:1592) > [2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task > threw an uncaught and unrecoverable exception > (org.apache.kafka.connect.runtime.WorkerTask:187) > java.lang.IllegalStateException: No current assignment for partition test-1 > at > org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:368) > at > org.apache.kafka.clients.consumer.internals.SubscriptionState.seekUnvalidated(SubscriptionState.java:385) > at > org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1597) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.rewind(WorkerSinkTask.java:649) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198) > at > 
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185) > at > org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > [2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task > is being killed and will not recover until manually restarted >
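The non-fatal handling Greg Harris suggests (dropping offsets for unassigned partitions before seeking) can be sketched like this. This is a hedged illustration, not the `WorkerSinkTask` implementation: `TopicPartition` is stood in for by a simple record, and the filtering helper is invented for this example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class RewindFilterSketch {
    public record TopicPartition(String topic, int partition) {}

    // Keep only the rewind offsets whose partitions are currently assigned to
    // the consumer; a stale entry supplied via the task context is dropped
    // instead of triggering IllegalStateException in consumer.seek().
    public static Map<TopicPartition, Long> filterToAssigned(
            Map<TopicPartition, Long> requested,
            Set<TopicPartition> assignment) {
        Map<TopicPartition, Long> safe = new HashMap<>();
        for (Map.Entry<TopicPartition, Long> e : requested.entrySet()) {
            if (assignment.contains(e.getKey())) {
                safe.put(e.getKey(), e.getValue());
            }
            // else: drop the offset; the real framework would log a warning here
        }
        return safe;
    }
}
```

Seeking only over the filtered map makes the rewind a best-effort operation, so a rebalance between `context.offset(...)` and the next poll no longer kills the task.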
[jira] [Reopened] (KAFKA-10370) WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets) when (tp, offsets) are supplied by WorkerSinkTaskContext
[ https://issues.apache.org/jira/browse/KAFKA-10370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris reopened KAFKA-10370: - Assignee: (was: Ning Zhang) > WorkerSinkTask: IllegalStateException cased by consumer.seek(tp, offsets) > when (tp, offsets) are supplied by WorkerSinkTaskContext > -- > > Key: KAFKA-10370 > URL: https://issues.apache.org/jira/browse/KAFKA-10370 > Project: Kafka > Issue Type: New Feature > Components: connect >Affects Versions: 2.5.0 >Reporter: Ning Zhang >Priority: Major > > In > [WorkerSinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java], > when we want the consumer to consume from certain offsets, rather than from > the last committed offset, > [WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java#L63-L66] > provided a way to supply the offsets from external (e.g. implementation of > SinkTask) to rewind the consumer. > In the [poll() > method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312], > it first call > [rewind()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L615-L633] > to (1) read the offsets from WorkerSinkTaskContext, if the offsets are not > empty, (2) consumer.seek(tp, offset) to rewind the consumer. 
> As a part of [WorkerSinkTask > initialization|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L290-L307], > when the [SinkTask > starts|https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java#L83-L88], > we can supply the specific offsets by +"context.offset(supplied_offsets);+" > in start() method, so that when the consumer does the first poll, it should > rewind to the specific offsets in rewind() method. However in practice, we > saw the following IllegalStateException when running consumer.seek(tp, > offsets); > {code:java} > [2020-08-07 23:53:55,752] INFO WorkerSinkTask{id=MirrorSinkConnector-0} > Rewind test-1 to offset 3 > (org.apache.kafka.connect.runtime.WorkerSinkTask:648) > [2020-08-07 23:53:55,752] INFO [Consumer > clientId=connector-consumer-MirrorSinkConnector-0, > groupId=connect-MirrorSinkConnector] Seeking to offset 3 for partition test-1 > (org.apache.kafka.clients.consumer.KafkaConsumer:1592) > [2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task > threw an uncaught and unrecoverable exception > (org.apache.kafka.connect.runtime.WorkerTask:187) > java.lang.IllegalStateException: No current assignment for partition test-1 > at > org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:368) > at > org.apache.kafka.clients.consumer.internals.SubscriptionState.seekUnvalidated(SubscriptionState.java:385) > at > org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1597) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.rewind(WorkerSinkTask.java:649) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198) > at > 
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185) > at > org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > [2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task > is being killed and will not recover until manually restarted > (org.apache.kafka.connect.runtime.WorkerTask:188) > {code} > As suggested in > https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition/41010594, > the resolution (that has been initially verified) proposed in the attached > PR is to use *consumer.assign* with *consumer.seek* , instead of > *consumer.subscribe*, to handle
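The failure mode and the proposed fix can be illustrated with a minimal stand-in for the consumer's SubscriptionState (a simplified model for illustration only, not the real client classes): seek() on a partition that has not been assigned yet throws IllegalStateException, which is why assigning before seeking resolves the issue.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal model of why seek() fails: the consumer looks up the partition in
// its local subscription state and throws IllegalStateException when that
// partition has not been assigned yet.
public class SeekModel {
    private final Map<String, Long> assignment = new HashMap<>();

    public void assign(String topicPartition) {
        assignment.put(topicPartition, 0L);
    }

    public void seek(String topicPartition, long offset) {
        if (!assignment.containsKey(topicPartition)) {
            throw new IllegalStateException("No current assignment for partition " + topicPartition);
        }
        assignment.put(topicPartition, offset);
    }

    public long position(String topicPartition) {
        return assignment.get(topicPartition);
    }

    public static void main(String[] args) {
        SeekModel consumer = new SeekModel();
        try {
            consumer.seek("test-1", 3L);     // seek before assignment: fails
        } catch (IllegalStateException e) {
            System.out.println("caught: " + e.getMessage());
        }
        consumer.assign("test-1");           // assign first...
        consumer.seek("test-1", 3L);         // ...then the seek succeeds
        System.out.println("position=" + consumer.position("test-1"));
    }
}
```

This mirrors the resolution described above: with subscribe(), assignment happens asynchronously during rebalance, so a seek issued from start() may race ahead of it; with assign(), the assignment is in place before seek() runs.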
[PR] KAFKA-17078: Add SecurityManagerCompatibility shim [kafka]
gharris1727 opened a new pull request, #16522: URL: https://github.com/apache/kafka/pull/16522 This is a shim to allow the use of both modern (JRE 18+) and legacy APIs (deprecated in 17, degraded in 23, removal unknown) related to [JEP 411: Deprecate the Security Manager for Removal](https://openjdk.org/jeps/411). This shim implements the interim strategy outlined in [KIP-1006: Remove SecurityManager Support](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1006%3A+Remove+SecurityManager+Support) where legacy APIs are used as long as they are available and functional. Once the legacy APIs are removed or degraded, the modern APIs are used instead. The shim is structured as a general interface with four 'strategy' implementations for testability. These implementations allow for mocking out the classloading infrastructure to simulate situations which no current JRE implements, namely removal and further degradation of functionality. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
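The fallback strategy the shim implements can be sketched roughly as follows (class and method names here are illustrative, not the actual Kafka shim API): probe for the legacy API reflectively, and switch to the modern path once the legacy one is gone.

```java
// Hypothetical sketch of the shim pattern described in the PR: prefer the
// "legacy" strategy while its API is still loadable, otherwise fall back to
// the "modern" one. None of these class names are the real Kafka classes.
public class CompatShim {
    interface Strategy { String name(); }

    static final class Legacy implements Strategy { public String name() { return "legacy"; } }
    static final class Modern implements Strategy { public String name() { return "modern"; } }

    // The real shim probes the JRE reflectively; removal of the legacy API
    // surfaces as ClassNotFoundException and flips us to the modern path.
    static Strategy choose(String legacyClassName) {
        try {
            Class.forName(legacyClassName);
            return new Legacy();
        } catch (ClassNotFoundException e) {
            return new Modern();
        }
    }

    public static void main(String[] args) {
        // On current JREs java.lang.SecurityManager still exists, so the
        // legacy strategy is chosen; on a JRE that removes it, the same
        // call would select the modern strategy with no code change.
        System.out.println(choose("java.lang.SecurityManager").name());
        System.out.println(choose("java.lang.NoSuchLegacyApi").name());
    }
}
```

Splitting the probe into swappable strategies is what makes the behavior testable, as the PR notes: tests can substitute a strategy that simulates removal or degradation without needing a JRE that actually exhibits it.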
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
brenden20 commented on PR #16200: URL: https://github.com/apache/kafka/pull/16200#issuecomment-2207337445 @philipnee @lianetm thank you both for the feedback, I have addressed all comments
Re: [PR] MINOR; Update the properties files for controller with advertised.listeners [kafka]
jsancio merged PR #16473: URL: https://github.com/apache/kafka/pull/16473
[PR] MINOR; Add property methods to LogOffsetMetadata [kafka]
jsancio opened a new pull request, #16521: URL: https://github.com/apache/kafka/pull/16521 This change simply adds property methods to LogOffsetMetadata. It changes all of the callers to use the new property methods instead of using the fields directly. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-16529; Implement raft response handling [kafka]
jsancio merged PR #16454: URL: https://github.com/apache/kafka/pull/16454
Re: [PR] KAFKA-16529; Implement raft response handling [kafka]
jsancio commented on PR #16454: URL: https://github.com/apache/kafka/pull/16454#issuecomment-2207255514 Unrelated failures. Merging.
[jira] [Created] (KAFKA-17078) Add SecurityManager reflective shim
Greg Harris created KAFKA-17078: --- Summary: Add SecurityManager reflective shim Key: KAFKA-17078 URL: https://issues.apache.org/jira/browse/KAFKA-17078 Project: Kafka Issue Type: Task Components: clients, connect, Tiered-Storage Reporter: Greg Harris Assignee: Greg Harris Add a shim class to allow for detection and usage of legacy and modern methods before and after the SecurityManager removal. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16254: Allow MM2 to fully disable offset sync feature [kafka]
chia7712 commented on code in PR #15999: URL: https://github.com/apache/kafka/pull/15999#discussion_r1664751869 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java: ## @@ -166,6 +167,30 @@ Duration consumerPollTimeout() { return Duration.ofMillis(getLong(CONSUMER_POLL_TIMEOUT_MILLIS)); } +public static Map validate(Map configs) { +Map invalidConfigs = new HashMap<>(); + +// No point to validate when connector is disabled. +if ("false".equals(configs.getOrDefault(ENABLED, "true"))) { +return invalidConfigs; +} + +boolean emitCheckpointDisabled = "false".equals(configs.getOrDefault(EMIT_CHECKPOINTS_ENABLED, "true")); +boolean syncGroupOffsetsDisabled = "false".equals(configs.getOrDefault(SYNC_GROUP_OFFSETS_ENABLED, "true")); Review Comment: the default value of `SYNC_GROUP_OFFSETS_ENABLED` is `false`, so why don't we use the same default value here? ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java: ## @@ -166,6 +167,30 @@ Duration consumerPollTimeout() { return Duration.ofMillis(getLong(CONSUMER_POLL_TIMEOUT_MILLIS)); } +public static Map validate(Map configs) { +Map invalidConfigs = new HashMap<>(); + +// No point to validate when connector is disabled. 
+if ("false".equals(configs.getOrDefault(ENABLED, "true"))) { +return invalidConfigs; +} + +boolean emitCheckpointDisabled = "false".equals(configs.getOrDefault(EMIT_CHECKPOINTS_ENABLED, "true")); +boolean syncGroupOffsetsDisabled = "false".equals(configs.getOrDefault(SYNC_GROUP_OFFSETS_ENABLED, "true")); + +if (emitCheckpointDisabled && syncGroupOffsetsDisabled) { +Arrays.asList(SYNC_GROUP_OFFSETS_ENABLED, EMIT_CHECKPOINTS_ENABLED).forEach(configName -> { +invalidConfigs.putIfAbsent(configName, "MirrorCheckpointConnector can't run with both " + SYNC_GROUP_OFFSETS_ENABLED + " and " + Review Comment: It seems to me `EMIT_CHECKPOINTS_ENABLED` does not obstruct `MirrorCheckpointConnector` from running, since it is used to update consumer group offsets of the target cluster. By contrast, `SYNC_GROUP_OFFSETS_ENABLED` does impact `MirrorCheckpointConnector`: https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java#L121
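The default-value concern raised in the first review comment can be demonstrated in isolation (the key string and helper below are illustrative, not the real config code): when a key is absent, the verdict of the check depends entirely on which default `getOrDefault` assumes, so the assumed default must match the config definition's real default.

```java
import java.util.HashMap;
import java.util.Map;

// Demonstrates the getOrDefault pitfall: for a config the user never set,
// the "disabled?" answer flips depending on the default the validator assumes.
public class DefaultMismatch {
    static boolean disabled(Map<String, String> cfg, String key, String assumedDefault) {
        return "false".equals(cfg.getOrDefault(key, assumedDefault));
    }

    public static void main(String[] args) {
        Map<String, String> cfg = new HashMap<>();  // user set nothing
        // With an assumed default of "true", the feature looks enabled...
        System.out.println(disabled(cfg, "sync.group.offsets.enabled", "true"));
        // ...but if the config's actual default is "false", it is disabled,
        // and the validator reaches the opposite conclusion.
        System.out.println(disabled(cfg, "sync.group.offsets.enabled", "false"));
    }
}
```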
Re: [PR] KAFKA-16254: Allow MM2 to fully disable offset sync feature [kafka]
chia7712 commented on code in PR #15999: URL: https://github.com/apache/kafka/pull/15999#discussion_r1664701302 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java: ## @@ -106,6 +108,17 @@ public void stop() { Utils.closeQuietly(targetAdminClient, "target admin client"); } +@Override +public Config validate(Map connectorConfigs) { +List configValues = super.validate(connectorConfigs).configValues(); +MirrorCheckpointConfig.validate(connectorConfigs).forEach((config, errorMsg) -> +configValues.stream() +.filter(conf -> conf.name().equals(config)) Review Comment: LGTM
Re: [PR] KAFKA-16934: Clean up and refactor release.py [kafka]
soarez commented on PR #16287: URL: https://github.com/apache/kafka/pull/16287#issuecomment-2207104921 Yes, @jlprat please share any issues you run into.
Re: [PR] KAFKA-17042: The migration docs should remind users to set "broker.id.generation.enable" when adding broker.id [kafka]
chia7712 commented on PR #16491: URL: https://github.com/apache/kafka/pull/16491#issuecomment-2207036363 > The inconsistency between node.id and broker.id (when generated id is enabled) can be addressed in another issue. I will file it later https://issues.apache.org/jira/browse/KAFKA-17077
[jira] [Created] (KAFKA-17077) the node.id is inconsistent to broker.id when "broker.id.generation.enable=true"
Chia-Ping Tsai created KAFKA-17077: -- Summary: the node.id is inconsistent to broker.id when "broker.id.generation.enable=true" Key: KAFKA-17077 URL: https://issues.apache.org/jira/browse/KAFKA-17077 Project: Kafka Issue Type: Bug Reporter: Chia-Ping Tsai Assignee: Chia-Ping Tsai We change the broker id of `KafkaConfig` directly when `broker.id.generation.enable=true` [0]. However, the update is NOT synced to the node.id of `KafkaConfig`. This results in the following issues: 1. we see many "-1" values in the logs. For example: {code:sh} [2024-07-03 19:23:08,453] INFO [ExpirationReaper--1-AlterAcls]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) {code} 2. `KafkaRaftManager` will use the uninitialized node.id to create `KafkaRaftClient` during migration [1], and the error subsequently happens [0] https://github.com/apache/kafka/blob/27220d146c5d043da4adc3d636036bd6e7b112d2/core/src/main/scala/kafka/server/KafkaServer.scala#L261 [1] https://github.com/apache/kafka/blob/27220d146c5d043da4adc3d636036bd6e7b112d2/core/src/main/scala/kafka/raft/RaftManager.scala#L230
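A stripped-down sketch of the desynchronization described in this issue (field and method names are illustrative, not the real `KafkaConfig`): updating the broker id alone leaves the node id at its stale sentinel value, which is what surfaces as "-1" in log lines and in the raft client.

```java
// Illustrative model of the bug: two views of the same identity that must
// stay in sync, where the buggy path updates only one of them.
public class IdConfig {
    int brokerId = -1;
    int nodeId = -1;

    // buggy path: only brokerId is updated after id generation,
    // so components reading nodeId still observe -1
    void setBrokerIdOnly(int generated) {
        brokerId = generated;
    }

    // fixed path: keep both views consistent in one place
    void setGeneratedId(int generated) {
        brokerId = generated;
        nodeId = generated;
    }

    public static void main(String[] args) {
        IdConfig buggy = new IdConfig();
        buggy.setBrokerIdOnly(1001);
        System.out.println("buggy nodeId=" + buggy.nodeId);   // stale -1

        IdConfig fixed = new IdConfig();
        fixed.setGeneratedId(1001);
        System.out.println("fixed nodeId=" + fixed.nodeId);
    }
}
```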
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
philipnee commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1664632832 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -594,24 +740,13 @@ public void testHeartbeatState() { @Test public void testPollTimerExpiration() { -coordinatorRequestManager = mock(CoordinatorRequestManager.class); -membershipManager = mock(MembershipManager.class); -heartbeatState = mock(HeartbeatRequestManager.HeartbeatState.class); -heartbeatRequestState = spy(new HeartbeatRequestManager.HeartbeatRequestState( -new LogContext(), -time, -DEFAULT_HEARTBEAT_INTERVAL_MS, -DEFAULT_RETRY_BACKOFF_MS, -DEFAULT_RETRY_BACKOFF_MAX_MS, -0)); -backgroundEventHandler = mock(BackgroundEventHandler.class); - heartbeatRequestManager = createHeartbeatRequestManager( coordinatorRequestManager, membershipManager, heartbeatState, heartbeatRequestState, backgroundEventHandler); + Review Comment: Can we remove the extra space?
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
philipnee commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1664631319 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -277,27 +332,29 @@ public void testHeartbeatNotSentIfAnotherOneInFlight() { result = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(0, result.unsentRequests.size(), "No heartbeat should be sent while a " + -"previous one is in-flight"); +"previous one is in-flight"); time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); result = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(0, result.unsentRequests.size(), "No heartbeat should be sent when the " + -"interval expires if there is a previous HB request in-flight"); +"interval expires if there is a previous HB request in-flight"); // Receive response for the inflight after the interval expired. The next HB should be sent // on the next poll waiting only for the minimal backoff. inflightReq.handler().onComplete(createHeartbeatResponse(inflightReq, Errors.NONE)); time.sleep(DEFAULT_RETRY_BACKOFF_MS); result = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(1, result.unsentRequests.size(), "A next heartbeat should be sent on " + -"the first poll after receiving a response that took longer than the interval, " + -"waiting only for the minimal backoff."); +"the first poll after receiving a response that took longer than the interval, " + Review Comment: could you revert the changes here? (as well as the ones above and below) I think these are IDE indentation changes.
[PR] KAFKA-17075: Use new health check endpoint in Connect system tests to verify worker readiness [kafka]
C0urante opened a new pull request, #16520: URL: https://github.com/apache/kafka/pull/16520 [Jira](https://issues.apache.org/jira/browse/KAFKA-17075) Introduces a new startup mode, `STARTUP_MODE_HEALTH_CHECK`, and uses it as the new default for verifying worker readiness during system tests. This mode waits for a 200 response from the `GET /health` endpoint on the worker, and is resilient against temporary errors that may occur during startup (such as the REST server not having started or initialized its resources yet). I have not been able to run the full gamut of Connect system tests locally, but I have verified that this works with the [test_dynamic_logging](https://github.com/apache/kafka/blob/27220d146c5d043da4adc3d636036bd6e7b112d2/tests/kafkatest/tests/connect/connect_distributed_test.py#L464) case in [connect_distributed_test.py](https://github.com/apache/kafka/blob/27220d146c5d043da4adc3d636036bd6e7b112d2/tests/kafkatest/tests/connect/connect_distributed_test.py) and all tests in [connect_test.py](https://github.com/apache/kafka/blob/27220d146c5d043da4adc3d636036bd6e7b112d2/tests/kafkatest/tests/connect/connect_test.py). ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
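The readiness loop this startup mode performs can be sketched as follows (a hypothetical standalone sketch, not the actual system-test code): keep polling `GET /health`, treating connection failures and non-200 responses as temporary startup errors, until a 200 arrives or a deadline passes.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class HealthPoll {
    // Poll the health endpoint until it answers 200 or the timeout elapses.
    static boolean awaitReady(URI health, long timeoutMs) throws InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            try {
                HttpResponse<Void> r = client.send(
                        HttpRequest.newBuilder(health).GET().build(),
                        HttpResponse.BodyHandlers.discarding());
                if (r.statusCode() == 200) return true;   // worker is ready
            } catch (IOException e) {
                // REST server not up yet: treat as a temporary startup error
            }
            Thread.sleep(100);
        }
        return false;
    }

    public static void main(String[] args) throws Exception {
        // Stand-in "worker": a local server whose /health returns 200.
        HttpServer server = HttpServer.create(new InetSocketAddress(0), 0);
        server.createContext("/health", ex -> { ex.sendResponseHeaders(200, -1); ex.close(); });
        server.start();
        URI uri = URI.create("http://localhost:" + server.getAddress().getPort() + "/health");
        System.out.println("ready=" + awaitReady(uri, 5000));
        server.stop(0);
    }
}
```

The key design point from the PR is the error handling: unlike hitting an arbitrary REST resource once, the loop swallows connection errors and keeps retrying, so a worker that is mid-startup is not reported as failed.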
Re: [PR] KAFKA-16684: Remove cache in responseData [kafka]
chia7712 commented on PR #15966: URL: https://github.com/apache/kafka/pull/15966#issuecomment-2206969288 ``` org.gradle.api.internal.tasks.testing.TestSuiteExecutionException: Could not complete execution for Gradle Test Executor 100. at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.stop(SuiteTestClassProcessor.java:64) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33) at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94) at com.sun.proxy.$Proxy2.stop(Unknown Source) at org.gradle.api.internal.tasks.testing.worker.TestWorker$3.run(TestWorker.java:193) at org.gradle.api.internal.tasks.testing.worker.TestWorker.executeAndMaintainThreadName(TestWorker.java:129) at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:100) at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:60) at org.gradle.process.internal.worker.child.ActionExecutionWorker.execute(ActionExecutionWorker.java:56) at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:119) at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:66) at worker.org.gradle.process.internal.worker.GradleWorkerMain.run(GradleWorkerMain.java:69) at worker.org.gradle.process.internal.worker.GradleWorkerMain.main(GradleWorkerMain.java:74) Caused by: 
java.lang.OutOfMemoryError: GC overhead limit exceeded at net.bytebuddy.description.type.TypeDescription$ForLoadedType.of(TypeDescription.java:8619) at net.bytebuddy.description.method.MethodDescription$ForLoadedMethod.getDeclaringType(MethodDescription.java:1190) at org.mockito.internal.creation.bytebuddy.MockMethodAdvice.isOverridden(MockMethodAdvice.java:199) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.isUnavailable(ConsumerNetworkClient.java:560) at org.apache.kafka.clients.consumer.internals.Fetcher.isUnavailable(Fetcher.java:87) at org.apache.kafka.clients.consumer.internals.AbstractFetch.prepareFetchRequests(AbstractFetch.java:427) at org.apache.kafka.clients.consumer.internals.Fetcher.sendFetches(Fetcher.java:105) at org.apache.kafka.clients.consumer.internals.FetcherTest.sendFetches(FetcherTest.java:246) at org.apache.kafka.clients.consumer.internals.FetcherTest.testFetcherConcurrency(FetcherTest.java:2943) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728) ``` the failed test is related to this PR. In the test case `testFetcherConcurrency`, it does not return the correct `sessionTopicNames`, so normally it should NOT see the correct response data. However, `FetchResponse#responseData` will return the cached data regardless of input, so it CAN get correct response data even though it passes an empty `topicNames`. That is a good example showing the potential bug :) @m1a2st Could you copy the changes of this PR to another one, and please fix `testFetcherConcurrency` according to my comment. Also, please add a new test for the change. @johnnychhsu Sorry, I can't merge this PR as it causes the failed test. 
Please feel free to close this PR as @m1a2st will leverage this PR to complete it :)
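The caching behavior described above can be reproduced with a minimal model (an illustration of the pattern only, not the real `FetchResponse` API): once the first call memoizes its result, later calls return the cached map no matter what `topicNames` they are given.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Minimal illustration of the caching bug: the first call to responseData()
// memoizes its result, and every later call returns that cached map even
// when given a different (here, empty) topic-id-to-name mapping.
public class CachedResponse {
    private final Map<Integer, String> idToData = new HashMap<>();
    private Map<String, String> cached;

    CachedResponse() {
        idToData.put(1, "records-for-topic-id-1");
    }

    Map<String, String> responseData(Map<Integer, String> topicNames) {
        if (cached == null) {  // the argument is ignored on all later calls
            cached = new HashMap<>();
            idToData.forEach((id, data) -> {
                String name = topicNames.get(id);
                if (name != null) cached.put(name, data);
            });
        }
        return cached;
    }

    public static void main(String[] args) {
        CachedResponse resp = new CachedResponse();
        // First call resolves topic id 1 to "test" as expected.
        System.out.println(resp.responseData(Map.of(1, "test")));
        // An empty mapping should resolve nothing, but the cache answers anyway,
        // masking the wrong-argument bug exactly as in testFetcherConcurrency.
        System.out.println(resp.responseData(Collections.emptyMap()));
    }
}
```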
[jira] [Commented] (KAFKA-17076) logEndOffset could be lost due to log cleaning
[ https://issues.apache.org/jira/browse/KAFKA-17076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17862868#comment-17862868 ] Jun Rao commented on KAFKA-17076: - One potential solution is to adjust the log cleaning logic such that it always preserves the last batch during each round of cleaning. If all records in the last batch are removed, we can just retain the empty batch to preserve the last offset. The empty batch will then be replicated to all replicas to preserve the true logEndOffset. > logEndOffset could be lost due to log cleaning > -- > > Key: KAFKA-17076 > URL: https://issues.apache.org/jira/browse/KAFKA-17076 > Project: Kafka > Issue Type: Bug > Components: core >Reporter: Jun Rao >Priority: Major > > It's possible for the log cleaner to remove all records in the suffix of the > log. If the partition is then reassigned, the new replica won't be able to > see the true logEndOffset since there is no record batch associated with it. > If this replica becomes the leader, it will assign an already used offset to > a newly produced record, which is incorrect. > > It's relatively rare to trigger this issue since the active segment is never > cleaned and typically is not empty. However, the following is one possibility. > # records with offset 100-110 are produced and fully replicated to all ISR. > All those records are delete records for certain keys. > # record with offset 111 is produced. It forces the roll of a new segment in > broker b1 and is added to the log. The record is not committed and is later > truncated from the log, leaving an empty active segment in this log. b1 at > some point becomes the leader. > # log cleaner kicks in and removes records 100-110. > # The partition is reassigned to another broker b2. b2 replicates all > records from b1 up to offset 100 and marks its logEndOffset at 100. Since > there is no record to replicate after offset 100 in b1, b2's logEndOffset > stays at 100 and b2 can join the ISR. 
> # b2 becomes the leader and assigns offset 100 to a new record.
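The fix proposed in the comment above can be sketched on a toy data model (batches and offsets below are illustrative): a cleaning round removes deletable records but always retains the final batch, even when empty, so the last offset survives and the true logEndOffset can still be replicated.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Toy model of the proposed cleaning rule: drop fully-cleaned batches except
// the last one, which is retained empty to preserve its lastOffset.
public class Cleaner {
    static final class Batch {
        final long lastOffset;
        final List<Long> records;
        Batch(long lastOffset, List<Long> records) {
            this.lastOffset = lastOffset;
            this.records = records;
        }
    }

    static List<Batch> clean(List<Batch> log, Set<Long> deletableOffsets) {
        List<Batch> out = new ArrayList<>();
        for (int i = 0; i < log.size(); i++) {
            Batch b = log.get(i);
            List<Long> kept = new ArrayList<>(b.records);
            kept.removeAll(deletableOffsets);
            boolean isLast = (i == log.size() - 1);
            // Keep non-empty batches; also keep the last batch even if empty.
            if (!kept.isEmpty() || isLast) out.add(new Batch(b.lastOffset, kept));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Batch> log = List.of(
                new Batch(99, List.of(98L, 99L)),
                new Batch(110, List.of(100L, 101L, 110L)));
        // Clean the delete records at offsets 100-110, as in the scenario above.
        List<Batch> cleaned = clean(log, Set.of(100L, 101L, 110L));
        Batch last = cleaned.get(cleaned.size() - 1);
        // The empty last batch still carries lastOffset 110, so a follower
        // replicating it learns the true logEndOffset of 111.
        System.out.println("logEndOffset preserved at " + (last.lastOffset + 1));
    }
}
```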
[jira] [Created] (KAFKA-17076) logEndOffset could be lost due to log cleaning
Jun Rao created KAFKA-17076: --- Summary: logEndOffset could be lost due to log cleaning Key: KAFKA-17076 URL: https://issues.apache.org/jira/browse/KAFKA-17076 Project: Kafka Issue Type: Bug Components: core Reporter: Jun Rao It's possible for the log cleaner to remove all records in the suffix of the log. If the partition is then reassigned, the new replica won't be able to see the true logEndOffset since there is no record batch associated with it. If this replica becomes the leader, it will assign an already used offset to a newly produced record, which is incorrect. It's relatively rare to trigger this issue since the active segment is never cleaned and typically is not empty. However, the following is one possibility. # records with offset 100-110 are produced and fully replicated to all ISR. All those records are delete records for certain keys. # record with offset 111 is produced. It forces the roll of a new segment in broker b1 and is added to the log. The record is not committed and is later truncated from the log, leaving an empty active segment in this log. b1 at some point becomes the leader. # log cleaner kicks in and removes records 100-110. # The partition is reassigned to another broker b2. b2 replicates all records from b1 up to offset 100 and marks its logEndOffset at 100. Since there is no record to replicate after offset 100 in b1, b2's logEndOffset stays at 100 and b2 can join the ISR. # b2 becomes the leader and assigns offset 100 to a new record.
[jira] [Created] (KAFKA-17075) Use health check endpoint to verify Connect worker readiness in system tests
Chris Egerton created KAFKA-17075: - Summary: Use health check endpoint to verify Connect worker readiness in system tests Key: KAFKA-17075 URL: https://issues.apache.org/jira/browse/KAFKA-17075 Project: Kafka Issue Type: Improvement Components: connect Affects Versions: 3.9.0 Reporter: Chris Egerton Assignee: Chris Egerton We introduced a health check endpoint for Kafka Connect as part of work on KAFKA-10816. We should start to use that endpoint to verify worker readiness in our system tests, instead of scanning worker logs for specific messages or hitting other, less-reliable REST endpoints.
[jira] [Resolved] (KAFKA-10816) Connect REST API should have a resource that can be used as a readiness probe
[ https://issues.apache.org/jira/browse/KAFKA-10816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton resolved KAFKA-10816. --- Fix Version/s: 3.9.0 Resolution: Done > Connect REST API should have a resource that can be used as a readiness probe > - > > Key: KAFKA-10816 > URL: https://issues.apache.org/jira/browse/KAFKA-10816 > Project: Kafka > Issue Type: Improvement > Components: connect >Reporter: Randall Hauch >Assignee: Chris Egerton >Priority: Major > Fix For: 3.9.0 > > > There are a few ways to accurately detect whether a Connect worker is > *completely* ready to process all REST requests: > # Wait for {{Herder started}} in the Connect worker logs > # Use the REST API to issue a request that will be completed only after the > herder has started, such as {{GET /connectors/{name}/}} or {{GET > /connectors/{name}/status}}. > Other techniques can be used to detect other startup states, though none of > these will guarantee that the worker has indeed completely started up and can > process all REST requests: > * {{GET /}} can be used to know when the REST server has started, but this > may be before the worker has started completely and successfully. > * {{GET /connectors}} can be used to know when the REST server has started, > but this may be before the worker has started completely and successfully. > And, for the distributed Connect worker, this may actually return an older > list of connectors if the worker hasn't yet completely read through the > internal config topic. It's also possible that this request returns even if > the worker is having trouble reading from the internal config topic. > * {{GET /connector-plugins}} can be used to know when the REST server has > started, but this may be before the worker has started completely and > successfully. > The Connect REST API should have an endpoint that more obviously and more > simply can be used as a readiness probe. 
This could be a new resource (e.g., > {{GET /status}}), though this would only work on newer Connect runtimes, and > existing tooling, installations, and examples would have to be modified to > take advantage of this feature (if it exists). > Alternatively, we could make sure that the existing resources (e.g., {{GET > /}} or {{GET /connectors}}) wait for the herder to start completely; this > wouldn't require a KIP and it would not require clients to use a different > technique for newer and older Connect runtimes. (Whether or not we back port > this is another question altogether, since it's debatable whether the > behavior of the existing REST resources is truly a bug.)
Re: [PR] KAFKA-10816: Add health check endpoint for Kafka Connect [kafka]
C0urante merged PR #16477: URL: https://github.com/apache/kafka/pull/16477
Re: [PR] KAFKA-10816: Add health check endpoint for Kafka Connect [kafka]
C0urante commented on PR #16477: URL: https://github.com/apache/kafka/pull/16477#issuecomment-2206933077 Thanks Greg! I've realized the TODO in the code base was unnecessary and this should be safe to merge without waiting for more green CI runs (see the latest commit message for more details). Merging...
Re: [PR] KAFKA-17011: Fix a bug preventing features from supporting v0 [kafka]
jolshan commented on PR #16421: URL: https://github.com/apache/kafka/pull/16421#issuecomment-2206865002 https://github.com/apache/kafka/pull/16183 will wait for this PR in order to not break tests.
[PR] MINOR: Await formation of consumer groups before verifying expected sink connector offsets in OffsetsApiIntegrationTest [kafka]
C0urante opened a new pull request, #16519: URL: https://github.com/apache/kafka/pull/16519 Draft; will flesh out description if this yields positive results. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-17058; Extend CoordinatorRuntime to support non-atomic writes [kafka]
jeffkbkim commented on code in PR #16498: URL: https://github.com/apache/kafka/pull/16498#discussion_r1664527321 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java: ## @@ -3839,6 +3842,294 @@ public void close() {} assertEquals("response1", write.get(5, TimeUnit.SECONDS)); } +@Test +public void testScheduleNonAtomicWriteOperation() throws ExecutionException, InterruptedException, TimeoutException { +MockTimer timer = new MockTimer(); +MockPartitionWriter writer = new MockPartitionWriter(); + +CoordinatorRuntime runtime = +new CoordinatorRuntime.Builder() +.withTime(timer.time()) +.withTimer(timer) +.withDefaultWriteTimeOut(Duration.ofMillis(20)) +.withLoader(new MockCoordinatorLoader()) +.withEventProcessor(new DirectEventProcessor()) +.withPartitionWriter(writer) +.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) + .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) +.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class)) +.withSerializer(new StringSerializer()) +.withAppendLingerMs(10) +.build(); + +// Schedule the loading. +runtime.scheduleLoadOperation(TP, 10); + +// Verify the initial state. +CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); +assertEquals(0L, ctx.coordinator.lastWrittenOffset()); +assertEquals(0L, ctx.coordinator.lastCommittedOffset()); +assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList()); +assertNull(ctx.currentBatch); + +// Get the max batch size. +int maxBatchSize = writer.config(TP).maxMessageSize(); + +// Create records with a quarter of the max batch size each. Keep in mind that +// each batch has a header so it is not possible to have those four records +// in one single batch. 
+List records = Stream.of('1', '2', '3', '4').map(c -> { +char[] payload = new char[maxBatchSize / 4]; +Arrays.fill(payload, c); +return new String(payload); +}).collect(Collectors.toList()); + +// Let's try to write all the records atomically (the default) to ensure +// that it fails. +CompletableFuture write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20), +state -> new CoordinatorResult<>(records, "write#1") +); + +assertFutureThrows(write1, RecordTooLargeException.class); + +// Let's try to write the same records non-atomically. +CompletableFuture write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20), +state -> new CoordinatorResult<>(records, "write#2", null, true, false) +); + +// The write is pending. +assertFalse(write2.isDone()); + +// Verify the state. +assertNotNull(ctx.currentBatch); +// The last written offset is 3L because one batch was written to the log with +// the first three records. The 4th one is pending. +assertEquals(3L, ctx.coordinator.lastWrittenOffset()); +assertEquals(0L, ctx.coordinator.lastCommittedOffset()); +assertEquals(Arrays.asList(0L, 3L), ctx.coordinator.snapshotRegistry().epochsList()); +assertEquals(Arrays.asList( +new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)), +new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)), +new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)), +new MockCoordinatorShard.RecordAndMetadata(3, records.get(3)) +), ctx.coordinator.coordinator().fullRecords()); +assertEquals(Collections.singletonList( +records(timer.time().milliseconds(), records.subList(0, 3)) +), writer.entries(TP)); + +// Commit up to 3L. +writer.commit(TP, 3L); + +// The write is still pending. +assertFalse(write2.isDone()); + +// Advance past the linger time to flush the pending batch. +timer.advanceClock(11); + +// Verify the state. 
+assertNull(ctx.currentBatch); +assertEquals(4L, ctx.coordinator.lastWrittenOffset()); +assertEquals(3L, ctx.coordinator.lastCommittedOffset()); +assertEquals(Arrays.asList(3L, 4L), ctx.coordinator.snapshotRegistry().epochsList()); +assertEquals(Arrays.asList( +new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)), +new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)), +new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)), +new
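The batching behavior the test above exercises (three quarter-size records fit in one batch once per-batch header overhead is counted, while the fourth spills into a pending batch) can be sketched as a greedy size-bounded packer. This is an illustrative sketch only, not the CoordinatorRuntime code; the class name, the header overhead, and the record sizes below are assumptions.

```java
import java.util.ArrayList;
import java.util.List;

class BatchPackingSketch {
    // Greedily pack record payloads into batches of at most maxBatchSize bytes,
    // reserving headerOverhead bytes per batch for the batch header. All names
    // and numbers are illustrative assumptions, not Kafka's implementation.
    static List<List<String>> pack(List<String> records, int maxBatchSize, int headerOverhead) {
        List<List<String>> batches = new ArrayList<>();
        List<String> current = new ArrayList<>();
        int used = headerOverhead;
        for (String record : records) {
            // Start a new batch when the next record would overflow the current one.
            if (!current.isEmpty() && used + record.length() > maxBatchSize) {
                batches.add(current);
                current = new ArrayList<>();
                used = headerOverhead;
            }
            current.add(record);
            used += record.length();
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }
}
```

With an assumed maxBatchSize of 1024 bytes and 64 bytes of header overhead, four 256-byte records pack as batches of 3 and 1 records, which mirrors the test's lastWrittenOffset of 3 with the fourth record pending.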
Re: [PR] KAFKA-16228: Add remote log metadata flag to the dump log tool [kafka]
fvaleri commented on code in PR #16475: URL: https://github.com/apache/kafka/pull/16475#discussion_r1664522945 ## core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala: ## @@ -243,6 +244,32 @@ class DumpLogSegmentsTest { assertEquals(Map.empty, errors.shallowOffsetNotFound) } + @Test + def testDumpRemoteLogMetadataRecords(): Unit = { Review Comment: Hi @divijvaidya, thanks. Most of the suggested tests are implemented now. > 3. metadata contains multiple records (testing with 2 is fine), one batch I guess this is the original test. > do we compact metadata? If yes, can you add cases where the segment is a compacted segment (has some offsets missing). No, the cleanup policy for this topic is hard-coded to delete. See [here](https://github.com/apache/kafka/blob/3.7.1/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java#L491). > do we compress metadata? If yes, can you add cases which validate correct deserialization for different compression types No, they are not compressed. See for example [here](https://github.com/apache/kafka/blob/3.7.1/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L746-L750).
Re: [PR] KAFKA-16909 Refactor GroupCoordinatorConfig with AbstractConfig [kafka]
chia7712 commented on PR #16458: URL: https://github.com/apache/kafka/pull/16458#issuecomment-220683 I double-checked all configs and confirmed that none of them are dynamic. Maybe we don't need to over-engineer this for now. Hence, we can have a PR for the following changes.
1. re-introduce attributes to `GroupCoordinatorConfig` (@dajac [comment](https://github.com/apache/kafka/pull/16458#discussion_r1662130270))
2. move `GroupCoordinatorConfig` validation from `KafkaConfig` to `GroupCoordinatorConfig` (@OmniaGM [comment](https://github.com/apache/kafka/pull/16458#discussion_r1662318237))
3. do validation in the constructor of `GroupCoordinatorConfig`. I guess this is a bit different from @OmniaGM's idea (#16506 adds a `validate` method to `ShareGroupConfig` instead of validating configs in the constructor). I prefer to validate all configs in the constructor, as `KafkaConfig` does its validation in the constructor too.
4. add docs to `GroupCoordinatorConfig` to explain why we add attributes
@brandboat @dajac @OmniaGM PTAL, I hope this can be a guideline for all similar config classes.
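The fail-fast pattern discussed in point 3 above can be sketched as follows. This is a hedged illustration only: the class name and the two properties are invented for the example and are not the actual fields of Kafka's `GroupCoordinatorConfig`.

```java
// Sketch of validating config values in the constructor so that an invalid
// config object can never exist (mirroring how KafkaConfig validates at
// construction time). Class and property names are illustrative assumptions.
class GroupConfigSketch {
    private final int numThreads;
    private final int sessionTimeoutMs;

    GroupConfigSketch(int numThreads, int sessionTimeoutMs) {
        if (numThreads <= 0) {
            throw new IllegalArgumentException("num.threads must be positive, got " + numThreads);
        }
        if (sessionTimeoutMs <= 0) {
            throw new IllegalArgumentException("session.timeout.ms must be positive, got " + sessionTimeoutMs);
        }
        this.numThreads = numThreads;
        this.sessionTimeoutMs = sessionTimeoutMs;
    }

    int numThreads() { return numThreads; }
    int sessionTimeoutMs() { return sessionTimeoutMs; }
}
```

The contrast with the separate-`validate()` approach (as in #16506) is that a separate method allows an invalid object to be constructed first; constructor validation makes that state unrepresentable.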
Re: [PR] KAFKA-16529; Implement raft response handling [kafka]
cmccabe commented on code in PR #16454: URL: https://github.com/apache/kafka/pull/16454#discussion_r1664491334 ## raft/src/test/java/org/apache/kafka/raft/CandidateStateTest.java: ## @@ -58,193 +60,229 @@ private CandidateState newCandidateState(VoterSet voters) { ); } -@Test -public void testSingleNodeQuorum() { -CandidateState state = newCandidateState(voterSetWithLocal(IntStream.empty())); +@ParameterizedTest +@ValueSource(booleans = { true, false }) Review Comment: I'm ok with either way. I'm just surprised checkstyle accepts it.
Re: [PR] MINOR: Cleanup TestPlugins and normalize TestPlugin enum [kafka]
gharris1727 merged PR #1: URL: https://github.com/apache/kafka/pull/1
Re: [PR] MINOR: Cleanup TestPlugins and normalize TestPlugin enum [kafka]
gharris1727 commented on PR #1: URL: https://github.com/apache/kafka/pull/1#issuecomment-2206772302 Test failures appear unrelated, and the connect tests pass for me locally.
[jira] [Updated] (KAFKA-17073) Deprecate ReplicaVerificationTool in 3.9
[ https://issues.apache.org/jira/browse/KAFKA-17073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai updated KAFKA-17073: --- Labels: need-kip (was: ) > Deprecate ReplicaVerificationTool in 3.9 > > > Key: KAFKA-17073 > URL: https://issues.apache.org/jira/browse/KAFKA-17073 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Minor > Labels: need-kip > Fix For: 3.9.0 > > > see discussion > https://lists.apache.org/thread/6zz7xwps8lq2lxfo5bhyl4cggh64c5py > In short, the tool is useless, so it is a good time to deprecate it in 3.9. > That enables us to remove it in 4.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-17074) Remove ReplicaVerificationTool
Chia-Ping Tsai created KAFKA-17074: -- Summary: Remove ReplicaVerificationTool Key: KAFKA-17074 URL: https://issues.apache.org/jira/browse/KAFKA-17074 Project: Kafka Issue Type: Improvement Reporter: Chia-Ping Tsai Assignee: Chia-Ping Tsai Fix For: 4.0.0 This is a follow-up of KAFKA-17073.
[jira] [Created] (KAFKA-17073) Deprecate ReplicaVerificationTool in 3.9
Chia-Ping Tsai created KAFKA-17073: -- Summary: Deprecate ReplicaVerificationTool in 3.9 Key: KAFKA-17073 URL: https://issues.apache.org/jira/browse/KAFKA-17073 Project: Kafka Issue Type: Improvement Reporter: Chia-Ping Tsai Assignee: Chia-Ping Tsai see discussion https://lists.apache.org/thread/6zz7xwps8lq2lxfo5bhyl4cggh64c5py In short, the tool is useless, so it is a good time to deprecate it in 3.9. That enables us to remove it in 4.0
[jira] [Updated] (KAFKA-17073) Deprecate ReplicaVerificationTool in 3.9
[ https://issues.apache.org/jira/browse/KAFKA-17073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai updated KAFKA-17073: --- Fix Version/s: 3.9.0 > Deprecate ReplicaVerificationTool in 3.9 > > > Key: KAFKA-17073 > URL: https://issues.apache.org/jira/browse/KAFKA-17073 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Minor > Fix For: 3.9.0 > > > see discussion > https://lists.apache.org/thread/6zz7xwps8lq2lxfo5bhyl4cggh64c5py > In short, the tool is useless, so it is a good time to deprecate it in 3.9. > That enables us to remove it in 4.0
Re: [PR] KAFKA-10816: Add health check endpoint for Kafka Connect [kafka]
gharris1727 commented on code in PR #16477: URL: https://github.com/apache/kafka/pull/16477#discussion_r1664476993 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/WorkerStatus.java: ## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime.rest.entities; + +import org.apache.kafka.connect.util.Stage; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Objects; + +public class WorkerStatus { + +private final String status; +private final String message; + +@JsonCreator +private WorkerStatus( +@JsonProperty("status") String status, +@JsonProperty("message") String message +) { +this.status = status; +this.message = message; +} + +public static WorkerStatus healthy() { +return new WorkerStatus( +"healthy", +"Worker has completed startup and is ready to handle requests." +); +} + +public static WorkerStatus starting(Stage stage) { Review Comment: Ah yeah this is very reasonable. For some reason I thought there were a lot of call-sites that would need an extra null guard, but that wasn't the case. This is great. 
[PR] KAFKA-14094: Support for first leader bootstrapping the voter set (wip) [kafka]
ahuang98 opened a new pull request, #16518: URL: https://github.com/apache/kafka/pull/16518 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-10816: Add health check endpoint for Kafka Connect [kafka]
C0urante commented on PR #16477: URL: https://github.com/apache/kafka/pull/16477#issuecomment-2206645942 Thanks for the review @gharris1727! This is ready for another pass when you have time.
[jira] [Assigned] (KAFKA-14941) Document which configuration options are applicable only to processes with broker role or controller role
[ https://issues.apache.org/jira/browse/KAFKA-14941?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gantigmaa Selenge reassigned KAFKA-14941: - Assignee: Gantigmaa Selenge > Document which configuration options are applicable only to processes with > broker role or controller role > - > > Key: KAFKA-14941 > URL: https://issues.apache.org/jira/browse/KAFKA-14941 > Project: Kafka > Issue Type: Improvement >Reporter: Jakub Scholz >Assignee: Gantigmaa Selenge >Priority: Major > > When running in KRaft mode, some of the configuration options are applicable > only to nodes with the broker process role and some are applicable only to > the nodes with the controller process roles. It would be great if this > information was part of the documentation (e.g. in the [Broker > Configs|https://kafka.apache.org/documentation/#brokerconfigs] table on the > website), but if it was also part of the config classes so that it can be > used in situations when the configuration is dynamically configured to, for > example, filter the options applicable to different nodes. This would allow > having configuration files with only the actually used configuration options > and, for example, help to reduce unnecessary restarts when rolling out new > configurations etc. > For some options, it seems clear and the Kafka node would refuse to start if > they are set - for example the configurations of the non-controller listeners > in controller-only nodes. For others, it seems a bit less clear (Does > {{compression.type}} apply to controller-only nodes? Or the > configurations for the offset topic? etc.). -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [Don't Merge] Test [kafka]
TaiJuWu commented on PR #16463: URL: https://github.com/apache/kafka/pull/16463#issuecomment-2206506979 Patch
```
Benchmark                                        (aclCount)  (authorizerType)  (denyPercentage)  (resourceCount)  Mode  Cnt   Score    Error  Units
AuthorizerBenchmark.testAclsIterator             10  ACL    20  5000  avgt  15   1.528 ±  0.014  ms/op
AuthorizerBenchmark.testAclsIterator             10  ACL    20     1  avgt  15   5.343 ±  0.207  ms/op
AuthorizerBenchmark.testAclsIterator             10  KRAFT  20  5000  avgt  15   2.457 ±  0.015  ms/op
AuthorizerBenchmark.testAclsIterator             10  KRAFT  20     1  avgt  15   5.722 ±  0.084  ms/op
AuthorizerBenchmark.testAclsIterator             20  ACL    20  5000  avgt  15   5.514 ±  0.180  ms/op
AuthorizerBenchmark.testAclsIterator             20  ACL    20     1  avgt  15  20.672 ±  0.334  ms/op
AuthorizerBenchmark.testAclsIterator             20  KRAFT  20  5000  avgt  15   9.638 ±  7.254  ms/op
AuthorizerBenchmark.testAclsIterator             20  KRAFT  20     1  avgt  15  12.769 ±  0.207  ms/op
AuthorizerBenchmark.testAuthorizeByResourceType  10  ACL    20  5000  avgt  15   0.007 ±  0.001  ms/op
AuthorizerBenchmark.testAuthorizeByResourceType  10  ACL    20     1  avgt  15   0.007 ±  0.001  ms/op
AuthorizerBenchmark.testAuthorizeByResourceType  10  KRAFT  20  5000  avgt  15  17.104 ±  0.267  ms/op
AuthorizerBenchmark.testAuthorizeByResourceType  10  KRAFT  20     1  avgt  15  25.149 ±  0.412  ms/op
AuthorizerBenchmark.testAuthorizeByResourceType  20  ACL    20  5000  avgt  15   0.007 ±  0.001  ms/op
AuthorizerBenchmark.testAuthorizeByResourceType  20  ACL    20     1  avgt  15   0.007 ±  0.001  ms/op
AuthorizerBenchmark.testAuthorizeByResourceType  20  KRAFT  20  5000  avgt  15  24.736 ±  0.407  ms/op
AuthorizerBenchmark.testAuthorizeByResourceType  20  KRAFT  20     1  avgt  15  46.468 ±  0.409  ms/op
AuthorizerBenchmark.testAuthorizer               10  ACL    20  5000  avgt  15   0.107 ±  0.001  ms/op
AuthorizerBenchmark.testAuthorizer               10  ACL    20     1  avgt  15   0.130 ±  0.001  ms/op
AuthorizerBenchmark.testAuthorizer               10  KRAFT  20  5000  avgt  15   0.270 ±  0.002  ms/op
AuthorizerBenchmark.testAuthorizer               10  KRAFT  20     1  avgt  15   1.562 ±  1.101  ms/op
AuthorizerBenchmark.testAuthorizer               20  ACL    20  5000  avgt  15   0.481 ±  0.070  ms/op
AuthorizerBenchmark.testAuthorizer               20  ACL    20     1  avgt  15   0.724 ±  0.112  ms/op
AuthorizerBenchmark.testAuthorizer               20  KRAFT  20  5000  avgt  15   0.309 ±  0.004  ms/op
AuthorizerBenchmark.testAuthorizer               20  KRAFT  20     1  avgt  15   0.415 ±  0.004  ms/op
AuthorizerBenchmark.testUpdateCache              10  ACL    20  5000  avgt  15  15.881 ±  0.162  ms/op
AuthorizerBenchmark.testUpdateCache              10  ACL    20     1  avgt  15  35.924 ±  0.468  ms/op
AuthorizerBenchmark.testUpdateCache              10  KRAFT  20  5000  avgt  15  ≈ 10⁻⁶           ms/op
AuthorizerBenchmark.testUpdateCache              10  KRAFT  20     1  avgt  15  ≈ 10⁻⁶           ms/op
AuthorizerBenchmark.testUpdateCache              20  ACL    20  5000  avgt  15  31.230 ±  1.868  ms/op
AuthorizerBenchmark.testUpdateCache              20  ACL    20     1  avgt  15  60.179 ±  0.381  ms/op
AuthorizerBenchmark.testUpdateCache              20  KRAFT  20  5000  avgt  15  ≈ 10⁻⁶           ms/op
```
Re: [PR] KAFKA-16254: Allow MM2 to fully disable offset sync feature [kafka]
C0urante commented on code in PR #15999: URL: https://github.com/apache/kafka/pull/15999#discussion_r1664381241 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java: ## @@ -106,6 +108,17 @@ public void stop() { Utils.closeQuietly(targetAdminClient, "target admin client"); } +@Override +public Config validate(Map connectorConfigs) { +List configValues = super.validate(connectorConfigs).configValues(); +MirrorCheckpointConfig.validate(connectorConfigs).forEach((config, errorMsg) -> +configValues.stream() +.filter(conf -> conf.name().equals(config)) Review Comment: No worries! Latest looks good to me. @chia7712 thoughts?
[jira] [Updated] (KAFKA-16953) Properly implement the sending of DescribeQuorumResponse
[ https://issues.apache.org/jira/browse/KAFKA-16953?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] José Armando García Sancio updated KAFKA-16953: --- Description: The current implementation doesn't accurately implement the different versions of the response. I removed the buggy code in [https://github.com/apache/kafka/pull/16454] This needs to get reimplemented properly. was:The current implementation of QuorumDescribe doesn't populate the listeners in node collections. This needs to get fixed and tested. > Properly implement the sending of DescribeQuorumResponse > > > Key: KAFKA-16953 > URL: https://issues.apache.org/jira/browse/KAFKA-16953 > Project: Kafka > Issue Type: Sub-task > Components: kraft >Reporter: José Armando García Sancio >Priority: Major > Fix For: 3.9.0 > > > The current implementation doesn't accurately implement the different versions of > the response. I removed the buggy code in > [https://github.com/apache/kafka/pull/16454] > This needs to get reimplemented properly.
Re: [PR] KAFKA-16254: Allow MM2 to fully disable offset sync feature [kafka]
OmniaGM commented on code in PR #15999: URL: https://github.com/apache/kafka/pull/15999#discussion_r1664368993 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java: ## @@ -106,6 +108,17 @@ public void stop() { Utils.closeQuietly(targetAdminClient, "target admin client"); } +@Override +public Config validate(Map connectorConfigs) { +List configValues = super.validate(connectorConfigs).configValues(); +MirrorCheckpointConfig.validate(connectorConfigs).forEach((config, errorMsg) -> +configValues.stream() +.filter(conf -> conf.name().equals(config)) Review Comment: I updated `MirrorCheckpointConnector::validate` to create a `ConfigValue` if the config name is not already defined.
[jira] [Updated] (KAFKA-16953) Properly implement the sending of DescribeQuorumResponse
[ https://issues.apache.org/jira/browse/KAFKA-16953?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] José Armando García Sancio updated KAFKA-16953: --- Summary: Properly implement the sending of DescribeQuorumResponse (was: Implement and test listeners in DescribeQuorumResponse) > Properly implement the sending of DescribeQuorumResponse > > > Key: KAFKA-16953 > URL: https://issues.apache.org/jira/browse/KAFKA-16953 > Project: Kafka > Issue Type: Sub-task > Components: kraft >Reporter: José Armando García Sancio >Priority: Major > Fix For: 3.9.0 > > > The current implementation of QuorumDescribe doesn't populate the listeners > in node collections. This needs to get fixed and tested.
Re: [PR] KAFKA-16254: Allow MM2 to fully disable offset sync feature [kafka]
OmniaGM commented on code in PR #15999: URL: https://github.com/apache/kafka/pull/15999#discussion_r1664376985 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java: ## @@ -106,6 +108,17 @@ public void stop() { Utils.closeQuietly(targetAdminClient, "target admin client"); } +@Override +public Config validate(Map connectorConfigs) { +List configValues = super.validate(connectorConfigs).configValues(); +MirrorCheckpointConfig.validate(connectorConfigs).forEach((config, errorMsg) -> +configValues.stream() +.filter(conf -> conf.name().equals(config)) Review Comment: Sorry for the confusion, I just lost track of @mimaison's point from before! That's why I reverted the config back to `MirrorConnectorConfig`.
Re: [PR] KAFKA-16741: Add share group classes for Heartbeat API (1/N) (KIP-932) [kafka]
apoorvmittal10 commented on code in PR #16516: URL: https://github.com/apache/kafka/pull/16516#discussion_r1664366052 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/CoordinatorRecordHelpers.java: ## @@ -209,14 +211,40 @@ public static CoordinatorRecord newGroupEpochRecord( public static CoordinatorRecord newGroupEpochTombstoneRecord( String groupId ) { -return new CoordinatorRecord( -new ApiMessageAndVersion( -new ConsumerGroupMetadataKey() -.setGroupId(groupId), -(short) 3 -), -null // Tombstone. -); +return newGroupEpochTombstoneRecord(groupId, GroupType.CONSUMER); +} + +/** + * Creates a ConsumerGroupMetadata tombstone. + * + * @param groupId The consumer group id. + * @param groupType The group type. + * @return The record. + */ +public static CoordinatorRecord newGroupEpochTombstoneRecord( +String groupId, +GroupType groupType +) { +if (groupType == GroupType.CONSUMER) { +return new CoordinatorRecord( +new ApiMessageAndVersion( +new ConsumerGroupMetadataKey() +.setGroupId(groupId), +(short) 3 +), +null // Tombstone. +); +} else if (groupType == GroupType.SHARE) { +return new CoordinatorRecord( +new ApiMessageAndVersion( +new ShareGroupMetadataKey() +.setGroupId(groupId), +(short) 3 Review Comment: My bad, corrected and added tests as well.
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
philipnee commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1664366556 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -106,72 +102,90 @@ public class HeartbeatRequestManagerTest { private MembershipManager membershipManager; private HeartbeatRequestManager.HeartbeatRequestState heartbeatRequestState; private HeartbeatRequestManager.HeartbeatState heartbeatState; -private final String memberId = "member-id"; -private final int memberEpoch = 1; private BackgroundEventHandler backgroundEventHandler; -private Metrics metrics; +private LogContext logContext; @BeforeEach public void setUp() { -setUp(ConsumerTestBuilder.createDefaultGroupInformation()); -} +this.time = new MockTime(); +Metrics metrics = new Metrics(time); Review Comment: For aesthetic purposes, let's group all the `this` and non-`this` assignments, i.e., can we move metrics to the bottom?
Re: [PR] KAFKA-16741: Add share group classes for Heartbeat API (1/N) (KIP-932) [kafka]
AndrewJSchofield commented on code in PR #16516: URL: https://github.com/apache/kafka/pull/16516#discussion_r1664357349 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/CoordinatorRecordHelpers.java: ## @@ -209,14 +211,40 @@ public static CoordinatorRecord newGroupEpochRecord( public static CoordinatorRecord newGroupEpochTombstoneRecord( String groupId ) { -return new CoordinatorRecord( -new ApiMessageAndVersion( -new ConsumerGroupMetadataKey() -.setGroupId(groupId), -(short) 3 -), -null // Tombstone. -); +return newGroupEpochTombstoneRecord(groupId, GroupType.CONSUMER); +} + +/** + * Creates a ConsumerGroupMetadata tombstone. + * + * @param groupId The consumer group id. + * @param groupType The group type. + * @return The record. + */ +public static CoordinatorRecord newGroupEpochTombstoneRecord( +String groupId, +GroupType groupType +) { +if (groupType == GroupType.CONSUMER) { +return new CoordinatorRecord( +new ApiMessageAndVersion( +new ConsumerGroupMetadataKey() +.setGroupId(groupId), +(short) 3 +), +null // Tombstone. +); +} else if (groupType == GroupType.SHARE) { +return new CoordinatorRecord( +new ApiMessageAndVersion( +new ShareGroupMetadataKey() +.setGroupId(groupId), +(short) 3 Review Comment: This should be 11.
Re: [PR] KAFKA-15853: Move config getters and validation into server [kafka]
OmniaGM commented on PR #16307: URL: https://github.com/apache/kafka/pull/16307#issuecomment-2206434066 Closing this until we agree on a pattern for config classes.
Re: [PR] KAFKA-15853: Move config getters and validation into server [kafka]
OmniaGM closed pull request #16307: KAFKA-15853: Move config getters and validation into server URL: https://github.com/apache/kafka/pull/16307
Re: [PR] KAFKA-16754: Removing partitions from release API (KIP-932) [kafka]
omkreddy merged PR #16513: URL: https://github.com/apache/kafka/pull/16513
Re: [PR] KAFKA-16934: Clean up and refactor release.py [kafka]
soarez commented on PR #16287: URL: https://github.com/apache/kafka/pull/16287#issuecomment-2206328635 @mimaison I suspect some of the difficulty comes from the reason for this PR: the current script is difficult to reason about. If you think it would help with the review, I can look into adding unit tests for some of the new modules here. The downside is that the diff will be even larger. I'm also not sure how we'd plug them into CI, but you could run them locally.
Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]
FrankYang0529 commented on code in PR #16449: URL: https://github.com/apache/kafka/pull/16449#discussion_r1664301020 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java: ## @@ -2302,6 +2303,61 @@ public void testRebalanceMetricsOnFailedRebalance() { assertEquals(-1d, getMetricValue(metrics, rebalanceMetricsManager.lastRebalanceSecondsAgo)); } +@Test +public void testLeaveGroupWhenStateIsFatal() { +MembershipManagerImpl membershipManager = createMemberInStableState(null); +when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true); +membershipManager.transitionToFatal(); +assertEquals(MemberState.FATAL, membershipManager.state()); + +subscriptionState.assignFromUser(Collections.singleton(new TopicPartition("topic", 0))); +assertEquals(1, subscriptionState.numAssignedPartitions()); Review Comment: Yeah, I agree we should not have test cases which are not real cases. I changed `spy` back to `mock` and now only test that `subscriptionState#unsubscribe` is called when `isNotInGroup` is true.
Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]
FrankYang0529 commented on code in PR #16449: URL: https://github.com/apache/kafka/pull/16449#discussion_r1664301811 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ## @@ -2077,6 +2077,19 @@ void testReaperInvokedInPoll() { verify(backgroundEventReaper).reap(time.milliseconds()); } +@Test +public void testUnsubscribeWithoutGroupId() { +consumer = newConsumerWithoutGroupId(); +completeFetchedCommittedOffsetApplicationEventExceptionally(new TimeoutException()); + doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); Review Comment: Yeah, removed unused mock. Thanks for the review. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16254: Allow MM2 to fully disable offset sync feature [kafka]
C0urante commented on code in PR #15999: URL: https://github.com/apache/kafka/pull/15999#discussion_r1664298524 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java: ## @@ -106,6 +108,17 @@ public void stop() { Utils.closeQuietly(targetAdminClient, "target admin client"); } +@Override +public Config validate(Map connectorConfigs) { +List configValues = super.validate(connectorConfigs).configValues(); +MirrorCheckpointConfig.validate(connectorConfigs).forEach((config, errorMsg) -> +configValues.stream() +.filter(conf -> conf.name().equals(config)) Review Comment: I still agree with @mimaison that we shouldn't have this property in the `MirrorConnectorConfig` class, since then it'll be included as a "common config" for all connectors in our generated docs [here](https://kafka.apache.org/37/documentation.html#mirrormakercommonconfigs) instead of in the `MirrorSourceConnector`-specific docs [here](https://kafka.apache.org/37/documentation.html#mirrormakersourceconfigs). Can we just add a new `ConfigValue` for the property in `MirrorCheckpointConnector::validate` if we find a validation error with it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
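[Editor's note] The suggestion above is to add a fresh `ConfigValue` in `MirrorCheckpointConnector::validate` when a property that failed extra validation has no entry in the base validation output. A minimal sketch of that merge logic, using simplified stand-ins for Connect's `ConfigValue` API (not the real Kafka classes):

```java
import java.util.*;

public class ValidateSketch {
    // Stand-in for org.apache.kafka.common.config.ConfigValue
    static final class ConfigValue {
        final String name;
        final List<String> errorMessages = new ArrayList<>();
        ConfigValue(String name) { this.name = name; }
    }

    // Merge extra validation errors into the base validator's output; if the
    // property is unknown to the base validator, add a new entry for it rather
    // than declaring the property on a shared common config class.
    static List<ConfigValue> mergeErrors(List<ConfigValue> baseValues,
                                         Map<String, String> extraErrors) {
        for (Map.Entry<String, String> e : extraErrors.entrySet()) {
            ConfigValue match = baseValues.stream()
                    .filter(cv -> cv.name.equals(e.getKey()))
                    .findFirst()
                    .orElseGet(() -> {
                        // Property missing from base output: create a fresh entry
                        ConfigValue cv = new ConfigValue(e.getKey());
                        baseValues.add(cv);
                        return cv;
                    });
            match.errorMessages.add(e.getValue());
        }
        return baseValues;
    }

    public static void main(String[] args) {
        List<ConfigValue> base =
                new ArrayList<>(List.of(new ConfigValue("emit.checkpoints.enabled")));
        // Hypothetical property name, for illustration only
        List<ConfigValue> merged = mergeErrors(base,
                Map.of("offset-syncs.topic.location", "invalid location"));
        System.out.println(merged.size());                       // 2
        System.out.println(merged.get(1).errorMessages.get(0));  // invalid location
    }
}
```

This keeps the property's documentation scoped to the connector that owns it while still surfacing its errors in the validation response.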
Re: [PR] KAFKA-16754: Removing partitions from release API (KIP-932) [kafka]
apoorvmittal10 commented on PR #16513: URL: https://github.com/apache/kafka/pull/16513#issuecomment-2206307430 @AndrewJSchofield @omkreddy The build passed with unrelated test failures. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16943: Synchronously verify Connect worker startup failure in InternalTopicsIntegrationTest [kafka]
C0urante commented on code in PR #16451: URL: https://github.com/apache/kafka/pull/16451#discussion_r1664292432 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java: ## @@ -363,7 +364,17 @@ public DistributedHerder(DistributedConfig config, @Override public void start() { -this.herderExecutor.submit(this); +Future future = this.herderExecutor.submit(this); +try { +future.get(6, TimeUnit.SECONDS); +} catch (TimeoutException timeoutException) { +log.error("herder work thread timeout:", timeoutException); +future.cancel(true); +} catch (InterruptedException interruptedException) { +Thread.currentThread().interrupt(); +} catch (ExecutionException executionException) { +log.error("herder work thread execution exception:", executionException); Review Comment: We don't want to change `start` to block. Can we save the future in a field in this class, then expose something like a `Future herderTask()` method that allows testing code to access that future and, if desired, block on it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
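[Editor's note] The reviewer's suggestion — keep `start()` non-blocking but let tests observe startup failures — can be sketched as follows. The `herderTask()` accessor name comes from the comment above; the rest is an illustration, not the real `DistributedHerder` code:

```java
import java.util.concurrent.*;

public class HerderSketch {
    private final ExecutorService herderExecutor = Executors.newSingleThreadExecutor();
    private volatile Future<?> herderTask;

    public void start(Runnable work) {
        // Non-blocking, as before; only the Future is now retained in a field
        herderTask = herderExecutor.submit(work);
    }

    // Exposed for tests: lets callers block on the work thread if they choose to
    public Future<?> herderTask() {
        return herderTask;
    }

    public static void main(String[] args) throws Exception {
        HerderSketch herder = new HerderSketch();
        herder.start(() -> { throw new IllegalStateException("startup failed"); });
        try {
            herder.herderTask().get(5, TimeUnit.SECONDS);
            System.out.println("started");
        } catch (ExecutionException e) {
            // Test code can now detect the startup failure synchronously
            System.out.println("failed: " + e.getCause().getMessage());
        } finally {
            herder.herderExecutor.shutdownNow();
        }
    }
}
```

Production callers simply ignore the returned future, so the original fire-and-forget behavior of `start()` is preserved.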
[jira] [Resolved] (KAFKA-16991) Flaky Test org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.shouldRestoreState
[ https://issues.apache.org/jira/browse/KAFKA-16991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck resolved KAFKA-16991. - Resolution: Fixed > Flaky Test > org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.shouldRestoreState > --- > > Key: KAFKA-16991 > URL: https://issues.apache.org/jira/browse/KAFKA-16991 > Project: Kafka > Issue Type: Test > Components: streams, unit tests >Reporter: Matthias J. Sax >Assignee: Bill Bejeck >Priority: Major > Fix For: 3.9.0 > > Attachments: > 5owo5xbyzjnao-org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest-shouldRestoreState()-1-output.txt > > > We see this test running into timeouts more frequently recently. > {code:java} > org.opentest4j.AssertionFailedError: Condition not met within timeout 6. > Repartition topic > restore-test-KSTREAM-AGGREGATE-STATE-STORE-02-repartition not purged > data after 6 ms. ==> expected: but was: at > org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)•••at > > org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:396)at > > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:444)at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:393)at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:377)at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:367)at > org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.shouldRestoreState(PurgeRepartitionTopicIntegrationTest.java:220) > {code} > There was no ERROR or WARN log... -- This message was sent by Atlassian Jira (v8.20.10#820010)
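[Editor's note] The `AssertionFailedError` above comes from a poll-until-true helper. A minimal sketch of the `TestUtils.waitForCondition` pattern (simplified; the real signature and back-off differ):

```java
import java.util.function.BooleanSupplier;

public class WaitSketch {
    static void waitForCondition(BooleanSupplier condition, long timeoutMs, String message)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            if (condition.getAsBoolean()) return;  // condition met, stop polling
            Thread.sleep(10);                      // back off before retrying
        }
        // This is the failure shape seen in the flaky-test report above
        throw new AssertionError("Condition not met within timeout " + timeoutMs + ". " + message);
    }

    public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis();
        // The condition becomes true after ~50 ms, well inside the timeout
        waitForCondition(() -> System.currentTimeMillis() - start > 50, 1000, "never became true");
        System.out.println("condition met");
    }
}
```

Flakiness with this pattern usually means the condition genuinely takes longer than the timeout on loaded CI machines, which is why such fixes often extend the timeout or strengthen the condition rather than change test logic.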
[jira] [Updated] (KAFKA-16991) Flaky Test org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.shouldRestoreState
[ https://issues.apache.org/jira/browse/KAFKA-16991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck updated KAFKA-16991: Fix Version/s: 3.9.0 > Flaky Test > org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.shouldRestoreState > --- > > Key: KAFKA-16991 > URL: https://issues.apache.org/jira/browse/KAFKA-16991 > Project: Kafka > Issue Type: Test > Components: streams, unit tests >Reporter: Matthias J. Sax >Assignee: Bill Bejeck >Priority: Major > Fix For: 3.9.0 > > Attachments: > 5owo5xbyzjnao-org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest-shouldRestoreState()-1-output.txt > > > We see this test running into timeouts more frequently recently. > {code:java} > org.opentest4j.AssertionFailedError: Condition not met within timeout 6. > Repartition topic > restore-test-KSTREAM-AGGREGATE-STATE-STORE-02-repartition not purged > data after 6 ms. ==> expected: but was: at > org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)•••at > > org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:396)at > > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:444)at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:393)at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:377)at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:367)at > org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.shouldRestoreState(PurgeRepartitionTopicIntegrationTest.java:220) > {code} > There was no ERROR or WARN log... -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15917) Flaky test - OffsetsApiIntegrationTest.testAlterSinkConnectorOffsetsZombieSinkTasks
[ https://issues.apache.org/jira/browse/KAFKA-15917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17862808#comment-17862808 ] Chris Egerton commented on KAFKA-15917: --- Should be partially addressed by [https://github.com/apache/kafka/pull/15302,] but there will likely still be failures for this test and others in the {{OffsetsApiIntegrationTest}} suite that require follow-up changes. > Flaky test - > OffsetsApiIntegrationTest.testAlterSinkConnectorOffsetsZombieSinkTasks > --- > > Key: KAFKA-15917 > URL: https://issues.apache.org/jira/browse/KAFKA-15917 > Project: Kafka > Issue Type: Bug >Reporter: Haruki Okada >Assignee: Chris Egerton >Priority: Major > Labels: flaky-test > Attachments: stdout.log > > > [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14242/14/tests/] > > > {code:java} > Error > java.lang.AssertionError: > Expected: a string containing "zombie sink task" > but: was "Could not alter connector offsets. Error response: > {"error_code":500,"message":"Failed to alter consumer group offsets for > connector test-connector"}" > Stacktrace > java.lang.AssertionError: > Expected: a string containing "zombie sink task" > but: was "Could not alter connector offsets. 
Error response: > {"error_code":500,"message":"Failed to alter consumer group offsets for > connector test-connector"}" > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:8) > at > org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testAlterSinkConnectorOffsetsZombieSinkTasks(OffsetsApiIntegrationTest.java:431) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) > at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) > at 
org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at org.junit.runners.ParentRunner.run(ParentRunner.java:413) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:112) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:40) > at > org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:60) > at > org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:52) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) > at > org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33) >
Re: [PR] KAFKA-15917: Wait for zombie sink tasks' consumers to commit offsets before trying to modify their offsets in integration tests [kafka]
C0urante commented on PR #15302: URL: https://github.com/apache/kafka/pull/15302#issuecomment-2206277714 Thanks Yash! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15917: Wait for zombie sink tasks' consumers to commit offsets before trying to modify their offsets in integration tests [kafka]
C0urante merged PR #15302: URL: https://github.com/apache/kafka/pull/15302 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16745: Implemented handleShareFetchRequest RPC including unit tests [kafka]
adixitconfluent commented on code in PR #16456: URL: https://github.com/apache/kafka/pull/16456#discussion_r1664266986 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -4004,6 +4488,99 @@ class KafkaApis(val requestChannel: RequestChannel, CompletableFuture.completedFuture[Unit](()) } + + def getAcknowledgeBatchesFromShareFetchRequest( + shareFetchRequest : ShareFetchRequest, + topicNames : util.Map[Uuid, String], + erroneous : mutable.Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData], +) : mutable.Map[TopicIdPartition, util.List[ShareAcknowledgementBatch]] = { + +val acknowledgeBatchesMap = mutable.Map[TopicIdPartition, util.List[ShareAcknowledgementBatch]]() +shareFetchRequest.data().topics().forEach ( topic => { + + if(!topicNames.asScala.contains(topic.topicId)) { +topic.partitions.forEach((partition: ShareFetchRequestData.FetchPartition) => { + val topicIdPartition = new TopicIdPartition( +topic.topicId, +new TopicPartition(null, partition.partitionIndex)) + erroneous += +topicIdPartition -> ShareAcknowledgeResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID) +}) + } + else { +topic.partitions().forEach ( partition => { + val topicIdPartition = new TopicIdPartition( +topic.topicId(), +new TopicPartition(topicNames.get(topic.topicId()), partition.partitionIndex()) + ) + var exceptionThrown = false + val acknowledgeBatches = new util.ArrayList[ShareAcknowledgementBatch]() + breakable{ +partition.acknowledgementBatches().forEach( batch => { + try { +acknowledgeBatches.add(new ShareAcknowledgementBatch( + batch.firstOffset(), + batch.lastOffset(), + batch.acknowledgeTypes() +)) + } catch { +case e : IllegalArgumentException => + exceptionThrown = true + erroneous += topicIdPartition -> ShareAcknowledgeResponse.partitionResponse(topicIdPartition, Errors.forException(e)) + break + } +}) + } + if(!exceptionThrown && acknowledgeBatches.size() > 0) { +acknowledgeBatchesMap += topicIdPartition -> acknowledgeBatches + } +}) + } 
+}) +acknowledgeBatchesMap + } + + def validateAcknowledgementBatches( + acknowledgementDataFromRequest : mutable.Map[TopicIdPartition, util.List[ShareAcknowledgementBatch]], + erroneous : mutable.Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData] +) : mutable.Set[TopicIdPartition] = { +val erroneousTopicIdPartitions: mutable.Set[TopicIdPartition] = mutable.Set.empty[TopicIdPartition] +acknowledgementDataFromRequest.foreach{ case (tp : TopicIdPartition, acknowledgeBatches : util.List[ShareAcknowledgementBatch]) => + var prevEndOffset = -1L + breakable { +acknowledgeBatches.forEach(batch => { + if (batch.firstOffset > batch.lastOffset) { Review Comment: can we not club all of this "if" and the below "if" conditions into a single if? ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -3955,11 +3960,490 @@ class KafkaApis(val requestChannel: RequestChannel, } } + /** + * Handle a shareFetch request + */ def handleShareFetchRequest(request: RequestChannel.Request): Unit = { val shareFetchRequest = request.body[ShareFetchRequest] -// TODO: Implement the ShareFetchRequest handling -requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) -CompletableFuture.completedFuture[Unit](()) + +if (!config.isNewGroupCoordinatorEnabled) { + // The API is not supported by the "old" group coordinator (the default). If the + // new one is not enabled, we fail directly here. 
+ requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) + CompletableFuture.completedFuture[Unit](()) + return +} else if (!config.isShareGroupEnabled) { + // The API is not supported when the "share" rebalance protocol has not been set explicitly + requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) + CompletableFuture.completedFuture[Unit](()) + return +} +val sharePartitionManager : SharePartitionManager = this.sharePartitionManager match { + case
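[Editor's note] The review comment above asks whether the separate "if" checks in `validateAcknowledgementBatches` can be collapsed into one condition. A sketch of that consolidation, using simplified stand-in types (field names mirror `ShareAcknowledgementBatch`, but this is not the real `KafkaApis` code and the exact checks may differ):

```java
import java.util.List;

public class AckValidationSketch {
    record Batch(long firstOffset, long lastOffset, List<Byte> acknowledgeTypes) {}

    // True when any batch is malformed: inverted offsets, overlap with the
    // previous batch, or no acknowledge types — all folded into a single "if".
    static boolean hasInvalidBatch(List<Batch> batches) {
        long prevEndOffset = -1L;
        for (Batch b : batches) {
            if (b.firstOffset() > b.lastOffset()
                    || b.firstOffset() <= prevEndOffset
                    || b.acknowledgeTypes().isEmpty()) {
                return true;
            }
            prevEndOffset = b.lastOffset();
        }
        return false;
    }

    public static void main(String[] args) {
        List<Batch> ok = List.of(new Batch(0, 4, List.of((byte) 1)),
                                 new Batch(5, 9, List.of((byte) 1)));
        List<Batch> overlapping = List.of(new Batch(0, 4, List.of((byte) 1)),
                                          new Batch(3, 9, List.of((byte) 1)));
        System.out.println(hasInvalidBatch(ok));           // false
        System.out.println(hasInvalidBatch(overlapping));  // true
    }
}
```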
[jira] [Commented] (KAFKA-17062) RemoteLogManager - RemoteStorageException causes data loss
[ https://issues.apache.org/jira/browse/KAFKA-17062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17862802#comment-17862802 ] Guillaume Mallet commented on KAFKA-17062: -- [~showuon] thanks for the feedback ! > Could you help me understand why moving the failed segment to delete_started >state will help reducing this issue? Because it must be cleaned next time when >we enter >`[{{cleanupExpiredRemoteLogSegments}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1122]`? That's what I had in mind but that would address only issues where writes are failing and not deletions (e.g. no more available space) > But the upload failed segments should not get deleted, right? This portion seems a bit unclear to me. Because each iteration of the RLMTask creates a unique [RemoteLogSegmentId|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L850] I assumed that multiple retries should result in multiple deletion as [deleteLogSegmentData|https://github.com/apache/kafka/blob/20e101c2e4cb2b34c2c575287cfaec76aa8c5db0/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java#L152] will be called once per unique RemoteLogSegmentId and that failed copies are safe to be deleted as soon as possible. If your understanding of the contract is that failed uploads shouldn't be deleted and multiple iteration of the RLMTask should reuse previous files then I probably misunderstood the role of the RemoteLogSegmentId because it sounds like we could reuse it in case of failure as we're asking for the [idempotency|https://github.com/apache/kafka/blob/35baa0ac4fcb7f21bb0df037d0756429db5d3bb2/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java#L85] of a copy. 
I was under the impression that each upload should be unique due to this RemoteLogSegmentId which is why I was steered in that direction. > This is indeed a problem. Could you help open a separate Jira for this issue? > And welcome to contribute to it. I think we don't have to do it > complicatedly. Just a simple counter should be enough given this is a rare > case and won't cause too much problem. WDYT? Good suggestion, I'll file one asap, that would probably be better for treating those tasks independently. I'm not sure I understand what you mean by a simple counter but maybe this discussion should happen in that new Jira. I'll ping you when I have it filed to make sure I understand what you mean. > RemoteLogManager - RemoteStorageException causes data loss > -- > > Key: KAFKA-17062 > URL: https://issues.apache.org/jira/browse/KAFKA-17062 > Project: Kafka > Issue Type: Bug > Components: Tiered-Storage >Affects Versions: 3.8.0, 3.7.1, 3.9.0 >Reporter: Guillaume Mallet >Assignee: Guillaume Mallet >Priority: Major > Labels: tiered-storage > > When Tiered Storage is configured, retention.bytes defines the limit for the > amount of data stored in the filesystem and in remote storage. However a > failure while offloading to remote storage can cause segments to be dropped > before the retention limit is met. > What happens > Assuming a topic configured with {{retention.bytes=4294967296}} (4GB) and a > {{local.retention.bytes=1073741824}} (1GB, equal to segment.bytes) we would > expect Kafka to keep up to 3 segments (3GB) in the remote store and 1 segment > locally (the local segment) and possibly more if the remote storage is > offline. i.e. segments in the following RemoteLogSegmentStates in the > RemoteLogMetadataManager (RLMM) : > * Segment 3 ({{{}COPY_SEGMENT_FINISHED{}}}) > * Segment 2 ({{{}COPY_SEGMENT_FINISHED{}}}) > * Segment 1 ({{{}COPY_SEGMENT_FINISHED{}}}) > Let's assume the RLMM starts failing when segment 4 rolls. 
At the first > iteration of an RLMTask we will have - > * > [{{copyLogSegmentsToRemote}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L773] > : is called first > ** RLMM becomes aware of Segment 4 and adds it to the metadata: > *** Segment 4 ({{{}COPY_SEGMENT_STARTED{}}}), > *** Segment 3 ({{{}COPY_SEGMENT_FINISHED{}}}), > *** Segment 2 ({{{}COPY_SEGMENT_FINISHED{}}}), > *** Segment 1 ({{{}COPY_SEGMENT_FINISHED{}}}) > ** An exception is raised during the copy operation > ([{{copyLogSegmentData}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java#L93] > in RemoteStorageManager) which is caught with the error
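[Editor's note] The discussion above hinges on each RLMTask iteration generating a fresh `RemoteLogSegmentId`, so a failed attempt and its retry refer to distinct remote objects. A minimal sketch of that behavior, with stand-in types (the real class pairs a `TopicIdPartition` with a `Uuid`):

```java
import java.util.UUID;

public class SegmentIdSketch {
    record RemoteLogSegmentId(String topicIdPartition, UUID id) {}

    // Mirrors the linked RemoteLogManager code: a new random id per copy attempt,
    // so the retried upload is a different remote segment from the failed one
    static RemoteLogSegmentId generateRemoteLogSegmentId(String topicIdPartition) {
        return new RemoteLogSegmentId(topicIdPartition, UUID.randomUUID());
    }

    public static void main(String[] args) {
        RemoteLogSegmentId firstAttempt = generateRemoteLogSegmentId("topicA-0");
        RemoteLogSegmentId retry = generateRemoteLogSegmentId("topicA-0");
        // Same partition, different ids: the failed attempt's object must be
        // cleaned up separately, which is the contract question raised above
        System.out.println(firstAttempt.id().equals(retry.id()));
    }
}
```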
Re: [PR] KAFKA-16991: Flaky PurgeRepartitionTopicIntegrationTest [kafka]
bbejeck commented on PR #16503: URL: https://github.com/apache/kafka/pull/16503#issuecomment-2206182951 merged #16503 into trunk -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16991: Flaky PurgeRepartitionTopicIntegrationTest [kafka]
bbejeck merged PR #16503: URL: https://github.com/apache/kafka/pull/16503 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16345: Optionally URL-encode clientID and clientSecret in authorization header [kafka]
bachmanity1 commented on code in PR #15475: URL: https://github.com/apache/kafka/pull/15475#discussion_r1664248591 ## clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetrieverTest.java: ## @@ -174,33 +176,39 @@ public void testParseAccessTokenInvalidJson() { } @Test -public void testFormatAuthorizationHeader() { -assertAuthorizationHeader("id", "secret"); +public void testFormatAuthorizationHeader() throws UnsupportedEncodingException { +assertAuthorizationHeader("id", "secret", false); } @Test -public void testFormatAuthorizationHeaderEncoding() { +public void testFormatAuthorizationHeaderEncoding() throws UnsupportedEncodingException { // See KAFKA-14496 -assertAuthorizationHeader("SOME_RANDOM_LONG_USER_01234", "9Q|0`8i~ute-n9ksjLWb\\50\"AX@UUED5E"); +assertAuthorizationHeader("SOME_RANDOM_LONG_USER_01234", "9Q|0`8i~ute-n9ksjLWb\\50\"AX@UUED5E", false); +// See KAFKA-16345 Review Comment: I updated both of the comments. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16345: Optionally URL-encode clientID and clientSecret in authorization header [kafka]
bachmanity1 commented on code in PR #15475: URL: https://github.com/apache/kafka/pull/15475#discussion_r1664244790 ## clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetrieverTest.java: ## @@ -174,33 +176,39 @@ public void testParseAccessTokenInvalidJson() { } @Test -public void testFormatAuthorizationHeader() { -assertAuthorizationHeader("id", "secret"); +public void testFormatAuthorizationHeader() throws UnsupportedEncodingException { +assertAuthorizationHeader("id", "secret", false); } @Test -public void testFormatAuthorizationHeaderEncoding() { +public void testFormatAuthorizationHeaderEncoding() throws UnsupportedEncodingException { // See KAFKA-14496 -assertAuthorizationHeader("SOME_RANDOM_LONG_USER_01234", "9Q|0`8i~ute-n9ksjLWb\\50\"AX@UUED5E"); +assertAuthorizationHeader("SOME_RANDOM_LONG_USER_01234", "9Q|0`8i~ute-n9ksjLWb\\50\"AX@UUED5E", false); +// See KAFKA-16345 +assertAuthorizationHeader("user!@~'", "secret-(*)!", true); } -private void assertAuthorizationHeader(String clientId, String clientSecret) { +private void assertAuthorizationHeader(String clientId, String clientSecret, boolean urlencode) throws UnsupportedEncodingException { Review Comment: I think the intent here is to prevent unintentional modifications to the `HttpAccessTokenRetriever.formatAuthorizationHeader()` method that could change its resulting value. cc. @kirktrue ## clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java: ## @@ -192,6 +192,10 @@ public class SaslConfigs { + " be inspected for the standard OAuth \"iss\" claim and if this value is set, the broker will match it exactly against what is in the JWT's \"iss\" claim. 
If there is no" + " match, the broker will reject the JWT and authentication will fail."; +public static final String SASL_OAUTHBEARER_HEADER_URLENCODE = "sasl.oauthbearer.header.urlencode"; +public static final boolean DEFAULT_SASL_OAUTHBEARER_HEADER_URLENCODE = false; +public static final String SASL_OAUTHBEARER_HEADER_URLENCODE_DOC = "The (optional) setting to enable the OAuth client to URL-encode the client_id and client_secret in the authorization header" ++ " in accordance with RFC6749, see https://datatracker.ietf.org/doc/html/rfc6749#section-2.3.1 for more detail. The default value is set to 'false' for backward compatibility"; Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
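[Editor's note] The behavior gated by the new `sasl.oauthbearer.header.urlencode` config (name from the diff above) can be sketched as follows. The helper below is an illustration, not the real `HttpAccessTokenRetriever.formatAuthorizationHeader()` implementation:

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class AuthHeaderSketch {
    static String formatAuthorizationHeader(String clientId, String clientSecret, boolean urlencode) {
        if (urlencode) {
            // RFC 6749 §2.3.1: form-encode the credentials before Basic auth
            clientId = URLEncoder.encode(clientId, StandardCharsets.UTF_8);
            clientSecret = URLEncoder.encode(clientSecret, StandardCharsets.UTF_8);
        }
        String joined = clientId + ":" + clientSecret;
        return "Basic " + Base64.getEncoder()
                .encodeToString(joined.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        // Plain credentials are unchanged either way; the default stays false
        // for backward compatibility with servers that expect raw credentials
        System.out.println(formatAuthorizationHeader("id", "secret", false));
        // Credentials with reserved characters produce a different header once
        // percent-encoding is enabled
        System.out.println(!formatAuthorizationHeader("user!@~'", "secret-(*)!", true)
                .equals(formatAuthorizationHeader("user!@~'", "secret-(*)!", false)));
    }
}
```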
Re: [PR] KAFKA-16991: Flaky PurgeRepartitionTopicIntegrationTest [kafka]
bbejeck commented on PR #16503: URL: https://github.com/apache/kafka/pull/16503#issuecomment-2206142493 Failures unrelated -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-17070) perf: consider to use ByteBufferOutputstream to append records
[ https://issues.apache.org/jira/browse/KAFKA-17070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] dujian0068 reassigned KAFKA-17070: -- Assignee: dujian0068 > perf: consider to use ByteBufferOutputstream to append records > -- > > Key: KAFKA-17070 > URL: https://issues.apache.org/jira/browse/KAFKA-17070 > Project: Kafka > Issue Type: Improvement >Reporter: Luke Chen >Assignee: dujian0068 >Priority: Major > > Consider using ByteBufferOutputStream to append records instead of a > DataOutputStream. We should add a JMH test to confirm this indeed improves > performance before merging it. -- This message was sent by Atlassian Jira (v8.20.10#820010)
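[Editor's note] A minimal sketch of the idea in the ticket, assuming a simplified version of Kafka's `org.apache.kafka.common.utils.ByteBufferOutputStream`: an `OutputStream` that writes directly into a growable `ByteBuffer`, avoiding `DataOutputStream`'s intermediate copies. Any performance claim would still need the JMH benchmark the ticket asks for:

```java
import java.io.OutputStream;
import java.nio.ByteBuffer;

public class ByteBufferOutputStreamSketch extends OutputStream {
    private ByteBuffer buffer;

    public ByteBufferOutputStreamSketch(int initialCapacity) {
        this.buffer = ByteBuffer.allocate(initialCapacity);
    }

    @Override
    public void write(int b) {
        ensureRemaining(1);
        buffer.put((byte) b);
    }

    @Override
    public void write(byte[] bytes, int off, int len) {
        ensureRemaining(len);
        buffer.put(bytes, off, len);  // bulk put, no per-byte loop
    }

    // Double the capacity (or grow to fit) when the buffer runs out of room
    private void ensureRemaining(int needed) {
        if (buffer.remaining() < needed) {
            ByteBuffer grown = ByteBuffer.allocate(
                    Math.max(buffer.capacity() * 2, buffer.position() + needed));
            buffer.flip();
            grown.put(buffer);
            buffer = grown;
        }
    }

    public ByteBuffer buffer() {
        return buffer;
    }

    public static void main(String[] args) {
        ByteBufferOutputStreamSketch out = new ByteBufferOutputStreamSketch(4);
        out.write(new byte[] {1, 2, 3, 4, 5, 6}, 0, 6);  // forces a grow from 4 bytes
        System.out.println(out.buffer().position());      // 6
    }
}
```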
Re: [PR] KAFKA-16745: Implemented handleShareFetchRequest RPC including unit tests [kafka]
adixitconfluent commented on code in PR #16456: URL: https://github.com/apache/kafka/pull/16456#discussion_r1664193628 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -3955,11 +3960,490 @@ class KafkaApis(val requestChannel: RequestChannel, } } + /** + * Handle a shareFetch request + */ def handleShareFetchRequest(request: RequestChannel.Request): Unit = { val shareFetchRequest = request.body[ShareFetchRequest] -// TODO: Implement the ShareFetchRequest handling -requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) -CompletableFuture.completedFuture[Unit](()) + +if (!config.isNewGroupCoordinatorEnabled) { + // The API is not supported by the "old" group coordinator (the default). If the + // new one is not enabled, we fail directly here. + requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) + CompletableFuture.completedFuture[Unit](()) + return +} else if (!config.isShareGroupEnabled) { + // The API is not supported when the "share" rebalance protocol has not been set explicitly + requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) + CompletableFuture.completedFuture[Unit](()) + return +} +val sharePartitionManager : SharePartitionManager = this.sharePartitionManager match { + case Some(manager) => manager + case None => throw new IllegalStateException("ShareFetchRequest received but SharePartitionManager is not initialized") +} + +val groupId = shareFetchRequest.data.groupId +val memberId = shareFetchRequest.data.memberId +val shareSessionEpoch = shareFetchRequest.data.shareSessionEpoch + +var cachedTopicPartitions : util.List[TopicIdPartition] = null + +if (shareSessionEpoch == ShareFetchMetadata.FINAL_EPOCH) { + try { +cachedTopicPartitions = sharePartitionManager.cachedTopicIdPartitionsInShareSession(groupId, Uuid.fromString(memberId)) + } catch { +// Exception 
[jira] [Created] (KAFKA-17072) Document broker decommissioning process with KRaft
Mickael Maison created KAFKA-17072: -- Summary: Document broker decommissioning process with KRaft Key: KAFKA-17072 URL: https://issues.apache.org/jira/browse/KAFKA-17072 Project: Kafka Issue Type: Improvement Components: docs Reporter: Mickael Maison When decommissioning a broker in KRaft mode, the broker also has to be explicitly unregistered. This is not mentioned anywhere in the documentation. A broker that has not been unregistered stays eligible for new partition assignments and will prevent bumping the metadata version once the remaining brokers are upgraded. -- This message was sent by Atlassian Jira (v8.20.10#820010)
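For the docs, the missing step could be sketched roughly as follows (the command name and flags are assumed from Kafka's `kafka-cluster.sh` cluster tool and should be verified against the target release before documenting): after the decommissioned broker is shut down and its partitions are reassigned, unregister it explicitly.

```shell
# Assumed invocation of the KRaft cluster tool; --id is the decommissioned broker's node id
bin/kafka-cluster.sh unregister --bootstrap-server localhost:9092 --id 1
```

Until this is done the controller keeps the stale registration, which is why the broker stays eligible for assignments and blocks metadata version bumps.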
Re: [PR] KAFKA-16745: Implemented handleShareFetchRequest RPC including unit tests [kafka]
adixitconfluent commented on code in PR #16456: URL: https://github.com/apache/kafka/pull/16456#discussion_r1664178535 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -3956,11 +3961,482 @@ class KafkaApis(val requestChannel: RequestChannel, } } + /** + * Handle a shareFetch request + */ def handleShareFetchRequest(request: RequestChannel.Request): Unit = { val shareFetchRequest = request.body[ShareFetchRequest] -// TODO: Implement the ShareFetchRequest handling -requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) -CompletableFuture.completedFuture[Unit](()) + +if (!config.isNewGroupCoordinatorEnabled) { + // The API is not supported by the "old" group coordinator (the default). If the + // new one is not enabled, we fail directly here. + requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) + CompletableFuture.completedFuture[Unit](()) + return +} else if (!config.isShareGroupEnabled) { + // The API is not supported when the "share" rebalance protocol has not been set explicitly + requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) + CompletableFuture.completedFuture[Unit](()) + return +} +val topicNames = metadataCache.topicIdsToNames() +val sharePartitionManager : SharePartitionManager = sharePartitionManagerOption match { + case Some(manager) => manager + case None => throw new IllegalStateException("ShareFetchRequest received but SharePartitionManager is not initialized") +} + +val groupId = shareFetchRequest.data.groupId +val clientId = request.header.clientId +val memberId = shareFetchRequest.data().memberId() +val shareSessionEpoch = shareFetchRequest.data().shareSessionEpoch() + +val shareFetchData = shareFetchRequest.shareFetchData(topicNames) +val forgottenTopics = shareFetchRequest.forgottenTopics(topicNames) +var cachedTopicPartitions : 
util.List[TopicIdPartition] = null + +if (shareSessionEpoch == ShareFetchMetadata.FINAL_EPOCH) { + try { +cachedTopicPartitions = sharePartitionManager.cachedTopicIdPartitionsInShareSession(groupId, Uuid.fromString(memberId)) + } catch { +// Exception handling is needed when this value is being utilized on receiving FINAL_EPOCH. +case _: ShareSessionNotFoundException => cachedTopicPartitions = null + } +} + +def isAcknowledgeDataPresentInFetchRequest() : Boolean = { + var isAcknowledgeDataPresent = false + shareFetchRequest.data().topics().forEach ( topic => { +breakable{ + topic.partitions().forEach ( partition => { +if (partition.acknowledgementBatches() != null && !partition.acknowledgementBatches().isEmpty) { + isAcknowledgeDataPresent = true + break() +} else { + isAcknowledgeDataPresent = false +} + }) +} + }) + isAcknowledgeDataPresent +} + +val isAcknowledgeDataPresent = isAcknowledgeDataPresentInFetchRequest() +var shareFetchResponse : ShareFetchResponse = null +// Variable to store any error thrown while the handling piggybacked acknowledgements +var acknowledgeError : Errors = Errors.NONE +// Variable to store the topic partition wise result of piggybacked acknowledgements +var acknowledgeResult = mutable.Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData]() + +def isInvalidShareFetchRequest() : Boolean = { + // The Initial Share Fetch Request should not Acknowledge any data + if (shareSessionEpoch == ShareFetchMetadata.INITIAL_EPOCH && isAcknowledgeDataPresent) { +return true + } + false +} + +val newReqMetadata : ShareFetchMetadata = new ShareFetchMetadata(Uuid.fromString(memberId), shareSessionEpoch) +var shareFetchContext : ShareFetchContext = null +try { + // Creating the shareFetchContext for Share Session Handling + shareFetchContext = sharePartitionManager.newContext(groupId, shareFetchData, forgottenTopics, newReqMetadata) +} catch { + case e: Exception => shareFetchResponse = 
shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, e) match { +case response: ShareFetchResponse => response +case _ => null Review Comment: I agree with @apoorvmittal10 , seeing the newContext function, the possible errors are `INVALID_REQUEST`, `SHARE_SESSION_NOT_FOUND` and `INVALID_SHARE_SESSION_EPOCH`. In all such cases, we should be completing the API call with a top level error code there itself. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
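To make the reviewers' suggestion concrete, here is a self-contained sketch of mapping those exceptions to a top-level error code and failing the request directly, instead of threading a null response around. The class and enum names are stubs that mirror (but do not reuse) the Kafka ones.

```java
// Hedged sketch, not Kafka code: translate context-creation failures into the
// top-level error code a ShareFetch response should carry.
public class ShareFetchErrorMapping {
    enum Errors { NONE, INVALID_REQUEST, SHARE_SESSION_NOT_FOUND, INVALID_SHARE_SESSION_EPOCH, UNKNOWN_SERVER_ERROR }

    // Stand-ins for the exception types named in the review comment.
    static class ShareSessionNotFoundException extends RuntimeException {}
    static class InvalidShareSessionEpochException extends RuntimeException {}

    static Errors topLevelError(RuntimeException e) {
        if (e instanceof ShareSessionNotFoundException) return Errors.SHARE_SESSION_NOT_FOUND;
        if (e instanceof InvalidShareSessionEpochException) return Errors.INVALID_SHARE_SESSION_EPOCH;
        if (e instanceof IllegalArgumentException) return Errors.INVALID_REQUEST; // malformed request shape
        return Errors.UNKNOWN_SERVER_ERROR; // anything unexpected
    }
}
```

With a mapping like this, the handler can send the error response and return as soon as `newContext` throws, which is the "complete the API call there itself" behaviour the comment asks for.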
[jira] [Commented] (KAFKA-17070) perf: consider to use ByteBufferOutputstream to append records
[ https://issues.apache.org/jira/browse/KAFKA-17070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17862782#comment-17862782 ] Luca Molteni commented on KAFKA-17070: -- [~bmilk] sure, go on > perf: consider to use ByteBufferOutputstream to append records > -- > > Key: KAFKA-17070 > URL: https://issues.apache.org/jira/browse/KAFKA-17070 > Project: Kafka > Issue Type: Improvement >Reporter: Luke Chen >Priority: Major > > Consider using ByteBufferOutputStream to append records, instead of a > DataOutputStream. We should add a JMH test to confirm this indeed improves > performance before merging it.
[jira] [Commented] (KAFKA-17070) perf: consider to use ByteBufferOutputstream to append records
[ https://issues.apache.org/jira/browse/KAFKA-17070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17862773#comment-17862773 ] dujian0068 commented on KAFKA-17070: Hello, this is a valuable question. Can I try it? > perf: consider to use ByteBufferOutputstream to append records > -- > > Key: KAFKA-17070 > URL: https://issues.apache.org/jira/browse/KAFKA-17070 > Project: Kafka > Issue Type: Improvement >Reporter: Luke Chen >Priority: Major > > Consider using ByteBufferOutputStream to append records, instead of a > DataOutputStream. We should add a JMH test to confirm this indeed improves > performance before merging it.
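For readers following the ticket, a minimal self-contained sketch of the idea: write record bytes straight into a growable ByteBuffer instead of going through a DataOutputStream. This is an illustrative stand-in, not Kafka's actual `org.apache.kafka.common.utils.ByteBufferOutputStream`, and as the ticket notes, a JMH benchmark would still be needed to confirm any real win.

```java
import java.io.OutputStream;
import java.nio.ByteBuffer;

// Illustrative sketch: an OutputStream backed by a ByteBuffer that grows on
// demand, so record appends land directly in the buffer.
public class GrowableByteBufferOutputStream extends OutputStream {
    private ByteBuffer buffer;

    public GrowableByteBufferOutputStream(int initialCapacity) {
        this.buffer = ByteBuffer.allocate(initialCapacity);
    }

    @Override
    public void write(int b) {
        ensureRemaining(1);
        buffer.put((byte) b);
    }

    @Override
    public void write(byte[] bytes, int off, int len) {
        ensureRemaining(len);
        buffer.put(bytes, off, len);
    }

    // Grow by doubling (at least to the needed size) when the buffer is full,
    // so callers can keep writing transparently.
    private void ensureRemaining(int needed) {
        if (buffer.remaining() < needed) {
            ByteBuffer grown = ByteBuffer.allocate(Math.max(buffer.capacity() * 2, buffer.position() + needed));
            buffer.flip();
            grown.put(buffer);
            buffer = grown;
        }
    }

    public ByteBuffer buffer() {
        return buffer;
    }
}
```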
[jira] [Updated] (KAFKA-17066) New consumer updateFetchPositions should perform all operations in background thread
[ https://issues.apache.org/jira/browse/KAFKA-17066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-17066: -- Component/s: clients > New consumer updateFetchPositions should perform all operations in background > thread > > > Key: KAFKA-17066 > URL: https://issues.apache.org/jira/browse/KAFKA-17066 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.8.0 >Reporter: Lianet Magrans >Priority: Blocker > Fix For: 3.9.0 > > > The updateFetchPositions function in the new consumer performs several actions > based on the assigned partitions from the subscriptionState. The way it's > currently implemented, it fetches committed offsets for partitions that > required a position (retrieved from subscription state in the app thread), > and then resets positions for the partitions still needing one (retrieved > from the subscription state but in the background thread). > This is problematic, given that the assignment/subscriptionState may change > in the background thread at any time (ex. new partitions reconciled), so we > could end up resetting positions to the partition offsets for a partition for > which we never even attempted to retrieve committed offsets. > Consider this sequence for a consumer that owns partition tp0: > * consumer owns tp0 > * app thread -> updateFetchPositions triggers > initWithCommittedOffsetsIfNeeded to retrieve committed offsets for assigned > partitions requiring a position (taking them from > subscriptions.initializingPartitions()). This will fetch committed offsets > for tp0 only. > * background thread -> receives new partition tp1 and completes > reconciliation (adds it to the subscription state as INITIALIZING, requires a > position) > * app thread -> updateFetchPositions resets positions for all partitions > that still don't have a valid position after initWithCommittedOffsetsIfNeeded > (taking them from subscriptionState.partitionsNeedingReset).
This will > mistakenly consider that it should reset tp1 to the partition offsets, when > in reality it never even tried fetching the committed offsets for it because > it wasn't assigned when initWithCommittedOffsetsIfNeeded happened. > We should consider moving updateFetchPositions to the background as a single > event that would safely use the subscriptionState object and apply all > actions involved in updateFetchPositions to the same consistent set of > partitions assigned at that moment.
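The race described in the ticket can be illustrated with a small self-contained simulation (not Kafka code; plain sets of partition names stand in for the subscription state, and the background reconciliation of tp1 is injected deterministically between the two steps):

```java
import java.util.HashSet;
import java.util.Set;

// Sketch of the race: if the "fetch committed offsets" step and the "reset
// remaining partitions" step each re-read the live assignment, a partition
// added in between (tp1) gets reset without ever having its committed offset
// looked up. Taking one snapshot up front for both steps avoids that.
public class UpdateFetchPositionsRace {
    // Buggy flow: the second step re-reads the (mutated) live assignment.
    static Set<String> resetWithLiveReads(Set<String> liveAssignment) {
        Set<String> fetchedCommitted = new HashSet<>(liveAssignment); // step 1: fetch committed offsets
        liveAssignment.add("tp1");                                    // background thread reconciles tp1
        Set<String> toReset = new HashSet<>(liveAssignment);          // step 2: re-reads live state
        toReset.removeAll(fetchedCommitted);                          // tp1 "still needs" a position
        return toReset;
    }

    // Fixed flow: both steps work off the same snapshot taken up front.
    static Set<String> resetWithSnapshot(Set<String> liveAssignment) {
        Set<String> snapshot = new HashSet<>(liveAssignment);
        Set<String> fetchedCommitted = new HashSet<>(snapshot);
        liveAssignment.add("tp1");                                    // same background mutation
        Set<String> toReset = new HashSet<>(snapshot);
        toReset.removeAll(fetchedCommitted);
        return toReset;                                               // empty: tp1 is left alone
    }
}
```

The buggy flow resets tp1 even though its committed offset was never fetched; the snapshot flow leaves tp1 for the next iteration, which mirrors the single-background-event fix the ticket proposes.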
Re: [PR] KAFKA-16745: Implemented handleShareFetchRequest RPC including unit tests [kafka]
chirag-wadhwa5 commented on code in PR #16456: URL: https://github.com/apache/kafka/pull/16456#discussion_r1664144313 ## core/src/main/java/kafka/server/share/SharePartitionManager.java: ## @@ -427,6 +427,26 @@ public List cachedTopicIdPartitionsInShareSession(String group return cachedTopicIdPartitions; } +/** + * The acknowledgeShareSessionCacheUpdate method is used to update the share session cache before acknowledgements are handled + * either as part of shareFetch request or shareAcknowledge request + * @param groupId The group id in the request. + * @param memberId The member id of the client in the request. + * @param reqEpoch The request epoch. + */ +public void acknowledgeShareSessionCacheUpdate(String groupId, Uuid memberId, int reqEpoch) { Review Comment: nope, already taken care of in the latest commit. Thanks
Re: [PR] KAFKA-17042: The migration docs should remind users to set "broker.id.generation.enable" when adding broker.id [kafka]
frankvicky commented on PR #16491: URL: https://github.com/apache/kafka/pull/16491#issuecomment-2206000614 Hi @showuon I have simplified the description based on feedback, PTAL
Re: [PR] KAFKA-16745: Implemented handleShareFetchRequest RPC including unit tests [kafka]
apoorvmittal10 commented on code in PR #16456: URL: https://github.com/apache/kafka/pull/16456#discussion_r1664122712 ## core/src/main/java/kafka/server/share/SharePartitionManager.java: ## @@ -427,6 +427,26 @@ public List cachedTopicIdPartitionsInShareSession(String group return cachedTopicIdPartitions; } +/** + * The acknowledgeShareSessionCacheUpdate method is used to update the share session cache before acknowledgements are handled + * either as part of shareFetch request or shareAcknowledge request + * @param groupId The group id in the request. + * @param memberId The member id of the client in the request. + * @param reqEpoch The request epoch. + */ +public void acknowledgeShareSessionCacheUpdate(String groupId, Uuid memberId, int reqEpoch) { Review Comment: Is this method being used anywhere now? ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -3956,11 +3961,482 @@ class KafkaApis(val requestChannel: RequestChannel, } } + /** + * Handle a shareFetch request + */ def handleShareFetchRequest(request: RequestChannel.Request): Unit = { val shareFetchRequest = request.body[ShareFetchRequest] -// TODO: Implement the ShareFetchRequest handling -requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) -CompletableFuture.completedFuture[Unit](()) + +if (!config.isNewGroupCoordinatorEnabled) { + // The API is not supported by the "old" group coordinator (the default). If the + // new one is not enabled, we fail directly here. 
+ requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) + CompletableFuture.completedFuture[Unit](()) + return +} else if (!config.isShareGroupEnabled) { + // The API is not supported when the "share" rebalance protocol has not been set explicitly + requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) + CompletableFuture.completedFuture[Unit](()) + return +} +val topicNames = metadataCache.topicIdsToNames() +val sharePartitionManager : SharePartitionManager = sharePartitionManagerOption match { + case Some(manager) => manager + case None => throw new IllegalStateException("ShareFetchRequest received but SharePartitionManager is not initialized") +} + +val groupId = shareFetchRequest.data.groupId +val clientId = request.header.clientId +val memberId = shareFetchRequest.data().memberId() +val shareSessionEpoch = shareFetchRequest.data().shareSessionEpoch() + +val shareFetchData = shareFetchRequest.shareFetchData(topicNames) +val forgottenTopics = shareFetchRequest.forgottenTopics(topicNames) +var cachedTopicPartitions : util.List[TopicIdPartition] = null + +if (shareSessionEpoch == ShareFetchMetadata.FINAL_EPOCH) { + try { +cachedTopicPartitions = sharePartitionManager.cachedTopicIdPartitionsInShareSession(groupId, Uuid.fromString(memberId)) + } catch { +// Exception handling is needed when this value is being utilized on receiving FINAL_EPOCH. 
+case _: ShareSessionNotFoundException => cachedTopicPartitions = null + } +} + +def isAcknowledgeDataPresentInFetchRequest() : Boolean = { + var isAcknowledgeDataPresent = false + shareFetchRequest.data().topics().forEach ( topic => { +breakable{ + topic.partitions().forEach ( partition => { +if (partition.acknowledgementBatches() != null && !partition.acknowledgementBatches().isEmpty) { + isAcknowledgeDataPresent = true + break() +} else { + isAcknowledgeDataPresent = false +} + }) +} + }) + isAcknowledgeDataPresent +} + +val isAcknowledgeDataPresent = isAcknowledgeDataPresentInFetchRequest() +var shareFetchResponse : ShareFetchResponse = null +// Variable to store any error thrown while the handling piggybacked acknowledgements +var acknowledgeError : Errors = Errors.NONE +// Variable to store the topic partition wise result of piggybacked acknowledgements +var acknowledgeResult = mutable.Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData]() + +def isInvalidShareFetchRequest() : Boolean = { + // The Initial Share Fetch Request should not Acknowledge any data + if (shareSessionEpoch == ShareFetchMetadata.INITIAL_EPOCH && isAcknowledgeDataPresent) { +return true + } + false +} + +val newReqMetadata : ShareFetchMetadata = new ShareFetchMetadata(Uuid.fromString(memberId), shareSessionEpoch) +var shareFetchContext : ShareFetchContext = null +try { + // Creating the shareFetchContext for Share Session Handling + shareFetchContext =
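As a side note on the quoted `isAcknowledgeDataPresentInFetchRequest`, the `breakable`/`break` pattern is equivalent to a short-circuiting `anyMatch`. A hedged, self-contained sketch (`Topic` and `Partition` here are stand-ins for the ShareFetchRequest data types, not the real classes):

```java
import java.util.List;

// Sketch: the check succeeds as soon as any partition in any topic carries a
// non-empty list of acknowledgement batches, with no mutable flag or break.
public class AckPresenceCheck {
    record Partition(List<Integer> acknowledgementBatches) {}
    record Topic(List<Partition> partitions) {}

    static boolean isAcknowledgeDataPresent(List<Topic> topics) {
        return topics.stream()
            .flatMap(t -> t.partitions().stream())
            .anyMatch(p -> p.acknowledgementBatches() != null && !p.acknowledgementBatches().isEmpty());
    }
}
```

`anyMatch` stops at the first hit, so it keeps the early-exit behaviour the original loop gets from `break` while staying side-effect free.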
Re: [PR] KAFKA-16254: Allow MM2 to fully disable offset sync feature [kafka]
OmniaGM commented on code in PR #15999: URL: https://github.com/apache/kafka/pull/15999#discussion_r1664131813 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java: ## @@ -106,6 +108,17 @@ public void stop() { Utils.closeQuietly(targetAdminClient, "target admin client"); } +@Override +public Config validate(Map connectorConfigs) { +List configValues = super.validate(connectorConfigs).configValues(); +MirrorCheckpointConfig.validate(connectorConfigs).forEach((config, errorMsg) -> +configValues.stream() +.filter(conf -> conf.name().equals(config)) +.forEach(conf -> conf.errorMessages().add(errorMsg))); Review Comment: updated this. Forgot to roll it back to addErrorMessage when addressing some of the other feedback
Re: [PR] KAFKA-16254: Allow MM2 to fully disable offset sync feature [kafka]
OmniaGM commented on code in PR #15999: URL: https://github.com/apache/kafka/pull/15999#discussion_r1664131206 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java: ## @@ -106,6 +108,17 @@ public void stop() { Utils.closeQuietly(targetAdminClient, "target admin client"); } +@Override +public Config validate(Map connectorConfigs) { +List configValues = super.validate(connectorConfigs).configValues(); +MirrorCheckpointConfig.validate(connectorConfigs).forEach((config, errorMsg) -> +configValues.stream() +.filter(conf -> conf.name().equals(config)) Review Comment: I moved this to MirrorConnectorConfig which all inherit. So this should be addressed now
Re: [PR] KAFKA-16254: Allow MM2 to fully disable offset sync feature [kafka]
OmniaGM commented on code in PR #15999: URL: https://github.com/apache/kafka/pull/15999#discussion_r1664129210 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java: ## @@ -234,6 +234,30 @@ public ConfigDef config() { @Override public org.apache.kafka.common.config.Config validate(Map props) { List configValues = super.validate(props).configValues(); +validateExactlyOnceConfigs(props, configValues); +validateEmitOffsetSyncConfigs(props, configValues); + +return new org.apache.kafka.common.config.Config(configValues); +} + +private static void validateEmitOffsetSyncConfigs(Map props, List configValues) { +boolean offsetSyncsConfigured = props.keySet().stream() +.anyMatch(conf -> conf.startsWith(OFFSET_SYNCS_CLIENT_ROLE_PREFIX) || conf.startsWith(OFFSET_SYNCS_TOPIC_CONFIG_PREFIX)); + +if ("false".equals(props.get(MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED)) && offsetSyncsConfigured) { Review Comment: No, because `offsetSyncsConfigured` has a default, which is the global configs for topics and clients
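The validation being discussed can be sketched in isolation as follows. The config key names here are assumptions standing in for the MM2 constants (`EMIT_OFFSET_SYNCS_ENABLED` and the offset-syncs prefixes), so treat them as illustrative rather than the exact MirrorMaker 2 strings:

```java
import java.util.List;
import java.util.Map;

// Sketch of the conflict check: offset syncs explicitly disabled while
// offset-sync client/topic settings are explicitly configured.
public class OffsetSyncValidation {
    static final String EMIT_OFFSET_SYNCS_ENABLED = "emit.offset-syncs.enabled"; // assumed key name
    static final List<String> OFFSET_SYNCS_PREFIXES = List.of("offset-syncs.");  // assumed prefixes

    static boolean conflictingOffsetSyncConfig(Map<String, String> props) {
        // True when the user explicitly supplied any offset-syncs.* setting.
        boolean offsetSyncsConfigured = props.keySet().stream()
            .anyMatch(k -> OFFSET_SYNCS_PREFIXES.stream().anyMatch(k::startsWith));
        return "false".equals(props.get(EMIT_OFFSET_SYNCS_ENABLED)) && offsetSyncsConfigured;
    }
}
```

Note this sketch only inspects keys the user passed explicitly, which is the point OmniaGM makes: defaulted global client/topic configs should not count as "configured" for the purpose of the conflict.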
[jira] [Updated] (KAFKA-17041) Add pagination when describe large set of metadata via Admin API
[ https://issues.apache.org/jira/browse/KAFKA-17041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Omnia Ibrahim updated KAFKA-17041: -- Issue Type: Improvement (was: Task) > Add pagination when describe large set of metadata via Admin API > - > > Key: KAFKA-17041 > URL: https://issues.apache.org/jira/browse/KAFKA-17041 > Project: Kafka > Issue Type: Improvement > Components: admin >Reporter: Omnia Ibrahim >Assignee: Omnia Ibrahim >Priority: Major > > Some of the request via Admin API timeout on large cluster or cluster with > large set of specific metadata. For example OffsetFetchRequest and > DescribeLogDirsRequest timeout due to large number of partition on cluster. > Also DescribeProducersRequest and ListTransactionsRequest time out due to too > many short lived PID or too many hanging transactions > [KIP-1062: Introduce Pagination for some requests used by Admin > API|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1062%3A+Introduce+Pagination+for+some+requests+used+by+Admin+API]