[jira] [Commented] (KAFKA-17062) RemoteLogManager - RemoteStorageException causes data loss

2024-07-03 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17862925#comment-17862925
 ] 

Luke Chen commented on KAFKA-17062:
---

Good question: should we create a new 
[RemoteLogSegmentId|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L850]
 when a failed segment is retried?

[~satishd] [~ckamal], we'd like to hear your thoughts about it.
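For illustration, here is a minimal, hypothetical sketch (plain Java, not the actual RemoteLogManager code) of the over-counting described in the quoted issue below: each failed-and-retried copy leaves another COPY_SEGMENT_STARTED entry behind, and a retention-size computation that ignores segment state counts all of them, so valid old segments get deleted early.
{code:java}
import java.util.ArrayList;
import java.util.List;

public class RetentionOverCountSketch {
    enum State { COPY_SEGMENT_STARTED, COPY_SEGMENT_FINISHED }

    record SegmentMetadata(String name, long sizeBytes, State state) {}

    public static void main(String[] args) {
        long retentionBytes = 4L * 1024 * 1024 * 1024;   // retention.bytes = 4GB
        long segmentBytes   = 1L * 1024 * 1024 * 1024;   // segment.bytes = 1GB

        List<SegmentMetadata> metadata = new ArrayList<>(List.of(
            new SegmentMetadata("segment-1", segmentBytes, State.COPY_SEGMENT_FINISHED),
            new SegmentMetadata("segment-2", segmentBytes, State.COPY_SEGMENT_FINISHED),
            new SegmentMetadata("segment-3", segmentBytes, State.COPY_SEGMENT_FINISHED)));

        // Each failed RLMTask iteration registers a fresh COPY_SEGMENT_STARTED entry
        // for segment 4 (a new id per attempt) and never moves it to FINISHED.
        for (int attempt = 1; attempt <= 3; attempt++) {
            metadata.add(new SegmentMetadata("segment-4-attempt-" + attempt,
                    segmentBytes, State.COPY_SEGMENT_STARTED));

            // Retention size computed over ALL entries, regardless of state
            // (the behaviour the issue describes for buildRetentionSizeData).
            long totalSize = metadata.stream().mapToLong(SegmentMetadata::sizeBytes).sum()
                    + segmentBytes; // plus the local/active segment
            if (totalSize > retentionBytes) {
                SegmentMetadata oldest = metadata.remove(0);
                System.out.println("attempt " + attempt + ": over retention, deleting " + oldest.name());
            }
        }
        // Output: segment-1, segment-2 and segment-3 are deleted even though the
        // retention limit was never exceeded by successfully copied data.
    }
}
{code}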

> RemoteLogManager - RemoteStorageException causes data loss
> --
>
> Key: KAFKA-17062
> URL: https://issues.apache.org/jira/browse/KAFKA-17062
> Project: Kafka
>  Issue Type: Bug
>  Components: Tiered-Storage
>Affects Versions: 3.8.0, 3.7.1, 3.9.0
>Reporter: Guillaume Mallet
>Assignee: Guillaume Mallet
>Priority: Major
>  Labels: tiered-storage
>
> When Tiered Storage is configured, retention.bytes defines the limit for the 
> amount of data stored in the filesystem and in remote storage. However, a 
> failure while offloading to remote storage can cause segments to be dropped 
> before the retention limit is met.
> What happens
> Assuming a topic configured with {{retention.bytes=4294967296}} (4GB) and a 
> {{local.retention.bytes=1073741824}} (1GB, equal to segment.bytes), we would 
> expect Kafka to keep up to 3 segments (3GB) in the remote store and 1 segment 
> locally (the local segment), and possibly more if the remote storage is 
> offline, i.e. segments in the following RemoteLogSegmentStates in the 
> RemoteLogMetadataManager (RLMM):
>  * Segment 3 ({{{}COPY_SEGMENT_FINISHED{}}})
>  * Segment 2 ({{{}COPY_SEGMENT_FINISHED{}}})
>  * Segment 1 ({{{}COPY_SEGMENT_FINISHED{}}})
> Let's assume the RLMM starts failing when segment 4 rolls. At the first 
> iteration of an RLMTask we will have:
>  * 
> [{{copyLogSegmentsToRemote}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L773]
>  is called first:
>  ** RLMM becomes aware of Segment 4 and adds it to the metadata:
>  *** Segment 4 ({{{}COPY_SEGMENT_STARTED{}}}),
>  *** Segment 3 ({{{}COPY_SEGMENT_FINISHED{}}}),
>  *** Segment 2 ({{{}COPY_SEGMENT_FINISHED{}}}),
>  *** Segment 1 ({{{}COPY_SEGMENT_FINISHED{}}})
>  ** An exception is raised during the copy operation 
> ([{{copyLogSegmentData}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java#L93]
>  in RemoteStorageManager) which is caught with the error message “{{Error 
> occurred while copying log segments of partition}}” and no further copy will 
> be attempted for the duration of this RLMTask.
>  ** At that point the Segment will never move to {{COPY_SEGMENT_FINISHED}} 
> but will transition to {{DELETE_SEGMENT_STARTED}} eventually before being 
> cleaned up when the associated segment is deleted.
>  * 
> [{{cleanupExpiredRemoteLogSegments}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1122]
>  is then called
>  ** Retention size is computed in 
> [{{buildRetentionSizeData}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1296]
>  as the sum of all the segments' sizes regardless of their state, so the computed 
> size of the topic is 1 GB (local) + 4 GB (remote) = 5 GB, above the 4 GB retention limit
>  ** Segment 1, being the oldest, will be dropped.
> At the second iteration after 
> [{{remote.log.manager.task.interval.ms}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java#L395]
>  (default: 30s), the same will happen. The RLMM will now have 2 x Segment 4 
> in a {{COPY_SEGMENT_STARTED}} state, each with a different 
> {{RemoteLogSegmentId}}, and Segment 2 will be dropped. The same will happen to 
> Segment 3 after another iteration.
> At that point, the RLMM contains 4 copies of Segment 4 in 
> {{COPY_SEGMENT_STARTED}} state. Segment 4 is marked for deletion, increasing 
> the LSO at the same time and causing the UnifiedLog to delete the local and 
> remote data for Segment 4, including its metadata.
> Under those circumstances Kafka can quickly delete segments that were not 
> meant for deletion, causing data loss.
> Steps to reproduce the problem:
> 1. Enable tiered storage
> {code:bash}
> mkdir -p /tmp/tieredStorage/kafka-tiered-storage/
> cat <<EOF >> config/kraft/server.properties
> remote.log.storage.system.enable=True
> remote.log.storage.manager.class.name=org.apache.kafka.server.log.remote.storage.LocalTieredStorage
> 

Re: [PR] KAFKA-17042: The migration docs should remind users to set "broker.id.generation.enable" when adding broker.id [kafka]

2024-07-03 Thread via GitHub


frankvicky commented on PR #16491:
URL: https://github.com/apache/kafka/pull/16491#issuecomment-2208106073

   Hi @showuon
   I have adjusted the description based on your feedback, PTAL





Re: [PR] KAFKA-17042: The migration docs should remind users to set "broker.id.generation.enable" when adding broker.id [kafka]

2024-07-03 Thread via GitHub


frankvicky commented on code in PR #16491:
URL: https://github.com/apache/kafka/pull/16491#discussion_r1665098473


##
docs/ops.html:
##
@@ -3946,6 +3946,7 @@ Enter Migration Mode on the Brokers
   
 
   
+broker.id: If 
broker.id.generation.enable is enabled (default is enabled) and 
broker.id is not set during migration, it will fail. Ensure 
broker.id is set to a non-negative integer.

Review Comment:
   Yes, I think your version is more concise, making it easier to read.  
   I will adopt it






[jira] [Resolved] (KAFKA-17069) Remote copy throttle metrics

2024-07-03 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17069?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-17069.
---
Fix Version/s: 3.9.0
   Resolution: Fixed

> Remote copy throttle metrics 
> -
>
> Key: KAFKA-17069
> URL: https://issues.apache.org/jira/browse/KAFKA-17069
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Abhijeet Kumar
>Priority: Major
> Fix For: 3.9.0
>
>






Re: [PR] KAFKA-17069: Remote copy throttle metrics [kafka]

2024-07-03 Thread via GitHub


showuon merged PR #16086:
URL: https://github.com/apache/kafka/pull/16086





Re: [PR] KAFKA-17069: Remote copy throttle metrics [kafka]

2024-07-03 Thread via GitHub


showuon commented on PR #16086:
URL: https://github.com/apache/kafka/pull/16086#issuecomment-2208096103

   Failed tests are unrelated.





Re: [PR] KAFKA-17042: The migration docs should remind users to set "broker.id.generation.enable" when adding broker.id [kafka]

2024-07-03 Thread via GitHub


showuon commented on code in PR #16491:
URL: https://github.com/apache/kafka/pull/16491#discussion_r1665094501


##
docs/ops.html:
##
@@ -3946,6 +3946,7 @@ Enter Migration Mode on the Brokers
   
 
   
+broker.id: If 
broker.id.generation.enable is enabled (default is enabled) and 
broker.id is not set during migration, it will fail. Ensure 
broker.id is set to a non-negative integer.

Review Comment:
   Looks better!
   Maybe we can just say: `Ensure broker.id is set to a 
non-negative integer even if broker.id.generation.enable is 
enabled (default is enabled). ` because:
   1. we don't need to mention `during migration` since this is a migration 
section
   2. We can combine these 2 sentences.
   
   WDYT?






Re: [PR] KAFKA-16558 Implemented HeartbeatRequestState toStringBase() and added a test for it [kafka]

2024-07-03 Thread via GitHub


brenden20 commented on code in PR #16373:
URL: https://github.com/apache/kafka/pull/16373#discussion_r1665083663


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -485,6 +485,18 @@ public void resetTimer() {
 this.heartbeatTimer.reset(heartbeatIntervalMs);
 }
 
+@Override
+public String toStringBase() {
+return super.toStringBase() +
+", heartbeatTimer=" + heartbeatTimer +

Review Comment:
   I made this change now, just want to make sure I did it correctly 






Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]

2024-07-03 Thread via GitHub


lianetm commented on code in PR #16449:
URL: https://github.com/apache/kafka/pull/16449#discussion_r1665069309


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##
@@ -2302,6 +2303,61 @@ public void testRebalanceMetricsOnFailedRebalance() {
 assertEquals(-1d, getMetricValue(metrics, 
rebalanceMetricsManager.lastRebalanceSecondsAgo));
 }
 
+@Test
+public void testLeaveGroupWhenStateIsFatal() {
+MembershipManagerImpl membershipManager = 
createMemberInStableState(null);
+when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
+membershipManager.transitionToFatal();
+assertEquals(MemberState.FATAL, membershipManager.state());
+
+subscriptionState.assignFromUser(Collections.singleton(new 
TopicPartition("topic", 0)));
+assertEquals(1, subscriptionState.numAssignedPartitions());

Review Comment:
   Cool. Related although not introduced by this PR, this makes me notice that 
we don't have this full story covered in the tests (not testing the call to 
`subscriptionState.unsubscribed` when `isNotInGroup` is false).  
   
   Would you mind adding the same verification you added for `notInGroup` true, 
to the func that covers this for `notInGroup` false? Should be the same 
`verify(subscriptionState).unsubscribe();` but added to 
`assertStaleMemberLeavesGroupAndClearsAssignment` I would say. 






Re: [PR] KAFKA-15296: Allow offsets to be committed for filtered records when Exactly Once support is disabled [kafka]

2024-07-03 Thread via GitHub


vamossagar12 commented on PR #14158:
URL: https://github.com/apache/kafka/pull/14158#issuecomment-2208006307

   hey @C0urante , we recently hit another issue due to this. When you have 
some time, would it be possible for you to review this? It would be very 
helpful. Thanks!





[jira] [Updated] (KAFKA-17061) KafkaController takes long time to connect to newly added broker after registration on large cluster

2024-07-03 Thread Haruki Okada (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17061?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Haruki Okada updated KAFKA-17061:
-
Description: 
h2. Environment
 * Kafka version: 3.3.2
 * Cluster: 200~ brokers
 * Total num partitions: 40k
 * ZK-based cluster

h2. Phenomenon

When a broker left the cluster due to a long STW pause and came back after a 
while, the controller took 6 seconds to connect to the broker after znode 
registration, which caused significant message delivery delay.
{code:java}
[2024-06-22 23:59:38,202] INFO [Controller id=1] Newly added brokers: 2, 
deleted brokers: , bounced brokers: , all live brokers: 1,... 
(kafka.controller.KafkaController)
[2024-06-22 23:59:38,203] DEBUG [Channel manager on controller 1]: Controller 1 
trying to connect to broker 2 (kafka.controller.ControllerChannelManager)
[2024-06-22 23:59:38,205] INFO [RequestSendThread controllerId=1] Starting 
(kafka.controller.RequestSendThread)
[2024-06-22 23:59:38,205] INFO [Controller id=1] New broker startup callback 
for 2 (kafka.controller.KafkaController)
[2024-06-22 23:59:44,524] INFO [RequestSendThread controllerId=1] Controller 1 
connected to broker-2:9092 (id: 2 rack: rack-2) for sending state change 
requests (kafka.controller.RequestSendThread)
{code}
h2. Analysis

From the flamegraph at that time, we can see that 
[liveBrokerIds|https://github.com/apache/kafka/blob/3.3.2/core/src/main/scala/kafka/controller/ControllerContext.scala#L217]
 calculation takes significant time.

!image-2024-07-02-17-24-11-861.png|width=541,height=303!

  was:
h2. Environment
 * Kafka version: 3.3.2
 * Cluster: 200~ brokers
 * Total num partitions: 40k
 * ZK-based cluster

h2. Phenomenon

When a broker left the cluster due to a long STW pause and came back after a 
while, the controller took 6 seconds to connect to the broker after znode 
registration, which caused significant message delivery delay.
{code:java}
[2024-06-22 23:59:38,202] INFO [Controller id=1] Newly added brokers: 2, 
deleted brokers: , bounced brokers: , all live brokers: 1,... 
(kafka.controller.KafkaController)
[2024-06-22 23:59:38,203] DEBUG [Channel manager on controller 1]: Controller 1 
trying to connect to broker 2 (kafka.controller.ControllerChannelManager)
[2024-06-22 23:59:38,205] INFO [RequestSendThread controllerId=1] Starting 
(kafka.controller.RequestSendThread)
[2024-06-22 23:59:38,205] INFO [Controller id=1] New broker startup callback 
for 2 (kafka.controller.KafkaController)
[2024-06-22 23:59:44,524] INFO [RequestSendThread controllerId=1] Controller 1 
connected to broker-2:9092 (id: 2 rack: rack-2) for sending state change 
requests (kafka.controller.RequestSendThread)
{code}
h2. Analysis

From the flamegraph at that time, we can see that 
[liveBrokerIds|https://github.com/apache/kafka/blob/3.3.2/core/src/main/scala/kafka/controller/ControllerContext.scala#L217]
 calculation takes significant time.

!image-2024-07-02-17-24-11-861.png|width=541,height=303!

Since no concurrent modification against liveBrokerEpochs is expected, we can 
just cache the result to improve the performance.
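As a rough illustration of that caching idea, here is a hypothetical Java sketch (the real ControllerContext is Scala, and the field and method names below are made up): compute the derived broker-id set once when membership changes, and serve the cached set on the hot path instead of rebuilding it on every access.
{code:java}
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class LiveBrokerIdsCacheSketch {
    // brokerId -> epoch, as registered in ZooKeeper
    private final Map<Integer, Long> liveBrokerEpochs = new ConcurrentHashMap<>();
    // cached view; rebuilt only after a membership change
    private volatile Set<Integer> cachedLiveBrokerIds = Set.of();

    public void addLiveBroker(int brokerId, long epoch) {
        liveBrokerEpochs.put(brokerId, epoch);
        cachedLiveBrokerIds = Set.copyOf(liveBrokerEpochs.keySet()); // invalidate + rebuild
    }

    public void removeLiveBroker(int brokerId) {
        liveBrokerEpochs.remove(brokerId);
        cachedLiveBrokerIds = Set.copyOf(liveBrokerEpochs.keySet());
    }

    // Hot path: called many times per controller event; now O(1) instead of
    // recomputing the set from liveBrokerEpochs on every access.
    public Set<Integer> liveBrokerIds() {
        return cachedLiveBrokerIds;
    }
}
{code}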


> KafkaController takes long time to connect to newly added broker after 
> registration on large cluster
> 
>
> Key: KAFKA-17061
> URL: https://issues.apache.org/jira/browse/KAFKA-17061
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Haruki Okada
>Assignee: Haruki Okada
>Priority: Major
> Attachments: image-2024-07-02-17-22-06-100.png, 
> image-2024-07-02-17-24-11-861.png
>
>
> h2. Environment
>  * Kafka version: 3.3.2
>  * Cluster: 200~ brokers
>  * Total num partitions: 40k
>  * ZK-based cluster
> h2. Phenomenon
> When a broker left the cluster due to a long STW pause and came back after a 
> while, the controller took 6 seconds to connect to the broker after 
> znode registration, which caused significant message delivery delay.
> {code:java}
> [2024-06-22 23:59:38,202] INFO [Controller id=1] Newly added brokers: 2, 
> deleted brokers: , bounced brokers: , all live brokers: 1,... 
> (kafka.controller.KafkaController)
> [2024-06-22 23:59:38,203] DEBUG [Channel manager on controller 1]: Controller 
> 1 trying to connect to broker 2 (kafka.controller.ControllerChannelManager)
> [2024-06-22 23:59:38,205] INFO [RequestSendThread controllerId=1] Starting 
> (kafka.controller.RequestSendThread)
> [2024-06-22 23:59:38,205] INFO [Controller id=1] New broker startup callback 
> for 2 (kafka.controller.KafkaController)
> [2024-06-22 23:59:44,524] INFO [RequestSendThread controllerId=1] Controller 
> 1 connected to broker-2:9092 (id: 2 rack: rack-2) for sending state change 
> requests (kafka.controller.RequestSendThread)
> {code}
> h2. Analysis
> From the flamegraph at that time, 

[jira] [Commented] (KAFKA-17073) Deprecate ReplicaVerificationTool in 3.9

2024-07-03 Thread PoAn Yang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17862899#comment-17862899
 ] 

PoAn Yang commented on KAFKA-17073:
---

Hi [~chia7712], I'm interested in this. If you're not working on it, may I take 
it? Thank you.

> Deprecate ReplicaVerificationTool in 3.9
> 
>
> Key: KAFKA-17073
> URL: https://issues.apache.org/jira/browse/KAFKA-17073
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>  Labels: need-kip
> Fix For: 3.9.0
>
>
> see discussion 
> https://lists.apache.org/thread/6zz7xwps8lq2lxfo5bhyl4cggh64c5py
> In short, the tool is useless, so it is a good time to deprecate it in 3.9. 
> That enables us to remove it from 4.0





Re: [PR] KAFKA-14094: Support for first leader bootstrapping the voter set [kafka]

2024-07-03 Thread via GitHub


jsancio commented on code in PR #16518:
URL: https://github.com/apache/kafka/pull/16518#discussion_r1664928188


##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -151,6 +151,7 @@ public final class KafkaRaftClient implements 
RaftClient {
 public static final int MAX_FETCH_WAIT_MS = 500;
 public static final int MAX_BATCH_SIZE_BYTES = 8 * 1024 * 1024;
 public static final int MAX_FETCH_SIZE_BYTES = MAX_BATCH_SIZE_BYTES;
+public static final OffsetAndEpoch BOOTSTRAP_SNAPSHOT_ID = new 
OffsetAndEpoch(0, 0);

Review Comment:
   Let's define this in `o.a.k.s.Snapshots` instead.



##
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##
@@ -202,7 +214,46 @@ public void appendLeaderChangeMessage(long currentTimeMs) {
 .setVoters(voters)
 .setGrantingVoters(grantingVoters);
 
-accumulator.appendLeaderChangeMessage(leaderChangeMessage, 
currentTimeMs);
+accumulator.appendControlMessages((baseOffset, epoch, buffer) -> {
+try (MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
+buffer,
+RecordBatch.CURRENT_MAGIC_VALUE,
+Compression.NONE,
+TimestampType.CREATE_TIME,
+baseOffset,
+currentTimeMs,
+RecordBatch.NO_PRODUCER_ID,
+RecordBatch.NO_PRODUCER_EPOCH,
+RecordBatch.NO_SEQUENCE,
+false,
+true,
+epoch,
+buffer.capacity()
+)
+) {
+builder.appendLeaderChangeMessage(
+currentTimeMs,
+leaderChangeMessage
+);
+VoterSetOffset voterSetOffset = 
lastVoterSetOffset.orElse(null);
+// if lastVoterOffset is -1 we know the leader hasn't written 
the bootstrap snapshot records to the log yet
+if (voterSetOffset != null && voterSetOffset.offset() == -1) {

Review Comment:
   Avoid the use of `null`.
   ```java
   lastVoterSetOffset.ifPresent(voterSetOffset -> {
   // if lastVoterOffset is -1 we know the leader hasn't 
written the bootstrap snapshot records to the log yet
   if (voterSetOffset.offset() == -1) {
   ```



##
raft/src/main/java/org/apache/kafka/raft/internals/VoterSetOffset.java:
##
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+public class VoterSetOffset {
+private final VoterSet voterSet;
+private final Long offset;

Review Comment:
   An object is not needed. Use `long` instead. It takes less space and doesn't 
require a dereference.



##
raft/src/main/java/org/apache/kafka/raft/internals/VoterSetOffset.java:
##
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+public class VoterSetOffset {

Review Comment:
   Why did you add this type instead of using `LogHistory.Entry`?



##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -454,6 +455,7 @@ public void initialize(
 nodeId,
 nodeDirectoryId,
 partitionState::lastVoterSet,
+partitionState::lastVoterSetOffset,

Review Comment:
   It is unfortunate that we need to add a parameter 

Re: [PR] KAFKA-16684: Remove cache in responseData [kafka]

2024-07-03 Thread via GitHub


m1a2st commented on PR #15966:
URL: https://github.com/apache/kafka/pull/15966#issuecomment-2207562526

   @chia7712, thanks for your comments. I will open a new PR for this issue.





[jira] [Commented] (KAFKA-17077) the node.id is inconsistent to broker.id when "broker.id.generation.enable=true"

2024-07-03 Thread TengYao Chi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17862892#comment-17862892
 ] 

TengYao Chi commented on KAFKA-17077:
-

Hi [~chia7712] 
If you have not started working on it, I would like to take it.

> the node.id is inconsistent to broker.id when 
> "broker.id.generation.enable=true"
> 
>
> Key: KAFKA-17077
> URL: https://issues.apache.org/jira/browse/KAFKA-17077
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Critical
>
> We change the broker id of `KafkaConfig` directly when 
> `broker.id.generation.enable=true` [0]. However, the update is NOT synced to the 
> node.id of `KafkaConfig`. It results in the following issues:
> 1. we can see many "-1" in the log. for example:
> {code:sh}
> [2024-07-03 19:23:08,453] INFO [ExpirationReaper--1-AlterAcls]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> {code}
> 2.  `KafkaRaftManager` will use the uninitialized node.id to create 
> `KafkaRaftClient` during migration [1], and the error subsequently happens
> [0] 
> https://github.com/apache/kafka/blob/27220d146c5d043da4adc3d636036bd6e7b112d2/core/src/main/scala/kafka/server/KafkaServer.scala#L261
> [1] 
> https://github.com/apache/kafka/blob/27220d146c5d043da4adc3d636036bd6e7b112d2/core/src/main/scala/kafka/raft/RaftManager.scala#L230





[jira] [Commented] (KAFKA-10370) WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets) when (tp, offsets) are supplied by WorkerSinkTaskContext

2024-07-03 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17862891#comment-17862891
 ] 

Greg Harris commented on KAFKA-10370:
-

I'm reopening this issue as we have a fresh bug report: 
[https://lists.apache.org/thread/93msfod4bltb7dw73trdyfh95zbldo58] this time 
with the Snowflake connector.

While it may be possible to solve this in the task implementation, I think that 
we could also change the framework to make this error non-fatal, and just drop 
offsets for unassigned partitions.
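A rough sketch of that framework-side change (hypothetical helper using the plain consumer API, not the actual WorkerSinkTask code): before seeking, drop any offsets supplied by the task for partitions the consumer is not currently assigned, instead of letting seek() throw.
{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

public class SafeRewindSketch {
    // Seek only to offsets for currently assigned partitions and return the
    // offsets that were dropped, rather than failing the task.
    public static Map<TopicPartition, Long> rewindAssignedOnly(Consumer<?, ?> consumer,
                                                               Map<TopicPartition, Long> requestedOffsets) {
        Set<TopicPartition> assigned = consumer.assignment();
        Map<TopicPartition, Long> dropped = new HashMap<>();
        for (Map.Entry<TopicPartition, Long> entry : requestedOffsets.entrySet()) {
            if (assigned.contains(entry.getKey())) {
                consumer.seek(entry.getKey(), entry.getValue());
            } else {
                // Previously this path would hit IllegalStateException from seek();
                // here we just record and skip the unassigned partition.
                dropped.put(entry.getKey(), entry.getValue());
            }
        }
        return dropped; // caller can log these at WARN level
    }
}
{code}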

> WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets) 
> when (tp, offsets) are supplied by WorkerSinkTaskContext
> --
>
> Key: KAFKA-10370
> URL: https://issues.apache.org/jira/browse/KAFKA-10370
> Project: Kafka
>  Issue Type: New Feature
>  Components: connect
>Affects Versions: 2.5.0
>Reporter: Ning Zhang
>Priority: Major
>
> In 
> [WorkerSinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java],
>  when we want the consumer to consume from certain offsets, rather than from 
> the last committed offset, 
> [WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java#L63-L66]
>  provided a way to supply the offsets from external (e.g. implementation of 
> SinkTask) to rewind the consumer. 
> In the [poll() 
> method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312],
>  it first calls 
> [rewind()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L615-L633]
>  to (1) read the offsets from WorkerSinkTaskContext, if the offsets are not 
> empty, (2) consumer.seek(tp, offset) to rewind the consumer.
> As a part of [WorkerSinkTask 
> initialization|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L290-L307],
>  when the [SinkTask 
> starts|https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java#L83-L88],
>  we can supply the specific offsets by +"context.offset(supplied_offsets);+" 
> in start() method, so that when the consumer does the first poll, it should 
> rewind to the specific offsets in rewind() method. However in practice, we 
> saw the following IllegalStateException when running consumer.seek(tp, 
> offsets);
> {code:java}
> [2020-08-07 23:53:55,752] INFO WorkerSinkTask{id=MirrorSinkConnector-0} 
> Rewind test-1 to offset 3 
> (org.apache.kafka.connect.runtime.WorkerSinkTask:648)
> [2020-08-07 23:53:55,752] INFO [Consumer 
> clientId=connector-consumer-MirrorSinkConnector-0, 
> groupId=connect-MirrorSinkConnector] Seeking to offset 3 for partition test-1 
> (org.apache.kafka.clients.consumer.KafkaConsumer:1592)
> [2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task 
> threw an uncaught and unrecoverable exception 
> (org.apache.kafka.connect.runtime.WorkerTask:187)
> java.lang.IllegalStateException: No current assignment for partition test-1
> at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:368)
> at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.seekUnvalidated(SubscriptionState.java:385)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1597)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.rewind(WorkerSinkTask.java:649)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
> at 
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
> at 
> org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
> at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> [2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task 
> is being killed and will not recover until manually restarted 
> 

[jira] [Reopened] (KAFKA-10370) WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets) when (tp, offsets) are supplied by WorkerSinkTaskContext

2024-07-03 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris reopened KAFKA-10370:
-
  Assignee: (was: Ning Zhang)

> WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets) 
> when (tp, offsets) are supplied by WorkerSinkTaskContext
> --
>
> Key: KAFKA-10370
> URL: https://issues.apache.org/jira/browse/KAFKA-10370
> Project: Kafka
>  Issue Type: New Feature
>  Components: connect
>Affects Versions: 2.5.0
>Reporter: Ning Zhang
>Priority: Major
>
> In 
> [WorkerSinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java],
>  when we want the consumer to consume from certain offsets, rather than from 
> the last committed offset, 
> [WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java#L63-L66]
>  provided a way to supply the offsets from external (e.g. implementation of 
> SinkTask) to rewind the consumer. 
> In the [poll() 
> method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312],
>  it first calls 
> [rewind()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L615-L633]
>  to (1) read the offsets from WorkerSinkTaskContext, if the offsets are not 
> empty, (2) consumer.seek(tp, offset) to rewind the consumer.
> As a part of [WorkerSinkTask 
> initialization|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L290-L307],
>  when the [SinkTask 
> starts|https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java#L83-L88],
>  we can supply the specific offsets by +"context.offset(supplied_offsets);+" 
> in start() method, so that when the consumer does the first poll, it should 
> rewind to the specific offsets in rewind() method. However in practice, we 
> saw the following IllegalStateException when running consumer.seek(tp, 
> offsets);
> {code:java}
> [2020-08-07 23:53:55,752] INFO WorkerSinkTask{id=MirrorSinkConnector-0} 
> Rewind test-1 to offset 3 
> (org.apache.kafka.connect.runtime.WorkerSinkTask:648)
> [2020-08-07 23:53:55,752] INFO [Consumer 
> clientId=connector-consumer-MirrorSinkConnector-0, 
> groupId=connect-MirrorSinkConnector] Seeking to offset 3 for partition test-1 
> (org.apache.kafka.clients.consumer.KafkaConsumer:1592)
> [2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task 
> threw an uncaught and unrecoverable exception 
> (org.apache.kafka.connect.runtime.WorkerTask:187)
> java.lang.IllegalStateException: No current assignment for partition test-1
> at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:368)
> at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.seekUnvalidated(SubscriptionState.java:385)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1597)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.rewind(WorkerSinkTask.java:649)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
> at 
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
> at 
> org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
> at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> [2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task 
> is being killed and will not recover until manually restarted 
> (org.apache.kafka.connect.runtime.WorkerTask:188)
> {code}
> As suggested in 
> https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition/41010594,
>  the resolution (that has been initially verified) proposed in the attached 
> PR is to use *consumer.assign* with *consumer.seek* , instead of 
> *consumer.subscribe*, to handle 

[PR] KAFKA-17078: Add SecurityManagerCompatibility shim [kafka]

2024-07-03 Thread via GitHub


gharris1727 opened a new pull request, #16522:
URL: https://github.com/apache/kafka/pull/16522

   This is a shim to allow the use of both modern (JRE 18+) and legacy APIs 
(deprecated in 17, degraded in 23, removal unknown) related to [JEP 411: 
Deprecate the Security Manager for Removal](https://openjdk.org/jeps/411).
   
   This shim implements the interim strategy outlined in [KIP-1006: Remove 
SecurityManager 
Support](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1006%3A+Remove+SecurityManager+Support)
 where legacy APIs are used as long as they are available and functional. Once 
the legacy APIs are removed or degraded, the modern APIs are used instead.
   
   The shim is structured as a general interface with four 'strategy' 
implementations for testability. These implementations allow for mocking out 
the classloading infrastructure to simulate situations which no current JRE 
implements, namely removal and further degradation of functionality.
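   For illustration only, a heavily simplified, hypothetical sketch of the detect-then-dispatch pattern (not the actual SecurityManagerCompatibility class or its API): probe for the legacy API reflectively once, route calls through it while it exists, and otherwise run the action directly. A real shim would also avoid compile-time references to the legacy types once they are removed.
   ```java
import java.lang.reflect.Method;
import java.security.PrivilegedExceptionAction;

public final class SecurityManagerShimSketch {
    // Resolved once at class load: null when the legacy API is gone.
    private static final Method DO_PRIVILEGED = lookupDoPrivileged();

    private static Method lookupDoPrivileged() {
        try {
            Class<?> accessController = Class.forName("java.security.AccessController");
            return accessController.getMethod("doPrivileged", PrivilegedExceptionAction.class);
        } catch (ReflectiveOperationException e) {
            return null; // legacy API removed or unusable: fall back to the modern path
        }
    }

    public static <T> T run(PrivilegedExceptionAction<T> action) throws Exception {
        if (DO_PRIVILEGED != null) {
            @SuppressWarnings("unchecked")
            T result = (T) DO_PRIVILEGED.invoke(null, action); // legacy path
            return result;
        }
        return action.run(); // modern path: no Security Manager, just run the action
    }
}
   ```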
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-03 Thread via GitHub


brenden20 commented on PR #16200:
URL: https://github.com/apache/kafka/pull/16200#issuecomment-2207337445

   @philipnee @lianetm thank you both for the feedback, I have addressed all 
comments





Re: [PR] MINOR; Update the properties files for controller with advertised.listeners [kafka]

2024-07-03 Thread via GitHub


jsancio merged PR #16473:
URL: https://github.com/apache/kafka/pull/16473





[PR] MINOR; Add property methods to LogOffsetMetadata [kafka]

2024-07-03 Thread via GitHub


jsancio opened a new pull request, #16521:
URL: https://github.com/apache/kafka/pull/16521

   This change simply adds property methods to LogOffsetMetadata. It changes 
all of the callers to use the new property methods instead of using the fields 
directly.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-16529; Implement raft response handling [kafka]

2024-07-03 Thread via GitHub


jsancio merged PR #16454:
URL: https://github.com/apache/kafka/pull/16454





Re: [PR] KAFKA-16529; Implement raft response handling [kafka]

2024-07-03 Thread via GitHub


jsancio commented on PR #16454:
URL: https://github.com/apache/kafka/pull/16454#issuecomment-2207255514

   Unrelated failures. Merging.





[jira] [Created] (KAFKA-17078) Add SecurityManager reflective shim

2024-07-03 Thread Greg Harris (Jira)
Greg Harris created KAFKA-17078:
---

 Summary: Add SecurityManager reflective shim
 Key: KAFKA-17078
 URL: https://issues.apache.org/jira/browse/KAFKA-17078
 Project: Kafka
  Issue Type: Task
  Components: clients, connect, Tiered-Storage
Reporter: Greg Harris
Assignee: Greg Harris


Add a shim class to allow for detection and usage of legacy and modern methods 
before and after the SecurityManager removal.





Re: [PR] KAFKA-16254: Allow MM2 to fully disable offset sync feature [kafka]

2024-07-03 Thread via GitHub


chia7712 commented on code in PR #15999:
URL: https://github.com/apache/kafka/pull/15999#discussion_r1664751869


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java:
##
@@ -166,6 +167,30 @@ Duration consumerPollTimeout() {
 return Duration.ofMillis(getLong(CONSUMER_POLL_TIMEOUT_MILLIS));
 }
 
+public static Map validate(Map configs) {
+Map invalidConfigs = new HashMap<>();
+
+// No point to validate when connector is disabled.
+if ("false".equals(configs.getOrDefault(ENABLED, "true"))) {
+return invalidConfigs;
+}
+
+boolean emitCheckpointDisabled = 
"false".equals(configs.getOrDefault(EMIT_CHECKPOINTS_ENABLED, "true"));
+boolean syncGroupOffsetsDisabled = 
"false".equals(configs.getOrDefault(SYNC_GROUP_OFFSETS_ENABLED, "true"));

Review Comment:
   the default value of `SYNC_GROUP_OFFSETS_ENABLED` is `false`, so why don't we 
use the same default value here?



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java:
##
@@ -166,6 +167,30 @@ Duration consumerPollTimeout() {
 return Duration.ofMillis(getLong(CONSUMER_POLL_TIMEOUT_MILLIS));
 }
 
+public static Map validate(Map configs) {
+Map invalidConfigs = new HashMap<>();
+
+// No point to validate when connector is disabled.
+if ("false".equals(configs.getOrDefault(ENABLED, "true"))) {
+return invalidConfigs;
+}
+
+boolean emitCheckpointDisabled = 
"false".equals(configs.getOrDefault(EMIT_CHECKPOINTS_ENABLED, "true"));
+boolean syncGroupOffsetsDisabled = 
"false".equals(configs.getOrDefault(SYNC_GROUP_OFFSETS_ENABLED, "true"));
+
+if (emitCheckpointDisabled && syncGroupOffsetsDisabled) {
+Arrays.asList(SYNC_GROUP_OFFSETS_ENABLED, 
EMIT_CHECKPOINTS_ENABLED).forEach(configName -> {
+invalidConfigs.putIfAbsent(configName, 
"MirrorCheckpointConnector can't run with both " + SYNC_GROUP_OFFSETS_ENABLED + 
" and " +

Review Comment:
   It seems to me `EMIT_CHECKPOINTS_ENABLED` does not obstruct 
`MirrorCheckpointConnector` from running, since it is used to update the consumer 
group offsets of the target cluster. By contrast, `SYNC_GROUP_OFFSETS_ENABLED` does 
impact the `MirrorCheckpointConnector`.
   
   
   
https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java#L121






Re: [PR] KAFKA-16254: Allow MM2 to fully disable offset sync feature [kafka]

2024-07-03 Thread via GitHub


chia7712 commented on code in PR #15999:
URL: https://github.com/apache/kafka/pull/15999#discussion_r1664701302


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java:
##
@@ -106,6 +108,17 @@ public void stop() {
 Utils.closeQuietly(targetAdminClient, "target admin client");
 }
 
+@Override
+public Config validate(Map connectorConfigs) {
+List configValues = 
super.validate(connectorConfigs).configValues();
+MirrorCheckpointConfig.validate(connectorConfigs).forEach((config, 
errorMsg) ->
+configValues.stream()
+.filter(conf -> conf.name().equals(config))

Review Comment:
   LGTM






Re: [PR] KAFKA-16934: Clean up and refactor release.py [kafka]

2024-07-03 Thread via GitHub


soarez commented on PR #16287:
URL: https://github.com/apache/kafka/pull/16287#issuecomment-2207104921

   Yes, @jlprat please share any issues you run into.





Re: [PR] KAFKA-17042: The migration docs should remind users to set "broker.id.generation.enable" when adding broker.id [kafka]

2024-07-03 Thread via GitHub


chia7712 commented on PR #16491:
URL: https://github.com/apache/kafka/pull/16491#issuecomment-2207036363

   > The inconsistency between node.id and broker.id (when generated id is 
enabled) can be addressed in another issue. I will file it later
   
   https://issues.apache.org/jira/browse/KAFKA-17077





[jira] [Created] (KAFKA-17077) the node.id is inconsistent to broker.id when "broker.id.generation.enable=true"

2024-07-03 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-17077:
--

 Summary: the node.id is inconsistent to broker.id when 
"broker.id.generation.enable=true"
 Key: KAFKA-17077
 URL: https://issues.apache.org/jira/browse/KAFKA-17077
 Project: Kafka
  Issue Type: Bug
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


We change the broker id of `KafkaConfig` directly when 
`broker.id.generation.enable=true` [0]. However, the update is NOT synced to the 
node.id of `KafkaConfig`. It results in the following issues:

1. we can see many "-1" in the log. for example:
{code:sh}
[2024-07-03 19:23:08,453] INFO [ExpirationReaper--1-AlterAcls]: Starting 
(kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
{code}

2.  `KafkaRaftManager` will use the uninitialized node.id to create 
`KafkaRaftClient` during migration [1], and the error subsequently happens

[0] 
https://github.com/apache/kafka/blob/27220d146c5d043da4adc3d636036bd6e7b112d2/core/src/main/scala/kafka/server/KafkaServer.scala#L261
[1] 
https://github.com/apache/kafka/blob/27220d146c5d043da4adc3d636036bd6e7b112d2/core/src/main/scala/kafka/raft/RaftManager.scala#L230





Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-03 Thread via GitHub


philipnee commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1664632832


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -594,24 +740,13 @@ public void testHeartbeatState() {
 
 @Test
 public void testPollTimerExpiration() {
-coordinatorRequestManager = mock(CoordinatorRequestManager.class);
-membershipManager = mock(MembershipManager.class);
-heartbeatState = mock(HeartbeatRequestManager.HeartbeatState.class);
-heartbeatRequestState = spy(new 
HeartbeatRequestManager.HeartbeatRequestState(
-new LogContext(),
-time,
-DEFAULT_HEARTBEAT_INTERVAL_MS,
-DEFAULT_RETRY_BACKOFF_MS,
-DEFAULT_RETRY_BACKOFF_MAX_MS,
-0));
-backgroundEventHandler = mock(BackgroundEventHandler.class);
-
 heartbeatRequestManager = createHeartbeatRequestManager(
 coordinatorRequestManager,
 membershipManager,
 heartbeatState,
 heartbeatRequestState,
 backgroundEventHandler);
+

Review Comment:
   can we remove the extra space.






Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-03 Thread via GitHub


philipnee commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1664631319


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -277,27 +332,29 @@ public void testHeartbeatNotSentIfAnotherOneInFlight() {
 
 result = heartbeatRequestManager.poll(time.milliseconds());
 assertEquals(0, result.unsentRequests.size(), "No heartbeat should be 
sent while a " +
-"previous one is in-flight");
+"previous one is in-flight");
 
 time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
 result = heartbeatRequestManager.poll(time.milliseconds());
 assertEquals(0, result.unsentRequests.size(), "No heartbeat should be 
sent when the " +
-"interval expires if there is a previous HB request in-flight");
+"interval expires if there is a previous HB request 
in-flight");
 
 // Receive response for the inflight after the interval expired. The 
next HB should be sent
 // on the next poll waiting only for the minimal backoff.
 inflightReq.handler().onComplete(createHeartbeatResponse(inflightReq, 
Errors.NONE));
 time.sleep(DEFAULT_RETRY_BACKOFF_MS);
 result = heartbeatRequestManager.poll(time.milliseconds());
 assertEquals(1, result.unsentRequests.size(), "A next heartbeat should 
be sent on " +
-"the first poll after receiving a response that took longer than 
the interval, " +
-"waiting only for the minimal backoff.");
+"the first poll after receiving a response that took longer 
than the interval, " +

Review Comment:
   could you revert the changes here? (as well as the ones above and below) I 
think these are IDE indentation changes.






[PR] KAFKA-17075: Use new health check endpoint in Connect system tests to verify worker readiness [kafka]

2024-07-03 Thread via GitHub


C0urante opened a new pull request, #16520:
URL: https://github.com/apache/kafka/pull/16520

   [Jira](https://issues.apache.org/jira/browse/KAFKA-17075)
   
   Introduces a new startup mode, `STARTUP_MODE_HEALTH_CHECK`, and uses it as 
the new default for verifying worker readiness during system tests. This 
mode waits for a 200 response from the worker's `GET /health` endpoint, 
and is resilient against temporary errors that may occur during startup (such 
as the REST server not having started or initialized its resources yet).
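   Roughly, the readiness check behaves like the following hypothetical sketch (illustrative Java, not the actual ducktape/system-test code): poll `GET /health` until it returns 200, treating connection errors and non-200 responses during startup as retriable.
   ```java
import java.net.HttpURLConnection;
import java.net.URL;

public class HealthCheckWaiter {
    // Poll the worker's health endpoint until it reports ready or we give up.
    public static boolean waitForReady(String baseUrl, long timeoutMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            try {
                HttpURLConnection conn = (HttpURLConnection) new URL(baseUrl + "/health").openConnection();
                conn.setConnectTimeout(5_000);
                conn.setReadTimeout(5_000);
                if (conn.getResponseCode() == 200) {
                    return true; // worker has completed startup
                }
            } catch (Exception e) {
                // REST server not up yet, or resources not initialized: retry
            }
            Thread.sleep(1_000);
        }
        return false;
    }
}
   ```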
   
   I have not been able to run the full gamut of Connect system tests locally, 
but I have verified that this works with the 
[test_dynamic_logging](https://github.com/apache/kafka/blob/27220d146c5d043da4adc3d636036bd6e7b112d2/tests/kafkatest/tests/connect/connect_distributed_test.py#L464)
 case in 
[connect_distributed_test.py](https://github.com/apache/kafka/blob/27220d146c5d043da4adc3d636036bd6e7b112d2/tests/kafkatest/tests/connect/connect_distributed_test.py)
 and all tests in 
[connect_test.py](https://github.com/apache/kafka/blob/27220d146c5d043da4adc3d636036bd6e7b112d2/tests/kafkatest/tests/connect/connect_test.py).
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-16684: Remove cache in responseData [kafka]

2024-07-03 Thread via GitHub


chia7712 commented on PR #15966:
URL: https://github.com/apache/kafka/pull/15966#issuecomment-2206969288

   ```
   org.gradle.api.internal.tasks.testing.TestSuiteExecutionException: Could not 
complete execution for Gradle Test Executor 100.
at 
org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.stop(SuiteTestClassProcessor.java:64)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
at 
org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
at 
org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
at com.sun.proxy.$Proxy2.stop(Unknown Source)
at 
org.gradle.api.internal.tasks.testing.worker.TestWorker$3.run(TestWorker.java:193)
at 
org.gradle.api.internal.tasks.testing.worker.TestWorker.executeAndMaintainThreadName(TestWorker.java:129)
at 
org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:100)
at 
org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:60)
at 
org.gradle.process.internal.worker.child.ActionExecutionWorker.execute(ActionExecutionWorker.java:56)
at 
org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:119)
at 
org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:66)
at 
worker.org.gradle.process.internal.worker.GradleWorkerMain.run(GradleWorkerMain.java:69)
at 
worker.org.gradle.process.internal.worker.GradleWorkerMain.main(GradleWorkerMain.java:74)
   Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
at 
net.bytebuddy.description.type.TypeDescription$ForLoadedType.of(TypeDescription.java:8619)
at 
net.bytebuddy.description.method.MethodDescription$ForLoadedMethod.getDeclaringType(MethodDescription.java:1190)
at 
org.mockito.internal.creation.bytebuddy.MockMethodAdvice.isOverridden(MockMethodAdvice.java:199)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.isUnavailable(ConsumerNetworkClient.java:560)
at 
org.apache.kafka.clients.consumer.internals.Fetcher.isUnavailable(Fetcher.java:87)
at 
org.apache.kafka.clients.consumer.internals.AbstractFetch.prepareFetchRequests(AbstractFetch.java:427)
at 
org.apache.kafka.clients.consumer.internals.Fetcher.sendFetches(Fetcher.java:105)
at 
org.apache.kafka.clients.consumer.internals.FetcherTest.sendFetches(FetcherTest.java:246)
at 
org.apache.kafka.clients.consumer.internals.FetcherTest.testFetcherConcurrency(FetcherTest.java:2943)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728)
   ```
   
   the failed test is related to this PR. In the test case 
`testFetcherConcurrency`, it does not return the correct `sessionTopicNames`, so 
normally it should NOT see the correct response data. However, 
`FetchResponse#responseData` will return the cached data regardless of input, 
so it CAN get the correct response data even though it passes an empty `topicNames`. 
That is a good example of the potential bug :)
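   (For readers following along, a tiny hypothetical sketch of the caching pitfall described above, not the real `FetchResponse` code: once the result is memoized, later calls silently ignore their arguments, which is how a test can pass with an empty mapping.)
   ```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CachedResponseDataSketch {
    private Map<String, Long> cachedResponseData; // memoized on first call

    public Map<String, Long> responseData(Map<Integer, String> topicNames) {
        if (cachedResponseData == null) {
            cachedResponseData = new LinkedHashMap<>();
            // resolve topic ids to names using the caller-supplied mapping
            topicNames.forEach((id, name) -> cachedResponseData.put(name, 0L));
        }
        // Pitfall being discussed: subsequent calls return the cached map even if
        // they pass a different (e.g. empty) topicNames, hiding broken callers.
        return cachedResponseData;
    }
}
   ```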
   
   @m1a2st Could you copy the changes of this PR to another one, and please fix 
`testFetcherConcurrency` according to my comment. Also, please add a new test for 
the change.
   
   @johnnychhsu Sorry, I can't merge this PR as it causes the failed test. 
Please feel free to close this PR as @m1a2st will leverage this PR to complete 
it :)





[jira] [Commented] (KAFKA-17076) logEndOffset could be lost due to log cleaning

2024-07-03 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17862868#comment-17862868
 ] 

Jun Rao commented on KAFKA-17076:
-

One potential solution is to adjust the log cleaning logic such that it always 
preserves the last batch during each round of cleaning. If all records in the 
last batch are removed, we can just retain the empty batch to preserve the last 
offset. The empty batch will then be replicated to all replicas to preserve the 
true logEndOffset.
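
A rough sketch of the idea in hypothetical Java (the types and method below are 
assumptions used only to illustrate the proposal, not the actual log cleaner 
code):
{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical types, not Kafka's internal classes.
record SimpleRecord(long offset, String key) { }
record Batch(long baseOffset, long lastOffset, List<SimpleRecord> records) { }

class LastBatchPreservingCleaner {
    // Filter records batch by batch, but always keep the final batch (possibly
    // emptied) so its lastOffset still pins the true logEndOffset.
    static List<Batch> clean(List<Batch> batches, Predicate<SimpleRecord> retain) {
        List<Batch> out = new ArrayList<>();
        for (int i = 0; i < batches.size(); i++) {
            Batch b = batches.get(i);
            List<SimpleRecord> kept = new ArrayList<>();
            for (SimpleRecord r : b.records()) {
                if (retain.test(r)) kept.add(r);
            }
            boolean isLast = i == batches.size() - 1;
            if (!kept.isEmpty() || isLast) {
                // For the last batch this may be an empty batch that only
                // carries its base/last offsets.
                out.add(new Batch(b.baseOffset(), b.lastOffset(), kept));
            }
        }
        return out;
    }
}
{code}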

> logEndOffset could be lost due to log cleaning
> --
>
> Key: KAFKA-17076
> URL: https://issues.apache.org/jira/browse/KAFKA-17076
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Jun Rao
>Priority: Major
>
> It's possible for the log cleaner to remove all records in the suffix of the 
> log. If the partition is then reassigned, the new replica won't be able to 
> see the true logEndOffset since there is no record batch associated with it. 
> If this replica becomes the leader, it will assign an already used offset to 
> a newly produced record, which is incorrect.
>  
> It's relatively rare to trigger this issue since the active segment is never 
> cleaned and typically is not empty. However, the following is one possibility.
>  # records with offset 100-110 are produced and fully replicated to all ISR. 
> All those records are delete records for certain keys.
>  # record with offset 111 is produced. It forces the roll of a new segment in 
> broker b1 and is added to the log. The record is not committed and is later 
> truncated from the log, leaving an empty active segment in this log. b1 at 
> some point becomes the leader.
>  # log cleaner kicks in and removes records 100-110.
>  # The partition is reassigned to another broker b2. b2 replicates all 
> records from b1 up to offset 100 and marks its logEndOffset at 100. Since 
> there is no record to replicate after offset 100 in b1, b2's logEndOffset 
> stays at 100 and b2 can join the ISR.
>  # b2 becomes the leader and assigns offset 100 to a new record.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-17076) logEndOffset could be lost due to log cleaning

2024-07-03 Thread Jun Rao (Jira)
Jun Rao created KAFKA-17076:
---

 Summary: logEndOffset could be lost due to log cleaning
 Key: KAFKA-17076
 URL: https://issues.apache.org/jira/browse/KAFKA-17076
 Project: Kafka
  Issue Type: Bug
  Components: core
Reporter: Jun Rao


It's possible for the log cleaner to remove all records in the suffix of the 
log. If the partition is then reassigned, the new replica won't be able to see 
the true logEndOffset since there is no record batch associated with it. If 
this replica becomes the leader, it will assign an already used offset to a 
newly produced record, which is incorrect.

 

It's relatively rare to trigger this issue since the active segment is never 
cleaned and typically is not empty. However, the following is one possibility.
 # records with offset 100-110 are produced and fully replicated to all ISR. 
All those records are delete records for certain keys.
 # record with offset 111 is produced. It forces the roll of a new segment in 
broker b1 and is added to the log. The record is not committed and is later 
truncated from the log, leaving an empty active segment in this log. b1 at some 
point becomes the leader.
 # log cleaner kicks in and removes records 100-110.
 # The partition is reassigned to another broker b2. b2 replicates all records 
from b1 up to offset 100 and marks its logEndOffset at 100. Since there is no 
record to replicate after offset 100 in b1, b2's logEndOffset stays at 100 and 
b2 can join the ISR.
 # b2 becomes the leader and assigns offset 100 to a new record.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-17075) Use health check endpoint to verify Connect worker readiness in system tests

2024-07-03 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-17075:
-

 Summary: Use health check endpoint to verify Connect worker 
readiness in system tests
 Key: KAFKA-17075
 URL: https://issues.apache.org/jira/browse/KAFKA-17075
 Project: Kafka
  Issue Type: Improvement
  Components: connect
Affects Versions: 3.9.0
Reporter: Chris Egerton
Assignee: Chris Egerton


We introduced a health check endpoint for Kafka Connect as part of work on 
KAFKA-10816. We should start to use that endpoint to verify worker readiness in 
our system tests, instead of scanning worker logs for specific messages or 
hitting other, less-reliable REST endpoints.
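
As a rough illustration of what such a readiness check could look like in a 
system-test helper, here is a hedged Java sketch; the {{/health}} path, status 
handling, and timeout values are assumptions for illustration only:
{code:java}
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

class ConnectReadinessProbe {
    // Polls an assumed health endpoint until it reports ready or the deadline passes.
    static boolean awaitReady(String baseUrl, Duration timeout) throws InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(URI.create(baseUrl + "/health")).GET().build();
        long deadline = System.nanoTime() + timeout.toNanos();
        while (System.nanoTime() < deadline) {
            try {
                HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
                if (response.statusCode() == 200) {
                    return true; // worker reports it has completed startup
                }
            } catch (java.io.IOException e) {
                // REST server not listening yet; retry below.
            }
            Thread.sleep(500);
        }
        return false;
    }
}
{code}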



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-10816) Connect REST API should have a resource that can be used as a readiness probe

2024-07-03 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-10816.
---
Fix Version/s: 3.9.0
   Resolution: Done

> Connect REST API should have a resource that can be used as a readiness probe
> -
>
> Key: KAFKA-10816
> URL: https://issues.apache.org/jira/browse/KAFKA-10816
> Project: Kafka
>  Issue Type: Improvement
>  Components: connect
>Reporter: Randall Hauch
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.9.0
>
>
> There are a few ways to accurately detect whether a Connect worker is 
> *completely* ready to process all REST requests:
> # Wait for {{Herder started}} in the Connect worker logs
> # Use the REST API to issue a request that will be completed only after the 
> herder has started, such as {{GET /connectors/{name}/}} or {{GET 
> /connectors/{name}/status}}.
> Other techniques can be used to detect other startup states, though none of 
> these will guarantee that the worker has indeed completely started up and can 
> process all REST requests:
> * {{GET /}} can be used to know when the REST server has started, but this 
> may be before the worker has started completely and successfully.
> * {{GET /connectors}} can be used to know when the REST server has started, 
> but this may be before the worker has started completely and successfully. 
> And, for the distributed Connect worker, this may actually return an older 
> list of connectors if the worker hasn't yet completely read through the 
> internal config topic. It's also possible that this request returns even if 
> the worker is having trouble reading from the internal config topic.
> * {{GET /connector-plugins}} can be used to know when the REST server has 
> started, but this may be before the worker has started completely and 
> successfully.
> The Connect REST API should have an endpoint that more obviously and more 
> simply can be used as a readiness probe. This could be a new resource (e.g., 
> {{GET /status}}), though this would only work on newer Connect runtimes, and 
> existing tooling, installations, and examples would have to be modified to 
> take advantage of this feature (if it exists). 
> Alternatively, we could make sure that the existing resources (e.g., {{GET 
> /}} or {{GET /connectors}}) wait for the herder to start completely; this 
> wouldn't require a KIP and it would not require clients to use different 
> techniques for newer and older Connect runtimes. (Whether or not we backport 
> this is another question altogether, since it's debatable whether the 
> behavior of the existing REST resources is truly a bug.)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-10816: Add health check endpoint for Kafka Connect [kafka]

2024-07-03 Thread via GitHub


C0urante merged PR #16477:
URL: https://github.com/apache/kafka/pull/16477


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-10816: Add health check endpoint for Kafka Connect [kafka]

2024-07-03 Thread via GitHub


C0urante commented on PR #16477:
URL: https://github.com/apache/kafka/pull/16477#issuecomment-2206933077

   Thanks Greg!
   
   I've realized the TODO in the code base was unnecessary and this should be 
safe to merge without waiting for more green CI runs (see the latest commit 
message for more details).
   
   Merging...


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17011: Fix a bug preventing features from supporting v0 [kafka]

2024-07-03 Thread via GitHub


jolshan commented on PR #16421:
URL: https://github.com/apache/kafka/pull/16421#issuecomment-2206865002

   https://github.com/apache/kafka/pull/16183 will wait for this PR in order to 
not break tests.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: Await formation of consumer groups before verifying expected sink connector offsets in OffsetsApiIntegrationTest [kafka]

2024-07-03 Thread via GitHub


C0urante opened a new pull request, #16519:
URL: https://github.com/apache/kafka/pull/16519

   Draft; will flesh out description if this yields positive results.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17058; Extend CoordinatorRuntime to support non-atomic writes [kafka]

2024-07-03 Thread via GitHub


jeffkbkim commented on code in PR #16498:
URL: https://github.com/apache/kafka/pull/16498#discussion_r1664527321


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##
@@ -3839,6 +3842,294 @@ public void close() {}
 assertEquals("response1", write.get(5, TimeUnit.SECONDS));
 }
 
+@Test
+public void testScheduleNonAtomicWriteOperation() throws 
ExecutionException, InterruptedException, TimeoutException {
+MockTimer timer = new MockTimer();
+MockPartitionWriter writer = new MockPartitionWriter();
+
+CoordinatorRuntime runtime =
+new CoordinatorRuntime.Builder()
+.withTime(timer.time())
+.withTimer(timer)
+.withDefaultWriteTimeOut(Duration.ofMillis(20))
+.withLoader(new MockCoordinatorLoader())
+.withEventProcessor(new DirectEventProcessor())
+.withPartitionWriter(writer)
+.withCoordinatorShardBuilderSupplier(new 
MockCoordinatorShardBuilderSupplier())
+
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+.withSerializer(new StringSerializer())
+.withAppendLingerMs(10)
+.build();
+
+// Schedule the loading.
+runtime.scheduleLoadOperation(TP, 10);
+
+// Verify the initial state.
+CoordinatorRuntime.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
+assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+assertNull(ctx.currentBatch);
+
+// Get the max batch size.
+int maxBatchSize = writer.config(TP).maxMessageSize();
+
+// Create records with a quarter of the max batch size each. Keep in 
mind that
+// each batch has a header so it is not possible to have those four 
records
+// in one single batch.
+List records = Stream.of('1', '2', '3', '4').map(c -> {
+char[] payload = new char[maxBatchSize / 4];
+Arrays.fill(payload, c);
+return new String(payload);
+}).collect(Collectors.toList());
+
+// Let's try to write all the records atomically (the default) to 
ensure
+// that it fails.
+CompletableFuture write1 = 
runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
+state -> new CoordinatorResult<>(records, "write#1")
+);
+
+assertFutureThrows(write1, RecordTooLargeException.class);
+
+// Let's try to write the same records non-atomically.
+CompletableFuture write2 = 
runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
+state -> new CoordinatorResult<>(records, "write#2", null, true, 
false)
+);
+
+// The write is pending.
+assertFalse(write2.isDone());
+
+// Verify the state.
+assertNotNull(ctx.currentBatch);
+// The last written offset is 3L because one batch was written to the 
log with
+// the first three records. The 4th one is pending.
+assertEquals(3L, ctx.coordinator.lastWrittenOffset());
+assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+assertEquals(Arrays.asList(0L, 3L), 
ctx.coordinator.snapshotRegistry().epochsList());
+assertEquals(Arrays.asList(
+new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
+new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)),
+new MockCoordinatorShard.RecordAndMetadata(3, records.get(3))
+), ctx.coordinator.coordinator().fullRecords());
+assertEquals(Collections.singletonList(
+records(timer.time().milliseconds(), records.subList(0, 3))
+), writer.entries(TP));
+
+// Commit up to 3L.
+writer.commit(TP, 3L);
+
+// The write is still pending.
+assertFalse(write2.isDone());
+
+// Advance past the linger time to flush the pending batch.
+timer.advanceClock(11);
+
+// Verify the state.
+assertNull(ctx.currentBatch);
+assertEquals(4L, ctx.coordinator.lastWrittenOffset());
+assertEquals(3L, ctx.coordinator.lastCommittedOffset());
+assertEquals(Arrays.asList(3L, 4L), 
ctx.coordinator.snapshotRegistry().epochsList());
+assertEquals(Arrays.asList(
+new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
+new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)),
+new 

Re: [PR] KAFKA-16228: Add remote log metadata flag to the dump log tool [kafka]

2024-07-03 Thread via GitHub


fvaleri commented on code in PR #16475:
URL: https://github.com/apache/kafka/pull/16475#discussion_r1664522945


##
core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala:
##
@@ -243,6 +244,32 @@ class DumpLogSegmentsTest {
 assertEquals(Map.empty, errors.shallowOffsetNotFound)
   }
 
+  @Test
+  def testDumpRemoteLogMetadataRecords(): Unit = {

Review Comment:
   Hi @divijvaidya, thanks. Most of the suggested tests are implemented now.
   
   > 3. metadata contains multiple records (testing with 2 is fine), one batch
   
   I guess this is the original test.
   
   > do we compact metadata? If yes, can you add cases where the segment is a 
compacted segment (has some offsets missing)?
   
   No, the cleanup policy for this topic is hard coded to delete. See 
[here](https://github.com/apache/kafka/blob/3.7.1/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java#L491).
   
   > do we compress metadata? If yes, can you add cases which validate correct 
deserialization for different compression types
   
   No, they are not compressed. See for example 
[here.](https://github.com/apache/kafka/blob/3.7.1/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L746-L750)
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16909 Refactor GroupCoordinatorConfig with AbstractConfig [kafka]

2024-07-03 Thread via GitHub


chia7712 commented on PR #16458:
URL: https://github.com/apache/kafka/pull/16458#issuecomment-220683

   I double-checked all the configs and confirmed that none of them are dynamic. 
Maybe we don't need to over-engineer this for now. Hence, we can have a PR for 
the following changes.
   
   1. re-introduce attributes to `GroupCoordinatorConfig` ( @dajac 
[comment](https://github.com/apache/kafka/pull/16458#discussion_r1662130270))
   2. move `GroupCoordinatorConfig` validation from `KafkaConfig` to 
`GroupCoordinatorConfig` ( @OmniaGM 
[comment](https://github.com/apache/kafka/pull/16458#discussion_r1662318237))
   3. do validation in the constructor of `GroupCoordinatorConfig`. I guess this 
is a bit different from @OmniaGM's idea (#16506 adds a `validate` method to 
`ShareGroupConfig` instead of validating configs in the constructor). I prefer 
to validate all configs in the constructor, as `KafkaConfig` does its validation 
in the constructor too (a rough sketch follows at the end of this comment).
   4. add docs to `GroupCoordinatorConfig` to explain why we add attributes
   
   @brandboat @dajac @OmniaGM PTAL, I hope this can be a guideline for all 
similar config classes
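   
   A rough sketch of the constructor-validation pattern discussed in point 3 
(hypothetical class and property names, not the actual `GroupCoordinatorConfig`):
   
   ```java
   import java.util.Map;
   
   // Hypothetical config class that validates its values when constructed,
   // so an invalid configuration fails fast instead of surfacing later.
   class ExampleGroupConfig {
       private final int sessionTimeoutMs;
       private final int heartbeatIntervalMs;
   
       ExampleGroupConfig(Map<String, ?> props) {
           this.sessionTimeoutMs = intValue(props, "example.session.timeout.ms", 45_000);
           this.heartbeatIntervalMs = intValue(props, "example.heartbeat.interval.ms", 3_000);
           // Validation happens in the constructor, mirroring how KafkaConfig validates.
           if (heartbeatIntervalMs >= sessionTimeoutMs) {
               throw new IllegalArgumentException(
                   "example.heartbeat.interval.ms must be lower than example.session.timeout.ms");
           }
       }
   
       private static int intValue(Map<String, ?> props, String key, int defaultValue) {
           Object value = props.get(key);
           return value == null ? defaultValue : Integer.parseInt(value.toString());
       }
   
       int sessionTimeoutMs() { return sessionTimeoutMs; }
       int heartbeatIntervalMs() { return heartbeatIntervalMs; }
   }
   ```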


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16529; Implement raft response handling [kafka]

2024-07-03 Thread via GitHub


cmccabe commented on code in PR #16454:
URL: https://github.com/apache/kafka/pull/16454#discussion_r1664491334


##
raft/src/test/java/org/apache/kafka/raft/CandidateStateTest.java:
##
@@ -58,193 +60,229 @@ private CandidateState newCandidateState(VoterSet voters) 
{
 );
 }
 
-@Test
-public void testSingleNodeQuorum() {
-CandidateState state = 
newCandidateState(voterSetWithLocal(IntStream.empty()));
+@ParameterizedTest
+@ValueSource(booleans = { true, false })

Review Comment:
   I'm ok with either way. I'm just surprised checkstyle accepts it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Cleanup TestPlugins and normalize TestPlugin enum [kafka]

2024-07-03 Thread via GitHub


gharris1727 merged PR #1:
URL: https://github.com/apache/kafka/pull/1


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Cleanup TestPlugins and normalize TestPlugin enum [kafka]

2024-07-03 Thread via GitHub


gharris1727 commented on PR #1:
URL: https://github.com/apache/kafka/pull/1#issuecomment-2206772302

   Test failures appear unrelated, and the connect tests pass for me locally.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-17073) Deprecate ReplicaVerificationTool in 3.9

2024-07-03 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-17073:
---
Labels: need-kip  (was: )

> Deprecate ReplicaVerificationTool in 3.9
> 
>
> Key: KAFKA-17073
> URL: https://issues.apache.org/jira/browse/KAFKA-17073
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>  Labels: need-kip
> Fix For: 3.9.0
>
>
> see discussion 
> https://lists.apache.org/thread/6zz7xwps8lq2lxfo5bhyl4cggh64c5py
> In short, the tool is useless, so it is a good time to deprecate it in 3.9. 
> That enables us to remove it from 4.0



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-17074) Remove ReplicaVerificationTool

2024-07-03 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-17074:
--

 Summary: Remove ReplicaVerificationTool
 Key: KAFKA-17074
 URL: https://issues.apache.org/jira/browse/KAFKA-17074
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai
 Fix For: 4.0.0


this is follow-up of KAFKA-17073



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-17073) Deprecate ReplicaVerificationTool in 3.9

2024-07-03 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-17073:
--

 Summary: Deprecate ReplicaVerificationTool in 3.9
 Key: KAFKA-17073
 URL: https://issues.apache.org/jira/browse/KAFKA-17073
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


see discussion https://lists.apache.org/thread/6zz7xwps8lq2lxfo5bhyl4cggh64c5py

In short, the tool is useless, so it is a good time to deprecate it in 3.9. 
That enables us to remove it from 4.0



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17073) Deprecate ReplicaVerificationTool in 3.9

2024-07-03 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-17073:
---
Fix Version/s: 3.9.0

> Deprecate ReplicaVerificationTool in 3.9
> 
>
> Key: KAFKA-17073
> URL: https://issues.apache.org/jira/browse/KAFKA-17073
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
> Fix For: 3.9.0
>
>
> see discussion 
> https://lists.apache.org/thread/6zz7xwps8lq2lxfo5bhyl4cggh64c5py
> In short, the tool is useless, so it is a good time to deprecate it in 3.9. 
> That enables us to remove it from 4.0



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-10816: Add health check endpoint for Kafka Connect [kafka]

2024-07-03 Thread via GitHub


gharris1727 commented on code in PR #16477:
URL: https://github.com/apache/kafka/pull/16477#discussion_r1664476993


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/WorkerStatus.java:
##
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.rest.entities;
+
+import org.apache.kafka.connect.util.Stage;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+public class WorkerStatus {
+
+private final String status;
+private final String message;
+
+@JsonCreator
+private WorkerStatus(
+@JsonProperty("status") String status,
+@JsonProperty("message") String message
+) {
+this.status = status;
+this.message = message;
+}
+
+public static WorkerStatus healthy() {
+return new WorkerStatus(
+"healthy",
+"Worker has completed startup and is ready to handle requests."
+);
+}
+
+public static WorkerStatus starting(Stage stage) {

Review Comment:
   Ah yeah this is very reasonable. For some reason I thought there were a lot 
of call-sites that would need an extra null guard, but that wasn't the case. 
This is great.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-14094: Support for first leader bootstrapping the voter set (wip) [kafka]

2024-07-03 Thread via GitHub


ahuang98 opened a new pull request, #16518:
URL: https://github.com/apache/kafka/pull/16518

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-10816: Add health check endpoint for Kafka Connect [kafka]

2024-07-03 Thread via GitHub


C0urante commented on PR #16477:
URL: https://github.com/apache/kafka/pull/16477#issuecomment-2206645942

   Thanks for the review @gharris1727! This is ready for another pass when you 
have time.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-14941) Document which configuration options are applicable only to processes with broker role or controller role

2024-07-03 Thread Gantigmaa Selenge (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14941?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gantigmaa Selenge reassigned KAFKA-14941:
-

Assignee: Gantigmaa Selenge

> Document which configuration options are applicable only to processes with 
> broker role or controller role
> -
>
> Key: KAFKA-14941
> URL: https://issues.apache.org/jira/browse/KAFKA-14941
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jakub Scholz
>Assignee: Gantigmaa Selenge
>Priority: Major
>
> When running in KRaft mode, some of the configuration options are applicable 
> only to nodes with the broker process role and some are applicable only to 
> nodes with the controller process role. It would be great if this 
> information was part of the documentation (e.g. in the [Broker 
> Configs|https://kafka.apache.org/documentation/#brokerconfigs] table on the 
> website), and also part of the config classes, so that it could be used when 
> the configuration is generated dynamically, for example to filter the options 
> applicable to different nodes. This would allow having configuration files 
> with only the configuration options that are actually used and, for example, 
> help to reduce unnecessary restarts when rolling out new configurations.
> For some options, it seems clear that the Kafka node would refuse to start if 
> they are set - for example the configurations of the non-controller listeners 
> on controller-only nodes. For others, it is less clear (does the 
> {{compression.type}} option apply to controller-only nodes? Or the 
> configurations for the offsets topic? etc.).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [Don't Merge] Test [kafka]

2024-07-03 Thread via GitHub


TaiJuWu commented on PR #16463:
URL: https://github.com/apache/kafka/pull/16463#issuecomment-2206506979

   Patch
   ```
   Benchmark(aclCount)  
(authorizerType)  (denyPercentage)  (resourceCount)  Mode  Cnt   ScoreError 
 Units
   AuthorizerBenchmark.testAclsIterator 10   
ACL20 5000  avgt   15   1.528 ±  0.014  ms/op
   AuthorizerBenchmark.testAclsIterator 10   
ACL201  avgt   15   5.343 ±  0.207  ms/op
   AuthorizerBenchmark.testAclsIterator 10 
KRAFT20 5000  avgt   15   2.457 ±  0.015  ms/op
   AuthorizerBenchmark.testAclsIterator 10 
KRAFT201  avgt   15   5.722 ±  0.084  ms/op
   AuthorizerBenchmark.testAclsIterator 20   
ACL20 5000  avgt   15   5.514 ±  0.180  ms/op
   AuthorizerBenchmark.testAclsIterator 20   
ACL201  avgt   15  20.672 ±  0.334  ms/op
   AuthorizerBenchmark.testAclsIterator 20 
KRAFT20 5000  avgt   15   9.638 ±  7.254  ms/op
   AuthorizerBenchmark.testAclsIterator 20 
KRAFT201  avgt   15  12.769 ±  0.207  ms/op
   AuthorizerBenchmark.testAuthorizeByResourceType  10   
ACL20 5000  avgt   15   0.007 ±  0.001  ms/op
   AuthorizerBenchmark.testAuthorizeByResourceType  10   
ACL201  avgt   15   0.007 ±  0.001  ms/op
   AuthorizerBenchmark.testAuthorizeByResourceType  10 
KRAFT20 5000  avgt   15  17.104 ±  0.267  ms/op
   AuthorizerBenchmark.testAuthorizeByResourceType  10 
KRAFT201  avgt   15  25.149 ±  0.412  ms/op
   AuthorizerBenchmark.testAuthorizeByResourceType  20   
ACL20 5000  avgt   15   0.007 ±  0.001  ms/op
   AuthorizerBenchmark.testAuthorizeByResourceType  20   
ACL201  avgt   15   0.007 ±  0.001  ms/op
   AuthorizerBenchmark.testAuthorizeByResourceType  20 
KRAFT20 5000  avgt   15  24.736 ±  0.407  ms/op
   AuthorizerBenchmark.testAuthorizeByResourceType  20 
KRAFT201  avgt   15  46.468 ±  0.409  ms/op
   AuthorizerBenchmark.testAuthorizer   10   
ACL20 5000  avgt   15   0.107 ±  0.001  ms/op
   AuthorizerBenchmark.testAuthorizer   10   
ACL201  avgt   15   0.130 ±  0.001  ms/op
   AuthorizerBenchmark.testAuthorizer   10 
KRAFT20 5000  avgt   15   0.270 ±  0.002  ms/op
   AuthorizerBenchmark.testAuthorizer   10 
KRAFT201  avgt   15   1.562 ±  1.101  ms/op
   AuthorizerBenchmark.testAuthorizer   20   
ACL20 5000  avgt   15   0.481 ±  0.070  ms/op
   AuthorizerBenchmark.testAuthorizer   20   
ACL201  avgt   15   0.724 ±  0.112  ms/op
   AuthorizerBenchmark.testAuthorizer   20 
KRAFT20 5000  avgt   15   0.309 ±  0.004  ms/op
   AuthorizerBenchmark.testAuthorizer   20 
KRAFT201  avgt   15   0.415 ±  0.004  ms/op
   AuthorizerBenchmark.testUpdateCache  10   
ACL20 5000  avgt   15  15.881 ±  0.162  ms/op
   AuthorizerBenchmark.testUpdateCache  10   
ACL201  avgt   15  35.924 ±  0.468  ms/op
   AuthorizerBenchmark.testUpdateCache  10 
KRAFT20 5000  avgt   15  ≈ 10⁻⁶   ms/op
   AuthorizerBenchmark.testUpdateCache  10 
KRAFT201  avgt   15  ≈ 10⁻⁶   ms/op
   AuthorizerBenchmark.testUpdateCache  20   
ACL20 5000  avgt   15  31.230 ±  1.868  ms/op
   AuthorizerBenchmark.testUpdateCache  20   
ACL201  avgt   15  60.179 ±  0.381  ms/op
   AuthorizerBenchmark.testUpdateCache  20 
KRAFT20 5000  avgt   15  ≈ 10⁻⁶   ms/op
   

Re: [PR] KAFKA-16254: Allow MM2 to fully disable offset sync feature [kafka]

2024-07-03 Thread via GitHub


C0urante commented on code in PR #15999:
URL: https://github.com/apache/kafka/pull/15999#discussion_r1664381241


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java:
##
@@ -106,6 +108,17 @@ public void stop() {
 Utils.closeQuietly(targetAdminClient, "target admin client");
 }
 
+@Override
+public Config validate(Map connectorConfigs) {
+List configValues = 
super.validate(connectorConfigs).configValues();
+MirrorCheckpointConfig.validate(connectorConfigs).forEach((config, 
errorMsg) ->
+configValues.stream()
+.filter(conf -> conf.name().equals(config))

Review Comment:
   No worries! Latest looks good to me 
   
   @chia7712 thoughts?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16953) Properly implement the sending of DescribeQuorumResponse

2024-07-03 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-16953?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

José Armando García Sancio updated KAFKA-16953:
---
Description: 
The current implementation doesn't accurately implement the different versions 
of the response. I removed the buggy code in 
[https://github.com/apache/kafka/pull/16454]

This needs to get reimplemented properly.

  was:The current implementation of QuorumDescribe doesn't populate the 
listeners in node collections. This needs to get fixed and tested.


> Properly implement the sending of DescribeQuorumResponse
> 
>
> Key: KAFKA-16953
> URL: https://issues.apache.org/jira/browse/KAFKA-16953
> Project: Kafka
>  Issue Type: Sub-task
>  Components: kraft
>Reporter: José Armando García Sancio
>Priority: Major
> Fix For: 3.9.0
>
>
> The current implementation doesn't accurately implement the different versions 
> of the response. I removed the buggy code in 
> [https://github.com/apache/kafka/pull/16454]
> This needs to get reimplemented properly.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16254: Allow MM2 to fully disable offset sync feature [kafka]

2024-07-03 Thread via GitHub


OmniaGM commented on code in PR #15999:
URL: https://github.com/apache/kafka/pull/15999#discussion_r1664368993


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java:
##
@@ -106,6 +108,17 @@ public void stop() {
 Utils.closeQuietly(targetAdminClient, "target admin client");
 }
 
+@Override
+public Config validate(Map connectorConfigs) {
+List configValues = 
super.validate(connectorConfigs).configValues();
+MirrorCheckpointConfig.validate(connectorConfigs).forEach((config, 
errorMsg) ->
+configValues.stream()
+.filter(conf -> conf.name().equals(config))

Review Comment:
   I updated `MirrorCheckpointConnector::validate` to create a `ConfigValue` if 
one is not already defined for the config name.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16953) Properly implement the sending of DescribeQuorumResponse

2024-07-03 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-16953?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

José Armando García Sancio updated KAFKA-16953:
---
Summary: Properly implement the sending of DescribeQuorumResponse  (was: 
Implement and test listeners in DescribeQuorumResponse)

> Properly implement the sending of DescribeQuorumResponse
> 
>
> Key: KAFKA-16953
> URL: https://issues.apache.org/jira/browse/KAFKA-16953
> Project: Kafka
>  Issue Type: Sub-task
>  Components: kraft
>Reporter: José Armando García Sancio
>Priority: Major
> Fix For: 3.9.0
>
>
> The current implementation of QuorumDescribe doesn't populate the listeners 
> in node collections. This needs to get fixed and tested.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16254: Allow MM2 to fully disable offset sync feature [kafka]

2024-07-03 Thread via GitHub


OmniaGM commented on code in PR #15999:
URL: https://github.com/apache/kafka/pull/15999#discussion_r1664376985


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java:
##
@@ -106,6 +108,17 @@ public void stop() {
 Utils.closeQuietly(targetAdminClient, "target admin client");
 }
 
+@Override
+public Config validate(Map connectorConfigs) {
+List configValues = 
super.validate(connectorConfigs).configValues();
+MirrorCheckpointConfig.validate(connectorConfigs).forEach((config, 
errorMsg) ->
+configValues.stream()
+.filter(conf -> conf.name().equals(config))

Review Comment:
   Sorry for the confusion, I just lost track of @mimaison's point from before! 
This is why I reverted the config back to `MirrorConnectorConfig`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16741: Add share group classes for Heartbeat API (1/N) (KIP-932) [kafka]

2024-07-03 Thread via GitHub


apoorvmittal10 commented on code in PR #16516:
URL: https://github.com/apache/kafka/pull/16516#discussion_r1664366052


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/CoordinatorRecordHelpers.java:
##
@@ -209,14 +211,40 @@ public static CoordinatorRecord newGroupEpochRecord(
 public static CoordinatorRecord newGroupEpochTombstoneRecord(
 String groupId
 ) {
-return new CoordinatorRecord(
-new ApiMessageAndVersion(
-new ConsumerGroupMetadataKey()
-.setGroupId(groupId),
-(short) 3
-),
-null // Tombstone.
-);
+return newGroupEpochTombstoneRecord(groupId, GroupType.CONSUMER);
+}
+
+/**
+ * Creates a ConsumerGroupMetadata tombstone.
+ *
+ * @param groupId   The consumer group id.
+ * @param groupType The group type.
+ * @return The record.
+ */
+public static CoordinatorRecord newGroupEpochTombstoneRecord(
+String groupId,
+GroupType groupType
+) {
+if (groupType == GroupType.CONSUMER) {
+return new CoordinatorRecord(
+new ApiMessageAndVersion(
+new ConsumerGroupMetadataKey()
+.setGroupId(groupId),
+(short) 3
+),
+null // Tombstone.
+);
+} else if (groupType == GroupType.SHARE) {
+return new CoordinatorRecord(
+new ApiMessageAndVersion(
+new ShareGroupMetadataKey()
+.setGroupId(groupId),
+(short) 3

Review Comment:
   My bad, corrected and added tests as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-03 Thread via GitHub


philipnee commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1664366556


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -106,72 +102,90 @@ public class HeartbeatRequestManagerTest {
 private MembershipManager membershipManager;
 private HeartbeatRequestManager.HeartbeatRequestState 
heartbeatRequestState;
 private HeartbeatRequestManager.HeartbeatState heartbeatState;
-private final String memberId = "member-id";
-private final int memberEpoch = 1;
 private BackgroundEventHandler backgroundEventHandler;
-private Metrics metrics;
+private LogContext logContext;
 
 @BeforeEach
 public void setUp() {
-setUp(ConsumerTestBuilder.createDefaultGroupInformation());
-}
+this.time = new MockTime();
+Metrics metrics = new Metrics(time);

Review Comment:
   for aesthetic purposes, let's group all the `this` and non-`this` 
assignments; i.e., can we move metrics to the bottom?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16741: Add share group classes for Heartbeat API (1/N) (KIP-932) [kafka]

2024-07-03 Thread via GitHub


AndrewJSchofield commented on code in PR #16516:
URL: https://github.com/apache/kafka/pull/16516#discussion_r1664357349


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/CoordinatorRecordHelpers.java:
##
@@ -209,14 +211,40 @@ public static CoordinatorRecord newGroupEpochRecord(
 public static CoordinatorRecord newGroupEpochTombstoneRecord(
 String groupId
 ) {
-return new CoordinatorRecord(
-new ApiMessageAndVersion(
-new ConsumerGroupMetadataKey()
-.setGroupId(groupId),
-(short) 3
-),
-null // Tombstone.
-);
+return newGroupEpochTombstoneRecord(groupId, GroupType.CONSUMER);
+}
+
+/**
+ * Creates a ConsumerGroupMetadata tombstone.
+ *
+ * @param groupId   The consumer group id.
+ * @param groupType The group type.
+ * @return The record.
+ */
+public static CoordinatorRecord newGroupEpochTombstoneRecord(
+String groupId,
+GroupType groupType
+) {
+if (groupType == GroupType.CONSUMER) {
+return new CoordinatorRecord(
+new ApiMessageAndVersion(
+new ConsumerGroupMetadataKey()
+.setGroupId(groupId),
+(short) 3
+),
+null // Tombstone.
+);
+} else if (groupType == GroupType.SHARE) {
+return new CoordinatorRecord(
+new ApiMessageAndVersion(
+new ShareGroupMetadataKey()
+.setGroupId(groupId),
+(short) 3

Review Comment:
   This should be 11.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move config getters and validation into server [kafka]

2024-07-03 Thread via GitHub


OmniaGM commented on PR #16307:
URL: https://github.com/apache/kafka/pull/16307#issuecomment-2206434066

   closing this until we agree on a pattern for config classes


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move config getters and validation into server [kafka]

2024-07-03 Thread via GitHub


OmniaGM closed pull request #16307: KAFKA-15853: Move config getters and 
validation into server
URL: https://github.com/apache/kafka/pull/16307


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16754: Removing partitions from release API (KIP-932) [kafka]

2024-07-03 Thread via GitHub


omkreddy merged PR #16513:
URL: https://github.com/apache/kafka/pull/16513


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16934: Clean up and refactor release.py [kafka]

2024-07-03 Thread via GitHub


soarez commented on PR #16287:
URL: https://github.com/apache/kafka/pull/16287#issuecomment-2206328635

   @mimaison I suspect some of the difficulty comes from the reason for this 
PR: the current script is difficult to reason about.
   If you think it would help with the review, I can look into adding unit 
tests for some of the new modules here. The downside is that the diff will be even 
larger. I'm also not sure how we'd plug them into CI, but you could run them 
locally.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]

2024-07-03 Thread via GitHub


FrankYang0529 commented on code in PR #16449:
URL: https://github.com/apache/kafka/pull/16449#discussion_r1664301020


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##
@@ -2302,6 +2303,61 @@ public void testRebalanceMetricsOnFailedRebalance() {
 assertEquals(-1d, getMetricValue(metrics, 
rebalanceMetricsManager.lastRebalanceSecondsAgo));
 }
 
+@Test
+public void testLeaveGroupWhenStateIsFatal() {
+MembershipManagerImpl membershipManager = 
createMemberInStableState(null);
+when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
+membershipManager.transitionToFatal();
+assertEquals(MemberState.FATAL, membershipManager.state());
+
+subscriptionState.assignFromUser(Collections.singleton(new 
TopicPartition("topic", 0)));
+assertEquals(1, subscriptionState.numAssignedPartitions());

Review Comment:
   Yeah, I agree we should not have test cases which are not real cases. I 
changed `spy` back to `mock` and only test that `subscriptionState#unsubscribe` 
is called when `isNotInGroup` is true.
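   
   A minimal sketch of that kind of verification (hypothetical stand-in types, 
not the real consumer internals; assumes Mockito and JUnit 5 on the classpath):
   
   ```java
   import static org.mockito.Mockito.mock;
   import static org.mockito.Mockito.verify;
   
   import org.junit.jupiter.api.Test;
   
   // Hypothetical stand-ins for the real SubscriptionState / MembershipManager.
   interface FakeSubscriptionState {
       void unsubscribe();
   }
   
   class FakeMembershipManager {
       private final FakeSubscriptionState subscriptionState;
   
       FakeMembershipManager(FakeSubscriptionState subscriptionState) {
           this.subscriptionState = subscriptionState;
       }
   
       void leaveGroup(boolean isNotInGroup) {
           if (isNotInGroup) {
               subscriptionState.unsubscribe(); // clear the assignment locally
           }
       }
   }
   
   class LeaveGroupSketchTest {
       @Test
       void unsubscribesWhenNotInGroup() {
           FakeSubscriptionState subscriptionState = mock(FakeSubscriptionState.class);
           new FakeMembershipManager(subscriptionState).leaveGroup(true);
           verify(subscriptionState).unsubscribe();
       }
   }
   ```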



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]

2024-07-03 Thread via GitHub


FrankYang0529 commented on code in PR #16449:
URL: https://github.com/apache/kafka/pull/16449#discussion_r1664301811


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -2077,6 +2077,19 @@ void testReaperInvokedInPoll() {
 verify(backgroundEventReaper).reap(time.milliseconds());
 }
 
+@Test
+public void testUnsubscribeWithoutGroupId() {
+consumer = newConsumerWithoutGroupId();
+completeFetchedCommittedOffsetApplicationEventExceptionally(new 
TimeoutException());
+
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));

Review Comment:
   Yeah, removed unused mock. Thanks for the review. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16254: Allow MM2 to fully disable offset sync feature [kafka]

2024-07-03 Thread via GitHub


C0urante commented on code in PR #15999:
URL: https://github.com/apache/kafka/pull/15999#discussion_r1664298524


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java:
##
@@ -106,6 +108,17 @@ public void stop() {
 Utils.closeQuietly(targetAdminClient, "target admin client");
 }
 
+@Override
+public Config validate(Map connectorConfigs) {
+List configValues = 
super.validate(connectorConfigs).configValues();
+MirrorCheckpointConfig.validate(connectorConfigs).forEach((config, 
errorMsg) ->
+configValues.stream()
+.filter(conf -> conf.name().equals(config))

Review Comment:
   I still agree with @mimaison that we shouldn't have this property in the 
`MirrorConnectorConfig` class, since then it'll be included as a "common 
config" for all connectors in our generated docs 
[here](https://kafka.apache.org/37/documentation.html#mirrormakercommonconfigs) 
instead of in the `MirrorSourceConnector`-specific docs 
[here](https://kafka.apache.org/37/documentation.html#mirrormakersourceconfigs).
   
   Can we just add a new `ConfigValue` for the property in 
`MirrorCheckpointConnector::validate` if we find a validation error with it?
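   
   A rough sketch of that approach (hedged; the helper, property name, and error 
message are placeholders, not the final implementation):
   
   ```java
   import org.apache.kafka.common.config.Config;
   import org.apache.kafka.common.config.ConfigValue;
   
   import java.util.ArrayList;
   import java.util.List;
   
   // Hypothetical helper: attach a validation error to a property, creating the
   // ConfigValue entry if the base validation did not already produce one.
   final class CheckpointValidationSketch {
       static Config withError(Config base, String property, String errorMessage) {
           List<ConfigValue> values = new ArrayList<>(base.configValues());
           ConfigValue value = values.stream()
                   .filter(v -> v.name().equals(property))
                   .findFirst()
                   .orElseGet(() -> {
                       ConfigValue created = new ConfigValue(property);
                       values.add(created);
                       return created;
                   });
           value.addErrorMessage(errorMessage);
           return new Config(values);
       }
   }
   ```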



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16754: Removing partitions from release API (KIP-932) [kafka]

2024-07-03 Thread via GitHub


apoorvmittal10 commented on PR #16513:
URL: https://github.com/apache/kafka/pull/16513#issuecomment-2206307430

   @AndrewJSchofield @omkreddy The build passed with unrelated tests failure.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16943: Synchronously verify Connect worker startup failure in InternalTopicsIntegrationTest [kafka]

2024-07-03 Thread via GitHub


C0urante commented on code in PR #16451:
URL: https://github.com/apache/kafka/pull/16451#discussion_r1664292432


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##
@@ -363,7 +364,17 @@ public DistributedHerder(DistributedConfig config,
 
 @Override
 public void start() {
-this.herderExecutor.submit(this);
+Future future = this.herderExecutor.submit(this);
+try {
+future.get(6, TimeUnit.SECONDS);
+} catch (TimeoutException timeoutException) {
+log.error("herder work thread timeout:", timeoutException);
+future.cancel(true);
+} catch (InterruptedException interruptedException) {
+Thread.currentThread().interrupt();
+} catch (ExecutionException executionException) {
+log.error("herder work thread execution exception:", 
executionException);

Review Comment:
   We don't want to change `start` to block. Can we save the future in a field 
in this class, then expose something like a `Future herderTask()` method 
that allows testing code to access that future and, if desired, block on it?
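   
   A minimal sketch of that shape (hypothetical field and method names, not the 
actual `DistributedHerder` code):
   
   ```java
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.Future;
   
   // Hypothetical herder-like class: start() stays non-blocking, but the submitted
   // task's Future is kept in a field so tests can block on it if they choose.
   class HerderSketch implements Runnable {
       private final ExecutorService herderExecutor = Executors.newSingleThreadExecutor();
       private volatile Future<?> herderTask;
   
       public void start() {
           herderTask = herderExecutor.submit(this); // do not block the caller
       }
   
       // Testing hook: callers may wait on this future to observe startup failures.
       public Future<?> herderTask() {
           return herderTask;
       }
   
       @Override
       public void run() {
           // the herder tick loop would go here
       }
   }
   ```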



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-16991) Flaky Test org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.shouldRestoreState

2024-07-03 Thread Bill Bejeck (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck resolved KAFKA-16991.
-
Resolution: Fixed

> Flaky Test 
> org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.shouldRestoreState
> ---
>
> Key: KAFKA-16991
> URL: https://issues.apache.org/jira/browse/KAFKA-16991
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Assignee: Bill Bejeck
>Priority: Major
> Fix For: 3.9.0
>
> Attachments: 
> 5owo5xbyzjnao-org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest-shouldRestoreState()-1-output.txt
>
>
> We see this test running into timeouts more frequently recently.
> {code:java}
> org.opentest4j.AssertionFailedError: Condition not met within timeout 6. 
> Repartition topic 
> restore-test-KSTREAM-AGGREGATE-STATE-STORE-02-repartition not purged 
> data after 6 ms. ==> expected:  but was: at 
> org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)•••at
>  
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:396)at
>  
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:444)at
>  org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:393)at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:377)at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:367)at 
> org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.shouldRestoreState(PurgeRepartitionTopicIntegrationTest.java:220)
>  {code}
> There was no ERROR or WARN log...



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16991) Flaky Test org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.shouldRestoreState

2024-07-03 Thread Bill Bejeck (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck updated KAFKA-16991:

Fix Version/s: 3.9.0

> Flaky Test 
> org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.shouldRestoreState
> ---
>
> Key: KAFKA-16991
> URL: https://issues.apache.org/jira/browse/KAFKA-16991
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Assignee: Bill Bejeck
>Priority: Major
> Fix For: 3.9.0
>
> Attachments: 
> 5owo5xbyzjnao-org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest-shouldRestoreState()-1-output.txt
>
>
> We see this test running into timeouts more frequently recently.
> {code:java}
> org.opentest4j.AssertionFailedError: Condition not met within timeout 6. 
> Repartition topic 
> restore-test-KSTREAM-AGGREGATE-STATE-STORE-02-repartition not purged 
> data after 6 ms. ==> expected:  but was: at 
> org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)•••at
>  
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:396)at
>  
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:444)at
>  org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:393)at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:377)at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:367)at 
> org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.shouldRestoreState(PurgeRepartitionTopicIntegrationTest.java:220)
>  {code}
> There was no ERROR or WARN log...



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15917) Flaky test - OffsetsApiIntegrationTest.testAlterSinkConnectorOffsetsZombieSinkTasks

2024-07-03 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17862808#comment-17862808
 ] 

Chris Egerton commented on KAFKA-15917:
---

Should be partially addressed by [https://github.com/apache/kafka/pull/15302], 
but there will likely still be failures for this test and others in the 
{{OffsetsApiIntegrationTest}} suite that require follow-up changes.

> Flaky test - 
> OffsetsApiIntegrationTest.testAlterSinkConnectorOffsetsZombieSinkTasks
> ---
>
> Key: KAFKA-15917
> URL: https://issues.apache.org/jira/browse/KAFKA-15917
> Project: Kafka
>  Issue Type: Bug
>Reporter: Haruki Okada
>Assignee: Chris Egerton
>Priority: Major
>  Labels: flaky-test
> Attachments: stdout.log
>
>
> [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14242/14/tests/]
>  
>  
> {code:java}
> Error
> java.lang.AssertionError: 
> Expected: a string containing "zombie sink task"
>  but: was "Could not alter connector offsets. Error response: 
> {"error_code":500,"message":"Failed to alter consumer group offsets for 
> connector test-connector"}"
> Stacktrace
> java.lang.AssertionError: 
> Expected: a string containing "zombie sink task"
>  but: was "Could not alter connector offsets. Error response: 
> {"error_code":500,"message":"Failed to alter consumer group offsets for 
> connector test-connector"}"
> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:8)
> at 
> org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testAlterSinkConnectorOffsetsZombieSinkTasks(OffsetsApiIntegrationTest.java:431)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
> at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
> at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
> at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:112)
> at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
> at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:40)
> at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:60)
> at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:52)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
> at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
> at 
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
> 

Re: [PR] KAFKA-15917: Wait for zombie sink tasks' consumers to commit offsets before trying to modify their offsets in integration tests [kafka]

2024-07-03 Thread via GitHub


C0urante commented on PR #15302:
URL: https://github.com/apache/kafka/pull/15302#issuecomment-2206277714

   Thanks Yash!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15917: Wait for zombie sink tasks' consumers to commit offsets before trying to modify their offsets in integration tests [kafka]

2024-07-03 Thread via GitHub


C0urante merged PR #15302:
URL: https://github.com/apache/kafka/pull/15302


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16745: Implemented handleShareFetchRequest RPC including unit tests [kafka]

2024-07-03 Thread via GitHub


adixitconfluent commented on code in PR #16456:
URL: https://github.com/apache/kafka/pull/16456#discussion_r1664266986


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -4004,6 +4488,99 @@ class KafkaApis(val requestChannel: RequestChannel,
 CompletableFuture.completedFuture[Unit](())
   }
 
+
+  def getAcknowledgeBatchesFromShareFetchRequest(
+    shareFetchRequest : ShareFetchRequest,
+    topicNames : util.Map[Uuid, String],
+    erroneous : mutable.Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData],
+  ) : mutable.Map[TopicIdPartition, util.List[ShareAcknowledgementBatch]] = {
+
+    val acknowledgeBatchesMap = mutable.Map[TopicIdPartition, util.List[ShareAcknowledgementBatch]]()
+    shareFetchRequest.data().topics().forEach ( topic => {
+
+      if (!topicNames.asScala.contains(topic.topicId)) {
+        topic.partitions.forEach((partition: ShareFetchRequestData.FetchPartition) => {
+          val topicIdPartition = new TopicIdPartition(
+            topic.topicId,
+            new TopicPartition(null, partition.partitionIndex))
+          erroneous +=
+            topicIdPartition -> ShareAcknowledgeResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID)
+        })
+      }
+      else {
+        topic.partitions().forEach ( partition => {
+          val topicIdPartition = new TopicIdPartition(
+            topic.topicId(),
+            new TopicPartition(topicNames.get(topic.topicId()), partition.partitionIndex())
+          )
+          var exceptionThrown = false
+          val acknowledgeBatches = new util.ArrayList[ShareAcknowledgementBatch]()
+          breakable {
+            partition.acknowledgementBatches().forEach( batch => {
+              try {
+                acknowledgeBatches.add(new ShareAcknowledgementBatch(
+                  batch.firstOffset(),
+                  batch.lastOffset(),
+                  batch.acknowledgeTypes()
+                ))
+              } catch {
+                case e : IllegalArgumentException =>
+                  exceptionThrown = true
+                  erroneous += topicIdPartition -> ShareAcknowledgeResponse.partitionResponse(topicIdPartition, Errors.forException(e))
+                  break
+              }
+            })
+          }
+          if (!exceptionThrown && acknowledgeBatches.size() > 0) {
+            acknowledgeBatchesMap += topicIdPartition -> acknowledgeBatches
+          }
+        })
+      }
+    })
+    acknowledgeBatchesMap
+  }
+
+  def validateAcknowledgementBatches(
+    acknowledgementDataFromRequest : mutable.Map[TopicIdPartition, util.List[ShareAcknowledgementBatch]],
+    erroneous : mutable.Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData]
+  ) : mutable.Set[TopicIdPartition] = {
+    val erroneousTopicIdPartitions: mutable.Set[TopicIdPartition] = mutable.Set.empty[TopicIdPartition]
+    acknowledgementDataFromRequest.foreach{ case (tp : TopicIdPartition, acknowledgeBatches : util.List[ShareAcknowledgementBatch]) =>
+      var prevEndOffset = -1L
+      breakable {
+        acknowledgeBatches.forEach(batch => {
+          if (batch.firstOffset > batch.lastOffset) {

Review Comment:
   can we not club all of this "if" and the below "if" conditions into a single 
if?



##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -3955,11 +3960,490 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
+  /**
+   * Handle a shareFetch request
+   */
   def handleShareFetchRequest(request: RequestChannel.Request): Unit = {
     val shareFetchRequest = request.body[ShareFetchRequest]
-    // TODO: Implement the ShareFetchRequest handling
-    requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
-    CompletableFuture.completedFuture[Unit](())
+
+    if (!config.isNewGroupCoordinatorEnabled) {
+      // The API is not supported by the "old" group coordinator (the default). If the
+      // new one is not enabled, we fail directly here.
+      requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+      CompletableFuture.completedFuture[Unit](())
+      return
+    } else if (!config.isShareGroupEnabled) {
+      // The API is not supported when the "share" rebalance protocol has not been set explicitly
+      requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+      CompletableFuture.completedFuture[Unit](())
+      return
+    }
+    val sharePartitionManager : SharePartitionManager = this.sharePartitionManager match {
+      case 

[jira] [Commented] (KAFKA-17062) RemoteLogManager - RemoteStorageException causes data loss

2024-07-03 Thread Guillaume Mallet (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17862802#comment-17862802
 ] 

Guillaume Mallet commented on KAFKA-17062:
--

[~showuon] thanks for the feedback!

 

> Could you help me understand why moving the failed segment to delete_started 
>state will help reducing this issue? Because it must be cleaned next time when 
>we enter 
>`[{{cleanupExpiredRemoteLogSegments}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1122]`?

That's what I had in mind, but that would only address issues where writes are 
failing and not deletions (e.g. no more available space).

> But the upload failed segments should not get deleted, right?

This portion seems a bit unclear to me. Because each iteration of the RLMTask 
creates a unique 
[RemoteLogSegmentId|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L850], 
I assumed that multiple retries would result in multiple deletions, as 
[deleteLogSegmentData|https://github.com/apache/kafka/blob/20e101c2e4cb2b34c2c575287cfaec76aa8c5db0/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java#L152] 
will be called once per unique RemoteLogSegmentId, and that failed copies are 
safe to delete as soon as possible. 

If your understanding of the contract is that failed uploads shouldn't be 
deleted and that multiple iterations of the RLMTask should reuse previous files, 
then I probably misunderstood the role of the RemoteLogSegmentId, because it 
sounds like we could reuse it in case of failure as we're asking for the 
[idempotency|https://github.com/apache/kafka/blob/35baa0ac4fcb7f21bb0df037d0756429db5d3bb2/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java#L85] 
of a copy. 

I was under the impression that each upload should be unique due to this 
RemoteLogSegmentId, which is why I was steered in that direction.
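
For illustration, a minimal sketch (using the public RemoteLogSegmentId and TopicIdPartition constructors; the topic name is an example value) of why each copy attempt yields a distinct remote segment id:

{code:java}
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;

public class RemoteLogSegmentIdPerAttempt {
    public static void main(String[] args) {
        TopicIdPartition tp = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("orders", 0));
        // Each RLMTask copy attempt generates a fresh id for the same log segment ...
        RemoteLogSegmentId firstAttempt = new RemoteLogSegmentId(tp, Uuid.randomUuid());
        RemoteLogSegmentId retryAttempt = new RemoteLogSegmentId(tp, Uuid.randomUuid());
        // ... so a retried upload is a different remote segment as far as the RLMM/RSM is
        // concerned, and deleteLogSegmentData would be expected once per failed attempt.
        System.out.println(firstAttempt.equals(retryAttempt)); // false
    }
}
{code}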

 

> This is indeed a problem. Could you help open a separate Jira for this issue? 
> And welcome to contribute to it. I think we don't have to do it 
> complicatedly. Just a simple counter should be enough given this is a rare 
> case and won't cause too much problem. WDYT?

Good suggestion, I'll file one ASAP; that would probably be better for handling 
those tasks independently. I'm not sure I understand what you mean by a simple 
counter, but maybe that discussion should happen in the new Jira. I'll ping you 
once I have it filed to make sure I understand what you mean.

> RemoteLogManager - RemoteStorageException causes data loss
> --
>
> Key: KAFKA-17062
> URL: https://issues.apache.org/jira/browse/KAFKA-17062
> Project: Kafka
>  Issue Type: Bug
>  Components: Tiered-Storage
>Affects Versions: 3.8.0, 3.7.1, 3.9.0
>Reporter: Guillaume Mallet
>Assignee: Guillaume Mallet
>Priority: Major
>  Labels: tiered-storage
>
> When Tiered Storage is configured, retention.bytes defines the limit for the 
> amount of data stored in the filesystem and in remote storage. However a 
> failure while offloading to remote storage can cause segments to be dropped 
> before the retention limit is met.
> What happens
> Assuming a topic configured with {{retention.bytes=4294967296}} (4GB) and a 
> {{local.retention.bytes=1073741824}} (1GB, equal to segment.bytes) we would 
> expect Kafka to keep up to 3 segments (3GB) in the remote store and 1 segment 
> locally (the local segment) and possibly more if the remote storage is 
> offline. i.e. segments in the following RemoteLogSegmentStates in the 
> RemoteLogMetadataManager (RLMM) :
>  * Segment 3 ({{{}COPY_SEGMENT_FINISHED{}}})
>  * Segment 2 ({{{}COPY_SEGMENT_FINISHED{}}})
>  * Segment 1 ({{{}COPY_SEGMENT_FINISHED{}}})
> Let's assume the RLMM starts failing when segment 4 rolls. At the first 
> iteration of an RLMTask we will have -
>  * 
> [{{copyLogSegmentsToRemote}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L773]
>  : is called first
>  ** RLMM becomes aware of Segment 4 and adds it to the metadata:
>  *** Segment 4 ({{{}COPY_SEGMENT_STARTED{}}}),
>  *** Segment 3 ({{{}COPY_SEGMENT_FINISHED{}}}),
>  *** Segment 2 ({{{}COPY_SEGMENT_FINISHED{}}}),
>  *** Segment 1 ({{{}COPY_SEGMENT_FINISHED{}}})
>  ** An exception is raised during the copy operation 
> ([{{copyLogSegmentData}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java#L93]
>  in RemoteStorageManager) which is caught with the error 

Re: [PR] KAFKA-16991: Flaky PurgeRepartitionTopicIntegrationTest [kafka]

2024-07-03 Thread via GitHub


bbejeck commented on PR #16503:
URL: https://github.com/apache/kafka/pull/16503#issuecomment-2206182951

   merged #16503 into trunk


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16991: Flaky PurgeRepartitionTopicIntegrationTest [kafka]

2024-07-03 Thread via GitHub


bbejeck merged PR #16503:
URL: https://github.com/apache/kafka/pull/16503


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16345: Optionally URL-encode clientID and clientSecret in authorization header [kafka]

2024-07-03 Thread via GitHub


bachmanity1 commented on code in PR #15475:
URL: https://github.com/apache/kafka/pull/15475#discussion_r1664248591


##
clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetrieverTest.java:
##
@@ -174,33 +176,39 @@ public void testParseAccessTokenInvalidJson() {
 }
 
 @Test
-    public void testFormatAuthorizationHeader() {
-        assertAuthorizationHeader("id", "secret");
+    public void testFormatAuthorizationHeader() throws UnsupportedEncodingException {
+        assertAuthorizationHeader("id", "secret", false);
     }
 
     @Test
-    public void testFormatAuthorizationHeaderEncoding() {
+    public void testFormatAuthorizationHeaderEncoding() throws UnsupportedEncodingException {
         // See KAFKA-14496
-        assertAuthorizationHeader("SOME_RANDOM_LONG_USER_01234", "9Q|0`8i~ute-n9ksjLWb\\50\"AX@UUED5E");
+        assertAuthorizationHeader("SOME_RANDOM_LONG_USER_01234", "9Q|0`8i~ute-n9ksjLWb\\50\"AX@UUED5E", false);
+        // See KAFKA-16345

Review Comment:
   I updated both of the comments.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16345: Optionally URL-encode clientID and clientSecret in authorization header [kafka]

2024-07-03 Thread via GitHub


bachmanity1 commented on code in PR #15475:
URL: https://github.com/apache/kafka/pull/15475#discussion_r1664244790


##
clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetrieverTest.java:
##
@@ -174,33 +176,39 @@ public void testParseAccessTokenInvalidJson() {
 }
 
 @Test
-    public void testFormatAuthorizationHeader() {
-        assertAuthorizationHeader("id", "secret");
+    public void testFormatAuthorizationHeader() throws UnsupportedEncodingException {
+        assertAuthorizationHeader("id", "secret", false);
     }
 
     @Test
-    public void testFormatAuthorizationHeaderEncoding() {
+    public void testFormatAuthorizationHeaderEncoding() throws UnsupportedEncodingException {
         // See KAFKA-14496
-        assertAuthorizationHeader("SOME_RANDOM_LONG_USER_01234", "9Q|0`8i~ute-n9ksjLWb\\50\"AX@UUED5E");
+        assertAuthorizationHeader("SOME_RANDOM_LONG_USER_01234", "9Q|0`8i~ute-n9ksjLWb\\50\"AX@UUED5E", false);
+        // See KAFKA-16345
+        assertAuthorizationHeader("user!@~'", "secret-(*)!", true);
     }
 
-    private void assertAuthorizationHeader(String clientId, String clientSecret) {
+    private void assertAuthorizationHeader(String clientId, String clientSecret, boolean urlencode) throws UnsupportedEncodingException {

Review Comment:
   I think the intent here is to prevent unintentional modifications to the 
`HttpAccessTokenRetriever.formatAuthorizationHeader()` method that could change 
its resulting value. 
   
   cc. @kirktrue 



##
clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java:
##
@@ -192,6 +192,10 @@ public class SaslConfigs {
 + " be inspected for the standard OAuth \"iss\" claim and if this 
value is set, the broker will match it exactly against what is in the JWT's 
\"iss\" claim. If there is no"
 + " match, the broker will reject the JWT and authentication will 
fail.";
 
+public static final String SASL_OAUTHBEARER_HEADER_URLENCODE = 
"sasl.oauthbearer.header.urlencode";
+public static final boolean DEFAULT_SASL_OAUTHBEARER_HEADER_URLENCODE = 
false;
+public static final String SASL_OAUTHBEARER_HEADER_URLENCODE_DOC = "The 
(optional) setting to enable the OAuth client to URL-encode the client_id and 
client_secret in the authorization header"
++ " in accordance with RFC6749, see 
https://datatracker.ietf.org/doc/html/rfc6749#section-2.3.1 for more detail. 
The default value is set to 'false' for backward compatibility";

Review Comment:
   done
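
For context, a rough sketch of the behaviour under discussion, written as a hypothetical standalone helper rather than the actual HttpAccessTokenRetriever method:

{code:java}
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public final class AuthorizationHeaderSketch {
    // Optionally URL-encode client_id/client_secret (RFC 6749 section 2.3.1)
    // before building the HTTP Basic Authorization header.
    static String formatAuthorizationHeader(String clientId, String clientSecret, boolean urlencode)
            throws UnsupportedEncodingException {
        if (urlencode) {
            clientId = URLEncoder.encode(clientId, StandardCharsets.UTF_8.name());
            clientSecret = URLEncoder.encode(clientSecret, StandardCharsets.UTF_8.name());
        }
        String credentials = clientId + ":" + clientSecret;
        return "Basic " + Base64.getEncoder().encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
    }
}
{code}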



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16991: Flaky PurgeRepartitionTopicIntegrationTest [kafka]

2024-07-03 Thread via GitHub


bbejeck commented on PR #16503:
URL: https://github.com/apache/kafka/pull/16503#issuecomment-2206142493

   Failures unrelated


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-17070) perf: consider to use ByteBufferOutputstream to append records

2024-07-03 Thread dujian0068 (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dujian0068 reassigned KAFKA-17070:
--

Assignee: dujian0068

> perf: consider to use ByteBufferOutputstream to append records
> --
>
> Key: KAFKA-17070
> URL: https://issues.apache.org/jira/browse/KAFKA-17070
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Luke Chen
>Assignee: dujian0068
>Priority: Major
>
> Consider using ByteBufferOutputStream to append records, instead of a 
> DataOutputStream. We should add a JMH test to confirm this indeed improves 
> performance before merging it.
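
A minimal JMH sketch of the comparison suggested above (the harness setup, payload size and method names are illustrative assumptions, not part of the ticket):

{code:java}
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;

@State(Scope.Benchmark)
public class AppendRecordsBenchmark {
    private final byte[] payload = new byte[1024];

    @Benchmark
    public int appendViaDataOutputStream() throws IOException {
        // current style: wrap the buffer in a DataOutputStream before appending
        ByteBufferOutputStream out = new ByteBufferOutputStream(4096);
        DataOutputStream dos = new DataOutputStream(out);
        dos.write(payload);
        dos.flush();
        return out.position();
    }

    @Benchmark
    public int appendViaByteBufferOutputStream() {
        // proposed style: append directly to the ByteBufferOutputStream
        ByteBufferOutputStream out = new ByteBufferOutputStream(4096);
        out.write(payload, 0, payload.length);
        return out.position();
    }
}
{code}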



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16745: Implemented handleShareFetchRequest RPC including unit tests [kafka]

2024-07-03 Thread via GitHub


adixitconfluent commented on code in PR #16456:
URL: https://github.com/apache/kafka/pull/16456#discussion_r1664193628


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -3955,11 +3960,490 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
+  /**
+   * Handle a shareFetch request
+   */
   def handleShareFetchRequest(request: RequestChannel.Request): Unit = {
 val shareFetchRequest = request.body[ShareFetchRequest]
-// TODO: Implement the ShareFetchRequest handling
-requestHelper.sendMaybeThrottle(request, 
shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
-CompletableFuture.completedFuture[Unit](())
+
+if (!config.isNewGroupCoordinatorEnabled) {
+  // The API is not supported by the "old" group coordinator (the 
default). If the
+  // new one is not enabled, we fail directly here.
+  requestHelper.sendMaybeThrottle(request, 
shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+  CompletableFuture.completedFuture[Unit](())
+  return
+} else if (!config.isShareGroupEnabled) {
+  // The API is not supported when the "share" rebalance protocol has not 
been set explicitly
+  requestHelper.sendMaybeThrottle(request, 
shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+  CompletableFuture.completedFuture[Unit](())
+  return
+}
+val sharePartitionManager : SharePartitionManager = 
this.sharePartitionManager match {
+  case Some(manager) => manager
+  case None => throw new IllegalStateException("ShareFetchRequest received 
but SharePartitionManager is not initialized")
+}
+
+val groupId = shareFetchRequest.data.groupId
+val memberId = shareFetchRequest.data.memberId
+val shareSessionEpoch = shareFetchRequest.data.shareSessionEpoch
+
+var cachedTopicPartitions : util.List[TopicIdPartition] = null
+
+if (shareSessionEpoch == ShareFetchMetadata.FINAL_EPOCH) {
+  try {
+cachedTopicPartitions = 
sharePartitionManager.cachedTopicIdPartitionsInShareSession(groupId, 
Uuid.fromString(memberId))
+  } catch {
+// Exception handling is needed when this value is being utilized on 
receiving FINAL_EPOCH.
+case _: ShareSessionNotFoundException => cachedTopicPartitions = null
+  }
+}
+
+def isAcknowledgeDataPresentInFetchRequest() : Boolean = {
+  var isAcknowledgeDataPresent = false
+  shareFetchRequest.data.topics.forEach ( topic => {
+breakable{
+  topic.partitions.forEach ( partition => {
+if (partition.acknowledgementBatches != null && 
!partition.acknowledgementBatches.isEmpty) {
+  isAcknowledgeDataPresent = true
+  break
+} else {
+  isAcknowledgeDataPresent = false
+}
+  })
+}
+  })
+  isAcknowledgeDataPresent
+}
+
+val isAcknowledgeDataPresent = isAcknowledgeDataPresentInFetchRequest()
+
+def isInvalidShareFetchRequest() : Boolean = {
+  // The Initial Share Fetch Request should not Acknowledge any data
+  if (shareSessionEpoch == ShareFetchMetadata.INITIAL_EPOCH && 
isAcknowledgeDataPresent) {
+return true
+  }
+  false
+}
+
+val topicNames = metadataCache.topicIdsToNames()
+val shareFetchData = shareFetchRequest.shareFetchData(topicNames)
+val forgottenTopics = shareFetchRequest.forgottenTopics(topicNames)
+
+val newReqMetadata : ShareFetchMetadata = new 
ShareFetchMetadata(Uuid.fromString(memberId), shareSessionEpoch)
+var shareFetchContext : ShareFetchContext = null
+
+var shareFetchResponse : ShareFetchResponse = null
+
+// check if the Request is Invalid
+if(isInvalidShareFetchRequest()) {
+  shareFetchResponse = 
shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, 
Errors.INVALID_REQUEST.exception) match {
+case response: ShareFetchResponse => response
+case _ => null
+  }
+}
+
+try {
+  // Creating the shareFetchContext for Share Session Handling
+  shareFetchContext = sharePartitionManager.newContext(groupId, 
shareFetchData, forgottenTopics, newReqMetadata)
+} catch {
+  case e: Exception => shareFetchResponse = 
shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, e) 
match {
+case response: ShareFetchResponse => response
+case _ => null
+  }
+}
+
+// Variable to store any error thrown while the handling piggybacked 
acknowledgements
+var acknowledgeError : Errors = Errors.NONE
+// Variable to store the topic partition wise result of piggybacked 
acknowledgements
+var acknowledgeResult = mutable.Map[TopicIdPartition, 
ShareAcknowledgeResponseData.PartitionData]()
+
+// This check is done to make sure that there was no Share Session related 
error while creating shareFetchContext
+if(shareFetchResponse == null) {

Review 

[jira] [Created] (KAFKA-17072) Document broker decommissioning process with KRaft

2024-07-03 Thread Mickael Maison (Jira)
Mickael Maison created KAFKA-17072:
--

 Summary: Document broker decommissioning process with KRaft
 Key: KAFKA-17072
 URL: https://issues.apache.org/jira/browse/KAFKA-17072
 Project: Kafka
  Issue Type: Improvement
  Components: docs
Reporter: Mickael Maison


When decommissioning a broker in KRaft mode, the broker also has to be 
explicitly unregistered. This is not mentioned anywhere in the documentation.

A broker that is not unregistered stays eligible for new partition assignments 
and will prevent bumping the metadata version if the remaining brokers are upgraded.
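
For illustration, a minimal sketch of the explicit unregistration step via the Admin API (the broker id and bootstrap address below are example values):

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class UnregisterDecommissionedBroker {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // Unregister the decommissioned broker (id 3 is an example value).
            // Only supported on KRaft clusters; the broker should already be shut down
            // and its partitions reassigned.
            admin.unregisterBroker(3).all().get();
        }
    }
}
{code}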



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16745: Implemented handleShareFetchRequest RPC including unit tests [kafka]

2024-07-03 Thread via GitHub


adixitconfluent commented on code in PR #16456:
URL: https://github.com/apache/kafka/pull/16456#discussion_r1664178535


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -3956,11 +3961,482 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
+  /**
+   * Handle a shareFetch request
+   */
   def handleShareFetchRequest(request: RequestChannel.Request): Unit = {
 val shareFetchRequest = request.body[ShareFetchRequest]
-// TODO: Implement the ShareFetchRequest handling
-requestHelper.sendMaybeThrottle(request, 
shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
-CompletableFuture.completedFuture[Unit](())
+
+if (!config.isNewGroupCoordinatorEnabled) {
+  // The API is not supported by the "old" group coordinator (the 
default). If the
+  // new one is not enabled, we fail directly here.
+  requestHelper.sendMaybeThrottle(request, 
shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+  CompletableFuture.completedFuture[Unit](())
+  return
+} else if (!config.isShareGroupEnabled) {
+  // The API is not supported when the "share" rebalance protocol has not 
been set explicitly
+  requestHelper.sendMaybeThrottle(request, 
shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+  CompletableFuture.completedFuture[Unit](())
+  return
+}
+val topicNames = metadataCache.topicIdsToNames()
+val sharePartitionManager : SharePartitionManager = 
sharePartitionManagerOption match {
+  case Some(manager) => manager
+  case None => throw new IllegalStateException("ShareFetchRequest received 
but SharePartitionManager is not initialized")
+}
+
+val groupId = shareFetchRequest.data.groupId
+val clientId = request.header.clientId
+val memberId = shareFetchRequest.data().memberId()
+val shareSessionEpoch = shareFetchRequest.data().shareSessionEpoch()
+
+val shareFetchData = shareFetchRequest.shareFetchData(topicNames)
+val forgottenTopics = shareFetchRequest.forgottenTopics(topicNames)
+var cachedTopicPartitions : util.List[TopicIdPartition] = null
+
+if (shareSessionEpoch == ShareFetchMetadata.FINAL_EPOCH) {
+  try {
+cachedTopicPartitions = 
sharePartitionManager.cachedTopicIdPartitionsInShareSession(groupId, 
Uuid.fromString(memberId))
+  } catch {
+// Exception handling is needed when this value is being utilized on 
receiving FINAL_EPOCH.
+case _: ShareSessionNotFoundException => cachedTopicPartitions = null
+  }
+}
+
+def isAcknowledgeDataPresentInFetchRequest() : Boolean = {
+  var isAcknowledgeDataPresent = false
+  shareFetchRequest.data().topics().forEach ( topic => {
+breakable{
+  topic.partitions().forEach ( partition => {
+if (partition.acknowledgementBatches() != null && 
!partition.acknowledgementBatches().isEmpty) {
+  isAcknowledgeDataPresent = true
+  break()
+} else {
+  isAcknowledgeDataPresent = false
+}
+  })
+}
+  })
+  isAcknowledgeDataPresent
+}
+
+val isAcknowledgeDataPresent = isAcknowledgeDataPresentInFetchRequest()
+var shareFetchResponse : ShareFetchResponse = null
+// Variable to store any error thrown while the handling piggybacked 
acknowledgements
+var acknowledgeError : Errors = Errors.NONE
+// Variable to store the topic partition wise result of piggybacked 
acknowledgements
+var acknowledgeResult = mutable.Map[TopicIdPartition, 
ShareAcknowledgeResponseData.PartitionData]()
+
+def isInvalidShareFetchRequest() : Boolean = {
+  // The Initial Share Fetch Request should not Acknowledge any data
+  if (shareSessionEpoch == ShareFetchMetadata.INITIAL_EPOCH && 
isAcknowledgeDataPresent) {
+return true
+  }
+  false
+}
+
+val newReqMetadata : ShareFetchMetadata = new 
ShareFetchMetadata(Uuid.fromString(memberId), shareSessionEpoch)
+var shareFetchContext : ShareFetchContext = null
+try {
+  // Creating the shareFetchContext for Share Session Handling
+  shareFetchContext = sharePartitionManager.newContext(groupId, 
shareFetchData, forgottenTopics, newReqMetadata)
+} catch {
+  case e: Exception => shareFetchResponse = 
shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, e) 
match {
+case response: ShareFetchResponse => response
+case _ => null

Review Comment:
   I agree with @apoorvmittal10. Looking at the newContext function, the possible 
errors are `INVALID_REQUEST`, `SHARE_SESSION_NOT_FOUND` and 
`INVALID_SHARE_SESSION_EPOCH`. In all such cases, we should be completing the 
API call with a top-level error code there itself.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To 

Re: [PR] KAFKA-16745: Implemented handleShareFetchRequest RPC including unit tests [kafka]

2024-07-03 Thread via GitHub


adixitconfluent commented on code in PR #16456:
URL: https://github.com/apache/kafka/pull/16456#discussion_r1664178535


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -3956,11 +3961,482 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
+  /**
+   * Handle a shareFetch request
+   */
   def handleShareFetchRequest(request: RequestChannel.Request): Unit = {
 val shareFetchRequest = request.body[ShareFetchRequest]
-// TODO: Implement the ShareFetchRequest handling
-requestHelper.sendMaybeThrottle(request, 
shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
-CompletableFuture.completedFuture[Unit](())
+
+if (!config.isNewGroupCoordinatorEnabled) {
+  // The API is not supported by the "old" group coordinator (the 
default). If the
+  // new one is not enabled, we fail directly here.
+  requestHelper.sendMaybeThrottle(request, 
shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+  CompletableFuture.completedFuture[Unit](())
+  return
+} else if (!config.isShareGroupEnabled) {
+  // The API is not supported when the "share" rebalance protocol has not 
been set explicitly
+  requestHelper.sendMaybeThrottle(request, 
shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+  CompletableFuture.completedFuture[Unit](())
+  return
+}
+val topicNames = metadataCache.topicIdsToNames()
+val sharePartitionManager : SharePartitionManager = 
sharePartitionManagerOption match {
+  case Some(manager) => manager
+  case None => throw new IllegalStateException("ShareFetchRequest received 
but SharePartitionManager is not initialized")
+}
+
+val groupId = shareFetchRequest.data.groupId
+val clientId = request.header.clientId
+val memberId = shareFetchRequest.data().memberId()
+val shareSessionEpoch = shareFetchRequest.data().shareSessionEpoch()
+
+val shareFetchData = shareFetchRequest.shareFetchData(topicNames)
+val forgottenTopics = shareFetchRequest.forgottenTopics(topicNames)
+var cachedTopicPartitions : util.List[TopicIdPartition] = null
+
+if (shareSessionEpoch == ShareFetchMetadata.FINAL_EPOCH) {
+  try {
+cachedTopicPartitions = 
sharePartitionManager.cachedTopicIdPartitionsInShareSession(groupId, 
Uuid.fromString(memberId))
+  } catch {
+// Exception handling is needed when this value is being utilized on 
receiving FINAL_EPOCH.
+case _: ShareSessionNotFoundException => cachedTopicPartitions = null
+  }
+}
+
+def isAcknowledgeDataPresentInFetchRequest() : Boolean = {
+  var isAcknowledgeDataPresent = false
+  shareFetchRequest.data().topics().forEach ( topic => {
+breakable{
+  topic.partitions().forEach ( partition => {
+if (partition.acknowledgementBatches() != null && 
!partition.acknowledgementBatches().isEmpty) {
+  isAcknowledgeDataPresent = true
+  break()
+} else {
+  isAcknowledgeDataPresent = false
+}
+  })
+}
+  })
+  isAcknowledgeDataPresent
+}
+
+val isAcknowledgeDataPresent = isAcknowledgeDataPresentInFetchRequest()
+var shareFetchResponse : ShareFetchResponse = null
+// Variable to store any error thrown while the handling piggybacked 
acknowledgements
+var acknowledgeError : Errors = Errors.NONE
+// Variable to store the topic partition wise result of piggybacked 
acknowledgements
+var acknowledgeResult = mutable.Map[TopicIdPartition, 
ShareAcknowledgeResponseData.PartitionData]()
+
+def isInvalidShareFetchRequest() : Boolean = {
+  // The Initial Share Fetch Request should not Acknowledge any data
+  if (shareSessionEpoch == ShareFetchMetadata.INITIAL_EPOCH && 
isAcknowledgeDataPresent) {
+return true
+  }
+  false
+}
+
+val newReqMetadata : ShareFetchMetadata = new 
ShareFetchMetadata(Uuid.fromString(memberId), shareSessionEpoch)
+var shareFetchContext : ShareFetchContext = null
+try {
+  // Creating the shareFetchContext for Share Session Handling
+  shareFetchContext = sharePartitionManager.newContext(groupId, 
shareFetchData, forgottenTopics, newReqMetadata)
+} catch {
+  case e: Exception => shareFetchResponse = 
shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, e) 
match {
+case response: ShareFetchResponse => response
+case _ => null

Review Comment:
   I agree with @apoorvmittal10 , seeing the newContext function, the possible 
errors are `INVALID_REQUEST`, `SHARE_SESSION_NOT_FOUND` and 
`INVALID_SHARE_SESSION_EPOCH`. In all such cases, we should be completing the 
API call with a top level error code there itself.



##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -3956,11 +3961,482 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
+  

[jira] [Commented] (KAFKA-17070) perf: consider to use ByteBufferOutputstream to append records

2024-07-03 Thread Luca Molteni (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17862782#comment-17862782
 ] 

Luca Molteni commented on KAFKA-17070:
--

[~bmilk] sure go on

> perf: consider to use ByteBufferOutputstream to append records
> --
>
> Key: KAFKA-17070
> URL: https://issues.apache.org/jira/browse/KAFKA-17070
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Luke Chen
>Priority: Major
>
> Consider using ByteBufferOutputStream to append records, instead of a 
> DataOutputStream. We should add a JMH test to confirm this indeed improves 
> performance before merging it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-17070) perf: consider to use ByteBufferOutputstream to append records

2024-07-03 Thread dujian0068 (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17862773#comment-17862773
 ] 

dujian0068 commented on KAFKA-17070:


Hello:

This is a valuable question,Can I try it?

> perf: consider to use ByteBufferOutputstream to append records
> --
>
> Key: KAFKA-17070
> URL: https://issues.apache.org/jira/browse/KAFKA-17070
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Luke Chen
>Priority: Major
>
> Consider using ByteBufferOutputStream to append records, instead of a 
> DataOutputStream. We should add a JMH test to confirm this indeed improves 
> performance before merging it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17066) New consumer updateFetchPositions should perform all operations in background thread

2024-07-03 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-17066:
--
Component/s: clients

> New consumer updateFetchPositions should perform all operations in background 
> thread
> 
>
> Key: KAFKA-17066
> URL: https://issues.apache.org/jira/browse/KAFKA-17066
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.8.0
>Reporter: Lianet Magrans
>Priority: Blocker
> Fix For: 3.9.0
>
>
> The updateFetchPositions function in the new consumer performs several actions 
> based on the assigned partitions from the subscriptionState. The way it's 
> currently implemented, it fetches committed offsets for partitions that 
> require a position (retrieved from the subscription state in the app thread), 
> and then resets positions for the partitions still needing one (retrieved 
> from the subscription state but in the background thread). 
> This is problematic, given that the assignment/subscriptionState may change 
> in the background thread at any time (e.g. new partitions reconciled), so we 
> could end up resetting positions to the partition offsets for a partition for 
> which we never even attempted to retrieve committed offsets.  
> Consider this sequence for a consumer that owns partition tp0:
>  * consumer owns tp0
>  * app thread -> updateFetchPositions triggers 
> initWithCommittedOffsetsIfNeeded to retrieve committed offsets for assigned 
> partitions requiring a position (taking them from 
> subscriptions.initializingPartitions()). This will fetch committed offsets 
> for tp0 only.
>  * background thread -> receives new partition tp1 and completes 
> reconciliation (adds it to the subscription state as INITIALIZING, requiring a 
> position)
>  * app thread -> updateFetchPositions resets positions for all partitions 
> that still don't have a valid position after initWithCommittedOffsetsIfNeeded 
> (taking them from subscriptionState.partitionsNeedingReset). This will 
> mistakenly conclude that it should reset tp1 to the partition offsets, when 
> in reality it never even tried fetching the committed offsets for it because 
> it wasn't assigned when initWithCommittedOffsetsIfNeeded happened. 
> We should consider moving updateFetchPositions to the background thread as a 
> single event, which would safely use the subscriptionState object and apply all 
> actions involved in updateFetchPositions to the same consistent set of 
> partitions assigned at that moment. 
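
To make the interleaving above concrete, a minimal self-contained simulation (hypothetical names, not the actual consumer internals):

{code:java}
import java.util.HashSet;
import java.util.Set;

public class UpdateFetchPositionsRace {
    public static void main(String[] args) {
        Set<String> needingPosition = new HashSet<>();
        needingPosition.add("tp0");                                            // consumer owns tp0

        // app thread: committed offsets are fetched only for partitions initializing *now*
        Set<String> committedOffsetsLookedUp = new HashSet<>(needingPosition); // {tp0}

        // background thread: reconciliation completes and tp1 becomes INITIALIZING afterwards
        needingPosition.add("tp1");

        // app thread: everything still needing a position is reset -> includes tp1,
        // whose committed offset was never looked up
        needingPosition.removeAll(committedOffsetsLookedUp);
        System.out.println("Reset without committed-offset lookup: " + needingPosition); // [tp1]
    }
}
{code}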



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16745: Implemented handleShareFetchRequest RPC including unit tests [kafka]

2024-07-03 Thread via GitHub


chirag-wadhwa5 commented on code in PR #16456:
URL: https://github.com/apache/kafka/pull/16456#discussion_r1664144313


##
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##
@@ -427,6 +427,26 @@ public List 
cachedTopicIdPartitionsInShareSession(String group
 return cachedTopicIdPartitions;
 }
 
+    /**
+     * The acknowledgeShareSessionCacheUpdate method is used to update the share session cache before acknowledgements are handled
+     * either as part of shareFetch request or shareAcknowledge request
+     * @param groupId The group id in the request.
+     * @param memberId The member id of the client in the request.
+     * @param reqEpoch The request epoch.
+     */
+    public void acknowledgeShareSessionCacheUpdate(String groupId, Uuid memberId, int reqEpoch) {

Review Comment:
   nope, already taken care of in the latest commit. Thanks



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17042: The migration docs should remind users to set "broker.id.generation.enable" when adding broker.id [kafka]

2024-07-03 Thread via GitHub


frankvicky commented on PR #16491:
URL: https://github.com/apache/kafka/pull/16491#issuecomment-2206000614

   Hi @showuon 
   I have simplified the description based on feedback, PTAL


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16745: Implemented handleShareFetchRequest RPC including unit tests [kafka]

2024-07-03 Thread via GitHub


apoorvmittal10 commented on code in PR #16456:
URL: https://github.com/apache/kafka/pull/16456#discussion_r1664122712


##
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##
@@ -427,6 +427,26 @@ public List 
cachedTopicIdPartitionsInShareSession(String group
 return cachedTopicIdPartitions;
 }
 
+    /**
+     * The acknowledgeShareSessionCacheUpdate method is used to update the share session cache before acknowledgements are handled
+     * either as part of shareFetch request or shareAcknowledge request
+     * @param groupId The group id in the request.
+     * @param memberId The member id of the client in the request.
+     * @param reqEpoch The request epoch.
+     */
+    public void acknowledgeShareSessionCacheUpdate(String groupId, Uuid memberId, int reqEpoch) {

Review Comment:
   Is this method being used anywhere now?



##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -3956,11 +3961,482 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
+  /**
+   * Handle a shareFetch request
+   */
   def handleShareFetchRequest(request: RequestChannel.Request): Unit = {
 val shareFetchRequest = request.body[ShareFetchRequest]
-// TODO: Implement the ShareFetchRequest handling
-requestHelper.sendMaybeThrottle(request, 
shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
-CompletableFuture.completedFuture[Unit](())
+
+if (!config.isNewGroupCoordinatorEnabled) {
+  // The API is not supported by the "old" group coordinator (the 
default). If the
+  // new one is not enabled, we fail directly here.
+  requestHelper.sendMaybeThrottle(request, 
shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+  CompletableFuture.completedFuture[Unit](())
+  return
+} else if (!config.isShareGroupEnabled) {
+  // The API is not supported when the "share" rebalance protocol has not 
been set explicitly
+  requestHelper.sendMaybeThrottle(request, 
shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+  CompletableFuture.completedFuture[Unit](())
+  return
+}
+val topicNames = metadataCache.topicIdsToNames()
+val sharePartitionManager : SharePartitionManager = 
sharePartitionManagerOption match {
+  case Some(manager) => manager
+  case None => throw new IllegalStateException("ShareFetchRequest received 
but SharePartitionManager is not initialized")
+}
+
+val groupId = shareFetchRequest.data.groupId
+val clientId = request.header.clientId
+val memberId = shareFetchRequest.data().memberId()
+val shareSessionEpoch = shareFetchRequest.data().shareSessionEpoch()
+
+val shareFetchData = shareFetchRequest.shareFetchData(topicNames)
+val forgottenTopics = shareFetchRequest.forgottenTopics(topicNames)
+var cachedTopicPartitions : util.List[TopicIdPartition] = null
+
+if (shareSessionEpoch == ShareFetchMetadata.FINAL_EPOCH) {
+  try {
+cachedTopicPartitions = 
sharePartitionManager.cachedTopicIdPartitionsInShareSession(groupId, 
Uuid.fromString(memberId))
+  } catch {
+// Exception handling is needed when this value is being utilized on 
receiving FINAL_EPOCH.
+case _: ShareSessionNotFoundException => cachedTopicPartitions = null
+  }
+}
+
+def isAcknowledgeDataPresentInFetchRequest() : Boolean = {
+  var isAcknowledgeDataPresent = false
+  shareFetchRequest.data().topics().forEach ( topic => {
+breakable{
+  topic.partitions().forEach ( partition => {
+if (partition.acknowledgementBatches() != null && 
!partition.acknowledgementBatches().isEmpty) {
+  isAcknowledgeDataPresent = true
+  break()
+} else {
+  isAcknowledgeDataPresent = false
+}
+  })
+}
+  })
+  isAcknowledgeDataPresent
+}
+
+val isAcknowledgeDataPresent = isAcknowledgeDataPresentInFetchRequest()
+var shareFetchResponse : ShareFetchResponse = null
+// Variable to store any error thrown while the handling piggybacked 
acknowledgements
+var acknowledgeError : Errors = Errors.NONE
+// Variable to store the topic partition wise result of piggybacked 
acknowledgements
+var acknowledgeResult = mutable.Map[TopicIdPartition, 
ShareAcknowledgeResponseData.PartitionData]()
+
+def isInvalidShareFetchRequest() : Boolean = {
+  // The Initial Share Fetch Request should not Acknowledge any data
+  if (shareSessionEpoch == ShareFetchMetadata.INITIAL_EPOCH && 
isAcknowledgeDataPresent) {
+return true
+  }
+  false
+}
+
+val newReqMetadata : ShareFetchMetadata = new 
ShareFetchMetadata(Uuid.fromString(memberId), shareSessionEpoch)
+var shareFetchContext : ShareFetchContext = null
+try {
+  // Creating the shareFetchContext for Share Session Handling
+  shareFetchContext = 

Re: [PR] KAFKA-16254: Allow MM2 to fully disable offset sync feature [kafka]

2024-07-03 Thread via GitHub


OmniaGM commented on code in PR #15999:
URL: https://github.com/apache/kafka/pull/15999#discussion_r1664131813


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java:
##
@@ -106,6 +108,17 @@ public void stop() {
 Utils.closeQuietly(targetAdminClient, "target admin client");
 }
 
+    @Override
+    public Config validate(Map<String, String> connectorConfigs) {
+        List<ConfigValue> configValues = super.validate(connectorConfigs).configValues();
+        MirrorCheckpointConfig.validate(connectorConfigs).forEach((config, errorMsg) ->
+            configValues.stream()
+                .filter(conf -> conf.name().equals(config))
+                .forEach(conf -> conf.errorMessages().add(errorMsg)));

Review Comment:
   Updated this. I forgot to roll it back to addErrorMessage when addressing some 
of the other feedback.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16254: Allow MM2 to fully disable offset sync feature [kafka]

2024-07-03 Thread via GitHub


OmniaGM commented on code in PR #15999:
URL: https://github.com/apache/kafka/pull/15999#discussion_r1664131206


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java:
##
@@ -106,6 +108,17 @@ public void stop() {
 Utils.closeQuietly(targetAdminClient, "target admin client");
 }
 
+    @Override
+    public Config validate(Map<String, String> connectorConfigs) {
+        List<ConfigValue> configValues = super.validate(connectorConfigs).configValues();
+        MirrorCheckpointConfig.validate(connectorConfigs).forEach((config, errorMsg) ->
+            configValues.stream()
+                .filter(conf -> conf.name().equals(config))

Review Comment:
   I moved this to MirrorConnectorConfig, which they all inherit, so this should be 
addressed now



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16254: Allow MM2 to fully disable offset sync feature [kafka]

2024-07-03 Thread via GitHub


OmniaGM commented on code in PR #15999:
URL: https://github.com/apache/kafka/pull/15999#discussion_r1664129210


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##
@@ -234,6 +234,30 @@ public ConfigDef config() {
 @Override
 public org.apache.kafka.common.config.Config validate(Map<String, String> props) {
     List<ConfigValue> configValues = super.validate(props).configValues();
+    validateExactlyOnceConfigs(props, configValues);
+    validateEmitOffsetSyncConfigs(props, configValues);
+
+    return new org.apache.kafka.common.config.Config(configValues);
+}
+
+private static void validateEmitOffsetSyncConfigs(Map<String, String> props, List<ConfigValue> configValues) {
+    boolean offsetSyncsConfigured = props.keySet().stream()
+        .anyMatch(conf -> conf.startsWith(OFFSET_SYNCS_CLIENT_ROLE_PREFIX) || conf.startsWith(OFFSET_SYNCS_TOPIC_CONFIG_PREFIX));
+
+    if ("false".equals(props.get(MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED)) && offsetSyncsConfigured) {

Review Comment:
   No, because `offsetSyncsConfigured` has a default, which is the global configs 
for topics and clients 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-17041) Add pagination when describe large set of metadata via Admin API

2024-07-03 Thread Omnia Ibrahim (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Omnia Ibrahim updated KAFKA-17041:
--
Issue Type: Improvement  (was: Task)

> Add pagination when describe large set of metadata via Admin API 
> -
>
> Key: KAFKA-17041
> URL: https://issues.apache.org/jira/browse/KAFKA-17041
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Reporter: Omnia Ibrahim
>Assignee: Omnia Ibrahim
>Priority: Major
>
> Some of the requests via the Admin API time out on large clusters or clusters with 
> a large set of specific metadata. For example, OffsetFetchRequest and 
> DescribeLogDirsRequest time out due to the large number of partitions on the cluster. 
> Also, DescribeProducersRequest and ListTransactionsRequest time out due to too 
> many short-lived PIDs or too many hanging transactions.
> [KIP-1062: Introduce Pagination for some requests used by Admin 
> API|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1062%3A+Introduce+Pagination+for+some+requests+used+by+Admin+API]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

