[PR] MINOR: refine DeleteOffsetsConsumerGroupCommandIntegrationTest#produceRecord [kafka]

2024-04-25 Thread via GitHub


FrankYang0529 opened a new pull request, #15802:
URL: https://github.com/apache/kafka/pull/15802

   Use try-with-resources to create the producer and make sure the producer sends the record without error.
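   For illustration, a minimal sketch of that pattern (topic name, properties helper, and timeout are placeholders, not the test's actual values):
   ```java
   try (Producer<byte[], byte[]> producer = new KafkaProducer<>(producerProps(),
           new ByteArraySerializer(), new ByteArraySerializer())) {
       // Blocking on the returned future makes any send error fail the test immediately.
       producer.send(new ProducerRecord<>("test-topic", "key".getBytes(), "value".getBytes()))
               .get(30, TimeUnit.SECONDS);
   }
   ```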
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16618) Update the RPC for ConsumerGroupHeartbeatRequest and ConsumerGroupHeartbeatResponse

2024-04-25 Thread Phuc Hong Tran (Jira)
Phuc Hong Tran created KAFKA-16618:
--

 Summary: Update the RPC for ConsumerGroupHeartbeatRequest and 
ConsumerGroupHeartbeatResponse
 Key: KAFKA-16618
 URL: https://issues.apache.org/jira/browse/KAFKA-16618
 Project: Kafka
  Issue Type: Sub-task
  Components: clients
Reporter: Phuc Hong Tran
Assignee: Phuc Hong Tran
 Fix For: 4.0.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16582) Feature Request: Introduce max.record.size Configuration Parameter for Producers

2024-04-25 Thread dujian (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17840684#comment-17840684
 ] 

dujian commented on KAFKA-16582:


Hello [~ramiz.mehran],

I have reproduced this problem and found that the delay increases further as more messages are sent.

But I haven't found the reason why the problem occurs yet.

> Feature Request: Introduce max.record.size Configuration Parameter for 
> Producers
> 
>
> Key: KAFKA-16582
> URL: https://issues.apache.org/jira/browse/KAFKA-16582
> Project: Kafka
>  Issue Type: New Feature
>  Components: producer 
>Affects Versions: 3.6.2
>Reporter: Ramiz Mehran
>Priority: Major
>
> {*}Summary{*}:
> Currently, Kafka producers have a {{max.request.size}} configuration that 
> limits the size of the request sent to Kafka brokers, which includes both 
> compressed and uncompressed data sizes. However, it is also the maximum size 
> of an individual record before it is compressed. This can lead to 
> inefficiencies and unexpected behaviours, particularly when records are 
> significantly large before compression but fit multiple times into the 
> {{max.request.size}} after compression.
> {*}Problem{*}:
> During spikes in data transmission, large records, even when compressed within the
> limits of {{max.request.size}}, cause an increase in latency and a potential backlog
> in processing due to the large batch sizes formed by compressed records. This problem
> is particularly pronounced when using highly efficient compression algorithms like zstd,
> where the compressed size may allow for large batches that are inefficient to process.
> {*}Proposed Solution{*}:
> Introduce a new producer configuration parameter: {{max.record.size}}.
> This parameter will allow administrators to define the maximum size of a
> record before it is compressed. This would help in managing expectations and
> system behavior more predictably by separating the uncompressed record size limit
> from the compressed request size limit.
> {*}Benefits{*}:
>  # {*}Predictability{*}: Producers can reject records that exceed the 
> {{max.record.size}} before spending resources on compression.
>  # {*}Efficiency{*}: Helps in maintaining efficient batch sizes and system 
> throughput, especially under high load conditions.
>  # {*}System Stability{*}: Avoids the potential for large batch processing 
> which can affect latency and throughput negatively.
> {*}Example{*}: Consider a scenario where the producer sends records up to 20
> MB in size which, when compressed, fit multiple times into a batch under the 25 MB
> {{max.request.size}}. Such batches can be problematic to process efficiently, even
> though they meet the current maximum request size constraint. With {{max.record.size}},
> {{max.request.size}} could be left to limit only the compressed request size, while the
> new setting caps individual records at, say, 5 MB, preventing very large requests from
> being made and the latency spikes they cause.
> {*}Steps to Reproduce{*}:
>  # Configure a Kafka producer with {{max.request.size}} set to 25 MB.
>  # Send multiple uncompressed records close to 20 MB that compress to less 
> than 25 MB.
>  # Observe the impact on Kafka broker performance and client side latency.
> {*}Expected Behavior{*}: The producer should allow administrators to set both 
> pre-compression record size limits and total request size limits post 
> compression.
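A rough sketch of the kind of client-side guard the proposal describes ({{max.record.size}} is the proposed setting, not an existing producer config; the names and size estimate are simplified placeholders):

{code:java}
int maxRecordSize = 5 * 1024 * 1024;      // the proposed max.record.size, e.g. 5 MB
byte[] value = serializePayload(payload); // placeholder for the application's serialization

if (value.length > maxRecordSize) {
    // Reject before spending CPU on compression and before the record enters a batch.
    throw new RecordTooLargeException("Record of " + value.length
            + " bytes exceeds the configured max.record.size of " + maxRecordSize);
}
producer.send(new ProducerRecord<>("large-events", key, value));
{code}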





[jira] [Assigned] (KAFKA-16584) Make log processing summary configurable or debug

2024-04-25 Thread dujian (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16584?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dujian reassigned KAFKA-16584:
--

Assignee: dujian

> Make log processing summary configurable or debug
> -
>
> Key: KAFKA-16584
> URL: https://issues.apache.org/jira/browse/KAFKA-16584
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.2
>Reporter: Andras Hatvani
>Assignee: dujian
>Priority: Major
>  Labels: needs-kip, newbie
>
> Currently, *every two minutes for every stream thread*, statistics are logged
> at INFO level.
> {code}
> 2024-04-18T09:18:23.790+02:00  INFO 33178 --- [service] [-StreamThread-1] 
> o.a.k.s.p.internals.StreamThread         : stream-thread 
> [service-149405a3-c7e3-4505-8bbd-c3bff226b115-StreamThread-1] Processed 0 
> total records, ran 0 punctuators, and committed 0 total tasks since the last 
> update {code}
> This is absolutely unnecessary and even harmful since it fills the logs, and
> thus storage space, with unwanted and useless data. The other INFO logs are
> useful and helpful, so raising the log level to WARN is not an option.
> Please make the logProcessingSummary
> * either a DEBUG-level log, or
> * configurable so that it can be disabled.
> This is the relevant code: 
> https://github.com/apache/kafka/blob/aee9724ee15ed539ae73c09cc2c2eda83ae3c864/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L1073
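For illustration, the requested change amounts to something like the following inside StreamThread (the config flag and field names here are hypothetical; the message is the one quoted above):

{code:java}
// Option 1: lower the level so the summary only shows up when DEBUG is enabled.
log.debug("Processed {} total records, ran {} punctuators, and committed {} total tasks since the last update",
    recordsProcessed, punctuatorsRan, tasksCommitted);

// Option 2: keep INFO, but guard it behind a new configuration that can be disabled.
if (logProcessingSummaryEnabled) {
    log.info("Processed {} total records, ran {} punctuators, and committed {} total tasks since the last update",
        recordsProcessed, punctuatorsRan, tasksCommitted);
}
{code}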





Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-04-25 Thread via GitHub


vamossagar12 commented on code in PR #15305:
URL: https://github.com/apache/kafka/pull/15305#discussion_r1579057875


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java:
##
@@ -533,6 +536,57 @@ public void testSkippingAssignmentFails() {
 verify(configStorage).snapshot();
 }
 
+    @Test
+    public void testPollTimeoutExpiry() throws InterruptedException {
+        // We will create a new WorkerCoordinator object with a rebalance timeout smaller
+        // than session timeout. This might not happen in the real world but it makes testing
+        // easier and the test not flaky.
+        int smallRebalanceTimeout = 20;
+        this.rebalanceConfig = new GroupRebalanceConfig(sessionTimeoutMs,
+                smallRebalanceTimeout,
+                heartbeatIntervalMs,
+                groupId,
+                Optional.empty(),
+                retryBackoffMs,
+                retryBackoffMaxMs,
+                true);
+        this.coordinator = new WorkerCoordinator(rebalanceConfig,
+                logContext,
+                consumerClient,
+                new Metrics(time),
+                "consumer" + groupId,
+                time,
+                LEADER_URL,
+                configStorage,
+                rebalanceListener,
+                compatibility,
+                0);
+
+        when(configStorage.snapshot()).thenReturn(configState1);
+
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        client.prepareResponse(joinGroupFollowerResponse(1, "member", "leader", Errors.NONE));
+        client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", configState1.offset(), Collections.emptyList(),
+                Collections.singletonList(taskId1x0), Errors.NONE));
+
+        try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(WorkerCoordinator.class)) {
+            coordinator.ensureActiveGroup();
+            coordinator.poll(0, () -> null);
+
+            // The heartbeat thread is running and keeps sending heartbeat requests.
+            TestUtils.waitForCondition(() -> {
+                // Rebalance timeout elapses while poll is never invoked causing a poll timeout expiry
+                coordinator.sendHeartbeatRequest();
+                client.prepareResponse(new HeartbeatResponse(new HeartbeatResponseData()));
+                time.sleep(1);

Review Comment:
   @showuon, actually I typed the wrong message yesterday. I ran the tests
multiple times and they pass as expected. The reason I tried the other
approach was that in the other
[comment](https://github.com/apache/kafka/pull/15305#discussion_r1570226078)
you had mentioned that it is not normal for consumers to not send heartbeats
(aside from the readability concern). So to stay closer to the real case, I had kept it.
   
   Nonetheless, I have reverted the code based on your suggestion and it works
as well. Thanks!






Re: [PR] MINOR: Various cleanups in core [kafka]

2024-04-25 Thread via GitHub


mimaison commented on PR #15786:
URL: https://github.com/apache/kafka/pull/15786#issuecomment-2076660780

   Just rebased, let's run another build





Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]

2024-04-25 Thread via GitHub


dajac commented on code in PR #15785:
URL: https://github.com/apache/kafka/pull/15785#discussion_r1579102085


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/ConsumerGroupSubscriptionModel.java:
##
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+/**
+ * The subscription model followed by the consumer group.
+ *
+ * A homogeneous subscription model means that all the members
+ * of the group are subscribed to the same set of topics.
+ *
+ * The model is heterogeneous otherwise.
+ */
+public enum ConsumerGroupSubscriptionModel {

Review Comment:
   nit: Should we call it `SubscriptionType`?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -90,17 +91,29 @@ private Map> membersPerTopic(final 
AssignmentSpec assignmentS
 Map> membersPerTopic = new HashMap<>();
 Map membersData = 
assignmentSpec.members();
 
-membersData.forEach((memberId, memberMetadata) -> {
-Collection topics = memberMetadata.subscribedTopicIds();
+if (assignmentSpec.groupSubscriptionModel().equals(HOMOGENEOUS)) {
+List allMembers = new ArrayList<>(membersData.keySet());

Review Comment:
   I wonder if we could change the return type of the method from `Map>` to `Map>` and avoid this copy here. It 
seems possible because we only iterate over the member ids later on. This could 
be a nice performance improvement too while we are here.
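   For illustration, the homogeneous branch could then reuse the key set directly instead of copying it (assuming member ids are `String`s and callers only iterate the value):
   ```java
   // All members share the same subscription, so the full member id set can be used as-is.
   Collection<String> allMembers = Collections.unmodifiableCollection(membersData.keySet());
   ```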



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/ConsumerGroupSubscriptionModel.java:
##
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+/**
+ * The subscription model followed by the consumer group.
+ *
+ * A homogeneous subscription model means that all the members
+ * of the group are subscribed to the same set of topics.
+ *
+ * The model is heterogeneous otherwise.
+ */
+public enum ConsumerGroupSubscriptionModel {
+HOMOGENEOUS("Homogeneous"),
+HETEROGENEOUS("Heterogeneous");
+private final String name;

Review Comment:
   nit: Let's add an empty line before this one.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -620,9 +637,9 @@ public Map 
computeSubscriptionMetadata(
 TopicsImage topicsImage,
 ClusterImage clusterImage
 ) {
-// Copy and update the current subscriptions.
+// Copy and update the current subscription information.
 Map subscribedTopicNames = new 
HashMap<>(this.subscribedTopicNames);
-maybeUpdateSubscribedTopicNames(subscribedTopicNames, oldMember, 
newMember);
+
maybeUpdateSubscribedTopicNamesAndGroupSubscriptionModel(subscribedTopicNames, 
oldMember, newMember);

Review Comment:
   Hum... We need to be careful here because we are not supposed to update the
internal state of the group on this code path. It may be better to keep it as
it was before and to have another method to update the subscription type.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/ConsumerGroupSubscriptionModel.java:
##
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. 

[jira] [Commented] (KAFKA-16212) Cache partitions by TopicIdPartition instead of TopicPartition

2024-04-25 Thread Omnia Ibrahim (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17840715#comment-17840715
 ] 

Omnia Ibrahim commented on KAFKA-16212:
---

I believe that https://issues.apache.org/jira/browse/KAFKA-10551 (and possibly
KAFKA-10549) needs to be done first, as ProduceRequest and ProduceResponse interact
with the replicaManager cache to append to the log, and it would be easier if this produce
request path is already topic ID aware before updating the cache. I am
prioritising KAFKA-10551.

> Cache partitions by TopicIdPartition instead of TopicPartition
> --
>
> Key: KAFKA-16212
> URL: https://issues.apache.org/jira/browse/KAFKA-16212
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 3.7.0
>Reporter: Gaurav Narula
>Assignee: Omnia Ibrahim
>Priority: Major
>
> From the discussion in [PR 
> 15263|https://github.com/apache/kafka/pull/15263#discussion_r1471075201], it 
> would be better to cache {{allPartitions}} by {{TopicIdPartition}} instead of 
> {{TopicPartition}} to avoid ambiguity.
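For illustration, the proposed keying would look roughly like this (the value type and field name are placeholders, not the actual ReplicaManager code):

{code:java}
// Keyed by TopicIdPartition, so two incarnations of "foo-0" with different topic ids
// can never be confused with each other.
private final Map<TopicIdPartition, Partition> allPartitions = new ConcurrentHashMap<>();

Partition partition = allPartitions.get(
    new TopicIdPartition(topicId, new TopicPartition("foo", 0)));
{code}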





Re: [PR] KAFKA-16609: Update parse_describe_topic to support new topic describe output [kafka]

2024-04-25 Thread via GitHub


lucasbru merged PR #15799:
URL: https://github.com/apache/kafka/pull/15799





Re: [PR] KAFKA-16555: Consumer's RequestState has incorrect logic to determine if inflight [kafka]

2024-04-25 Thread via GitHub


lucasbru commented on PR #15723:
URL: https://github.com/apache/kafka/pull/15723#issuecomment-2076731964

   Still looking good to me. I was waiting for the comments from philipp to be
commented on / addressed. Do you want to skip ahead and merge?





Re: [PR] KAFKA-15588 ConfigCommandIntegrationTest rewritten in java [kafka]

2024-04-25 Thread via GitHub


nizhikov commented on PR #15645:
URL: https://github.com/apache/kafka/pull/15645#issuecomment-2076735612

   Hello @chia7712 
   
   1. Reworked the test to use ClusterTestExtensions
   2. Extended `testExitWithNonZeroStatusOnUpdatingUnallowedConfig` to also check KRaft mode.
   3. The other two tests are ZK-specific. It seems we can leave them as is.
   
   Can you, please, take a look?





Re: [PR] MINOR: Rename RaftConfig to QuorumConfigs [kafka]

2024-04-25 Thread via GitHub


chia7712 commented on PR #15797:
URL: https://github.com/apache/kafka/pull/15797#issuecomment-2076741501

   @OmniaGM Could you please rebase code to include fix 
(https://github.com/apache/kafka/pull/15801)?





Re: [PR] KAFKA-15588 ConfigCommandIntegrationTest rewritten in java [kafka]

2024-04-25 Thread via GitHub


chia7712 commented on PR #15645:
URL: https://github.com/apache/kafka/pull/15645#issuecomment-2076742667

   > Can you, please, take a look?
   
   sure! will take a look later





Re: [PR] MINOR: Rename RaftConfig to QuorumConfigs [kafka]

2024-04-25 Thread via GitHub


OmniaGM commented on PR #15797:
URL: https://github.com/apache/kafka/pull/15797#issuecomment-2076745737

   > @OmniaGM Could you please rebase code to include fix (#15801)?
   
   done





Re: [PR] KAFKA-16563: retry pollEvent in KRaftMigrationDriver for retriable errors [kafka]

2024-04-25 Thread via GitHub


showuon commented on PR #15732:
URL: https://github.com/apache/kafka/pull/15732#issuecomment-2076748016

   @akhileshchg @mumrah @cmccabe , we need your comment on this. Thanks.





[jira] [Commented] (KAFKA-16563) migration to KRaft hanging after MigrationClientException

2024-04-25 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17840723#comment-17840723
 ] 

Luke Chen commented on KAFKA-16563:
---

[~davidarthur] [~akhileshchg], could you take a look at this issue? Thanks.

> migration to KRaft hanging after MigrationClientException
> -
>
> Key: KAFKA-16563
> URL: https://issues.apache.org/jira/browse/KAFKA-16563
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>
> When running the ZK to KRaft migration process, we encountered an issue where the
> migration hangs and the `ZkMigrationState` cannot move to the `MIGRATION`
> state. After investigation, the root cause is that the pollEvent didn't
> retry on the retriable `MigrationClientException` (i.e. ZK client retriable
> errors) while it should. Because of this, the poll event will not poll
> anymore, which means the KRaftMigrationDriver cannot work as expected.
>  
> {code:java}
> 2024-04-11 21:27:55,393 INFO [KRaftMigrationDriver id=5] Encountered 
> ZooKeeper error during event PollEvent. Will retry. 
> (org.apache.kafka.metadata.migration.KRaftMigrationDriver) 
> [controller-5-migration-driver-event-handler]org.apache.zookeeper.KeeperException$NodeExistsException:
>  KeeperErrorCode = NodeExists for /migrationat 
> org.apache.zookeeper.KeeperException.create(KeeperException.java:126)at 
> org.apache.zookeeper.KeeperException.create(KeeperException.java:54)at 
> kafka.zookeeper.AsyncResponse.maybeThrow(ZooKeeperClient.scala:570)at 
> kafka.zk.KafkaZkClient.createInitialMigrationState(KafkaZkClient.scala:1701)  
>   at 
> kafka.zk.KafkaZkClient.getOrCreateMigrationState(KafkaZkClient.scala:1689)
> at 
> kafka.zk.ZkMigrationClient.$anonfun$getOrCreateMigrationRecoveryState$1(ZkMigrationClient.scala:109)
> at 
> kafka.zk.ZkMigrationClient.getOrCreateMigrationRecoveryState(ZkMigrationClient.scala:69)
> at 
> org.apache.kafka.metadata.migration.KRaftMigrationDriver.applyMigrationOperation(KRaftMigrationDriver.java:248)
> at 
> org.apache.kafka.metadata.migration.KRaftMigrationDriver.recoverMigrationStateFromZK(KRaftMigrationDriver.java:169)
> at 
> org.apache.kafka.metadata.migration.KRaftMigrationDriver.access$1900(KRaftMigrationDriver.java:62)
> at 
> org.apache.kafka.metadata.migration.KRaftMigrationDriver$PollEvent.run(KRaftMigrationDriver.java:794)
> at 
> org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
> at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
> at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
> at java.base/java.lang.Thread.run(Thread.java:840){code}
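A minimal sketch of the retry behaviour the report argues the poll event should have (the helper names are illustrative, not the actual KRaftMigrationDriver code):

{code:java}
// Inside the poll event: a retriable ZK error must reschedule the poll rather than
// silently dropping it, otherwise the driver never polls again and the migration hangs.
try {
    runPollStep();
} catch (MigrationClientException e) {
    log.info("Encountered ZooKeeper error during event PollEvent. Will retry.", e);
    schedulePollRetry();
}
{code}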





Re: [PR] MINOR: refine DeleteOffsetsConsumerGroupCommandIntegrationTest#produceRecord [kafka]

2024-04-25 Thread via GitHub


chia7712 commented on PR #15802:
URL: https://github.com/apache/kafka/pull/15802#issuecomment-2076758475

   @FrankYang0529 We reuse the TOPIC and GROUP in the for-loop, and that could be an
issue if the cleanup of the topic/group is not finished before the next iteration runs.
Could you please use a different topic/group for each iteration?
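   Something along these lines, for illustration (the loop bound and naming scheme are just an example):
   ```java
   for (int i = 0; i < iterations; i++) {
       // Per-iteration names, so a slow topic/group cleanup from the previous pass cannot interfere.
       String topic = TOPIC + "-" + i;
       String group = GROUP + "-" + i;
       // ... create the topic, produce, commit offsets with `group`, then delete the offsets ...
   }
   ```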





Re: [PR] MINOR: Modified System.getProperty("line.separator") to java.lang.System.lineSeparator() [kafka]

2024-04-25 Thread via GitHub


chia7712 commented on PR #15782:
URL: https://github.com/apache/kafka/pull/15782#issuecomment-2076761562

   @TaiJuWu could you please rebase code to include 
https://github.com/apache/kafka/pull/15801





Re: [PR] KAFKA-16528: Client HB timing fix [kafka]

2024-04-25 Thread via GitHub


cadonna commented on code in PR #15698:
URL: https://github.com/apache/kafka/pull/15698#discussion_r1579187392


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -164,6 +165,23 @@ public void testHeartbeatOnStartup() {
 assertEquals(0, result2.unsentRequests.size());
 }
 
+@Test
+public void testSuccessfulHeartbeatTiming() {
+mockStableMember();
+NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
+assertEquals(0, result.unsentRequests.size(),
+"No heartbeat should be sent while interval has not expired");
+long currentTimeMs = time.milliseconds();
+
assertEquals(heartbeatRequestState.timeToNextHeartbeatMs(currentTimeMs), 
result.timeUntilNextPollMs);
+assertNextHeartbeatTiming(DEFAULT_HEARTBEAT_INTERVAL_MS);
+
+result = heartbeatRequestManager.poll(time.milliseconds());
+assertEquals(1, result.unsentRequests.size(), "A heartbeat should be 
sent when interval expires");

Review Comment:
   Don't you need a verification here that ensures that the heartbeat timer was 
reset after the poll? 



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -164,6 +165,23 @@ public void testHeartbeatOnStartup() {
 assertEquals(0, result2.unsentRequests.size());
 }
 
+@Test
+public void testSuccessfulHeartbeatTiming() {
+mockStableMember();
+NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
+assertEquals(0, result.unsentRequests.size(),
+"No heartbeat should be sent while interval has not expired");
+long currentTimeMs = time.milliseconds();
+
assertEquals(heartbeatRequestState.timeToNextHeartbeatMs(currentTimeMs), 
result.timeUntilNextPollMs);
+assertNextHeartbeatTiming(DEFAULT_HEARTBEAT_INTERVAL_MS);
+
+result = heartbeatRequestManager.poll(time.milliseconds());
+assertEquals(1, result.unsentRequests.size(), "A heartbeat should be 
sent when interval expires");
+NetworkClientDelegate.UnsentRequest inflightReq = 
result.unsentRequests.get(0);
+inflightReq.handler().onComplete(createHeartbeatResponse(inflightReq, 
Errors.NONE));
+assertNextHeartbeatTiming(DEFAULT_HEARTBEAT_INTERVAL_MS);

Review Comment:
   How do you know whether the heartbeat timer was reset in 
`makeHeartbeatRequest()` or in `onResponse()` if you verify the reset after you 
complete the future of the request?
   I would verify the reset after the poll, progress the time a bit (less than
the heartbeat interval), and then verify here that the time did not change
after the progress.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -469,19 +469,33 @@ public void resetTimer() {
 this.heartbeatTimer.reset(heartbeatIntervalMs);
 }
 
+/**
+ * Check if a heartbeat request should be sent on the current time. A 
heartbeat should be
+ * sent if the heartbeat timer has expired, backoff has expired, and 
there is no request
+ * in-flight.
+ */
 @Override
 public boolean canSendRequest(final long currentTimeMs) {
 update(currentTimeMs);
 return heartbeatTimer.isExpired() && 
super.canSendRequest(currentTimeMs);
 }
 
-public long nextHeartbeatMs(final long currentTimeMs) {
+public long timeToNextHeartbeatMs(final long currentTimeMs) {
 if (heartbeatTimer.remainingMs() == 0) {

Review Comment:
   Sorry if I comment on code outside the PR. Isn't this the same as
`heartbeatTimer.isExpired()`? If yes, could we please change this to make the
code more readable?






Re: [PR] KAFKA-16528: Client HB timing fix [kafka]

2024-04-25 Thread via GitHub


cadonna commented on code in PR #15698:
URL: https://github.com/apache/kafka/pull/15698#discussion_r1579191646


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -164,6 +165,23 @@ public void testHeartbeatOnStartup() {
 assertEquals(0, result2.unsentRequests.size());
 }
 
+@Test
+public void testSuccessfulHeartbeatTiming() {
+mockStableMember();
+NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
+assertEquals(0, result.unsentRequests.size(),
+"No heartbeat should be sent while interval has not expired");
+long currentTimeMs = time.milliseconds();
+
assertEquals(heartbeatRequestState.timeToNextHeartbeatMs(currentTimeMs), 
result.timeUntilNextPollMs);
+assertNextHeartbeatTiming(DEFAULT_HEARTBEAT_INTERVAL_MS);
+
+result = heartbeatRequestManager.poll(time.milliseconds());
+assertEquals(1, result.unsentRequests.size(), "A heartbeat should be 
sent when interval expires");
+NetworkClientDelegate.UnsentRequest inflightReq = 
result.unsentRequests.get(0);
+inflightReq.handler().onComplete(createHeartbeatResponse(inflightReq, 
Errors.NONE));
+assertNextHeartbeatTiming(DEFAULT_HEARTBEAT_INTERVAL_MS);

Review Comment:
   How do you know whether the heartbeat timer was reset in 
`makeHeartbeatRequest()` or in `onResponse()` if you verify the reset after you 
complete the future of the request?
   I would verify the reset after the poll, progress the time a bit (less than
the heartbeat interval), and then verify here that the time to the next
heartbeat did not change.






Re: [PR] KAFKA-16528: Client HB timing fix [kafka]

2024-04-25 Thread via GitHub


cadonna commented on code in PR #15698:
URL: https://github.com/apache/kafka/pull/15698#discussion_r1579191646


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -164,6 +165,23 @@ public void testHeartbeatOnStartup() {
 assertEquals(0, result2.unsentRequests.size());
 }
 
+@Test
+public void testSuccessfulHeartbeatTiming() {
+mockStableMember();
+NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
+assertEquals(0, result.unsentRequests.size(),
+"No heartbeat should be sent while interval has not expired");
+long currentTimeMs = time.milliseconds();
+
assertEquals(heartbeatRequestState.timeToNextHeartbeatMs(currentTimeMs), 
result.timeUntilNextPollMs);
+assertNextHeartbeatTiming(DEFAULT_HEARTBEAT_INTERVAL_MS);
+
+result = heartbeatRequestManager.poll(time.milliseconds());
+assertEquals(1, result.unsentRequests.size(), "A heartbeat should be 
sent when interval expires");
+NetworkClientDelegate.UnsentRequest inflightReq = 
result.unsentRequests.get(0);
+inflightReq.handler().onComplete(createHeartbeatResponse(inflightReq, 
Errors.NONE));
+assertNextHeartbeatTiming(DEFAULT_HEARTBEAT_INTERVAL_MS);

Review Comment:
   How do you know whether the heartbeat timer was reset in 
`makeHeartbeatRequest()` or in `onResponse()` if you verify the reset after you 
complete the future of the request?
   I would verify the reset after the poll, progress the time a bit (less than
the heartbeat interval), and then verify here that the time did not change.






Re: [PR] KAFKA-16528: Client HB timing fix [kafka]

2024-04-25 Thread via GitHub


cadonna commented on code in PR #15698:
URL: https://github.com/apache/kafka/pull/15698#discussion_r1579191646


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -164,6 +165,23 @@ public void testHeartbeatOnStartup() {
 assertEquals(0, result2.unsentRequests.size());
 }
 
+@Test
+public void testSuccessfulHeartbeatTiming() {
+mockStableMember();
+NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
+assertEquals(0, result.unsentRequests.size(),
+"No heartbeat should be sent while interval has not expired");
+long currentTimeMs = time.milliseconds();
+
assertEquals(heartbeatRequestState.timeToNextHeartbeatMs(currentTimeMs), 
result.timeUntilNextPollMs);
+assertNextHeartbeatTiming(DEFAULT_HEARTBEAT_INTERVAL_MS);
+
+result = heartbeatRequestManager.poll(time.milliseconds());
+assertEquals(1, result.unsentRequests.size(), "A heartbeat should be 
sent when interval expires");
+NetworkClientDelegate.UnsentRequest inflightReq = 
result.unsentRequests.get(0);
+inflightReq.handler().onComplete(createHeartbeatResponse(inflightReq, 
Errors.NONE));
+assertNextHeartbeatTiming(DEFAULT_HEARTBEAT_INTERVAL_MS);

Review Comment:
   How do you know whether the heartbeat timer was reset in 
`makeHeartbeatRequest()` or in `onResponse()` if you verify the reset after you 
complete the future of the request?
   I would verify the timer reset after the poll, progress the time a bit (less
than the heartbeat interval), and then verify here that the time to the next
heartbeat is the heartbeat interval minus the amount of time I progressed the
time after the poll.
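   For illustration, that ordering could look roughly like this, reusing the names visible in the diff above (a sketch, not the PR's final test code):
   ```java
   result = heartbeatRequestManager.poll(time.milliseconds());
   assertEquals(1, result.unsentRequests.size());
   // Verify the timer was reset by the poll itself...
   assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS,
       heartbeatRequestState.timeToNextHeartbeatMs(time.milliseconds()));
   // ...then advance time by less than the interval and check that exactly the elapsed
   // time has been subtracted, i.e. nothing reset the timer again in between.
   time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS / 2);
   assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS - DEFAULT_HEARTBEAT_INTERVAL_MS / 2,
       heartbeatRequestState.timeToNextHeartbeatMs(time.milliseconds()));
   ```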






Re: [PR] KAFKA:-16399: Add JBOD support in tiered storage [kafka]

2024-04-25 Thread via GitHub


showuon commented on code in PR #15690:
URL: https://github.com/apache/kafka/pull/15690#discussion_r1579212500


##
storage/src/test/java/org/apache/kafka/tiered/storage/actions/AlterLogDirAction.java:
##
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.actions;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionReplica;
+import org.apache.kafka.test.TestUtils;
+import org.apache.kafka.tiered.storage.TieredStorageTestAction;
+import org.apache.kafka.tiered.storage.TieredStorageTestContext;
+import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage;
+
+import java.io.File;
+import java.io.PrintStream;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+
+public final class AlterLogDirAction implements TieredStorageTestAction {
+
+private final TopicPartition topicPartition;
+private final int brokerId;
+
+public AlterLogDirAction(TopicPartition topicPartition,
+ int brokerId) {
+this.topicPartition = topicPartition;
+this.brokerId = brokerId;
+}
+
+@Override
+public void doExecute(TieredStorageTestContext context) throws 
InterruptedException, ExecutionException {
+Optional localStorage = 
context.localStorages().stream().filter(storage -> 
storage.getBrokerId().intValue() == brokerId).findFirst();
+if (!localStorage.isPresent()) {
+throw new IllegalArgumentException("cannot find local storage for 
this topic partition:" + topicPartition + " in this broker id:" + brokerId);
+}
+
+Optional sourceDir = 
localStorage.get().getBrokerStorageDirectory().stream().filter(dir -> 
localStorage.get().isTopicPartitionFileExistInDir(topicPartition, 
dir)).findFirst();
+Optional targetDir = 
localStorage.get().getBrokerStorageDirectory().stream().filter(dir -> 
!localStorage.get().isTopicPartitionFileExistInDir(topicPartition, 
dir)).findFirst();
+if (!sourceDir.isPresent()) {
+throw new IllegalArgumentException("No log dir with topic 
partition:" + topicPartition + " in this broker id:" + brokerId);
+}
+
+if (!targetDir.isPresent()) {
+throw new IllegalArgumentException("No log dir without topic 
partition:" + topicPartition + " in this broker id:" + brokerId);
+}
+
+// build alterReplicaLogDirs request content to move from sourceDir to 
targetDir
+Map logDirs = 
Collections.singletonMap(new TopicPartitionReplica(topicPartition.topic(), 
topicPartition.partition(), brokerId), targetDir.get().getAbsolutePath());
+
+context.admin().alterReplicaLogDirs(logDirs);
+
+// wait until the topic partition folder disappearing from source dir 
and appearing in the target dir
+TestUtils.waitForCondition(() -> 
localStorage.get().isTopicPartitionFileExistInDir(topicPartition, 
targetDir.get()) &&
+
!localStorage.get().isTopicPartitionFileExistInDir(topicPartition, 
sourceDir.get()),
+"Failed to alter dir:" + logDirs);
+}
+
+@Override
+public void describe(PrintStream output) {
+output.print("alter di for topic partition:" + topicPartition + " in 
this broker id:" + brokerId);

Review Comment:
   Ah, nice catch!
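   For reference, a standalone sketch of the admin call this action builds (topic, path, and broker id are illustrative):
   ```java
   TopicPartitionReplica replica = new TopicPartitionReplica("topicA", 0, 1);
   Map<TopicPartitionReplica, String> logDirs =
       Collections.singletonMap(replica, "/tmp/kafka-logs-1");
   // Ask the broker to move the replica to the target dir, then wait for completion.
   admin.alterReplicaLogDirs(logDirs).all().get();
   ```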






Re: [PR] KAFKA-15588 ConfigCommandIntegrationTest rewritten in java [kafka]

2024-04-25 Thread via GitHub


chia7712 commented on code in PR #15645:
URL: https://github.com/apache/kafka/pull/15645#discussion_r1579206989


##
core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java:
##
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin;
+
+import kafka.cluster.Broker;
+import kafka.cluster.EndPoint;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.AutoStart;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.ClusterTests;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+import kafka.test.junit.ZkClusterInvocationContext;
+import kafka.zk.AdminZkClient;
+import kafka.zk.BrokerInfo;
+import kafka.zk.KafkaZkClient;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.security.PasswordEncoder;
+import org.apache.kafka.security.PasswordEncoderConfigs;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.config.ZooKeeperInternals;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.extension.ExtendWith;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for 
usages of JavaConverters
+@ExtendWith(value = ClusterTestExtensions.class)
+@ClusterTestDefaults(clusterType = Type.ZK)
+@Tag("integration")
+public class ConfigCommandIntegrationTest {
+AdminZkClient adminZkClient;
+List alterOpts;
+
+private final ClusterInstance cluster;
+
+public ConfigCommandIntegrationTest(ClusterInstance cluster) {
+this.cluster = cluster;
+}
+
+@ClusterTests({
+@ClusterTest(clusterType = Type.ZK),
+@ClusterTest(clusterType = Type.KRAFT, brokers = 2, controllers = 1),

Review Comment:
   not sure why we need to set 2 brokers since this test case seems to check 
the disallowed args 



##
core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java:
##
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin;
+
+import kafka.cluster.Broker;
+import kafka.cluster.EndPoint;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.AutoStart;

Review Comment:
   this is unused



##
core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java:
##
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *

Re: [PR] MINOR: Modified System.getProperty("line.separator") to java.lang.System.lineSeparator() [kafka]

2024-04-25 Thread via GitHub


TaiJuWu commented on PR #15782:
URL: https://github.com/apache/kafka/pull/15782#issuecomment-2076894379

   > @TaiJuWu could you please rebase code to include #15801
   
   Done.





Re: [PR] enable test, check if continues failing in CI [kafka]

2024-04-25 Thread via GitHub


TaiJuWu commented on PR #13953:
URL: https://github.com/apache/kafka/pull/13953#issuecomment-2076900770

   Hello @Kiriakos1998,
   Are you still working on this?
   If not, I'd like to take it over.





Re: [PR] KAFKA:-16399: Add JBOD support in tiered storage [kafka]

2024-04-25 Thread via GitHub


showuon commented on code in PR #15690:
URL: https://github.com/apache/kafka/pull/15690#discussion_r1579325677


##
core/src/test/scala/unit/kafka/server/TierStateMachineTest.scala:
##
@@ -153,12 +157,13 @@ class ReplicaFetcherTierStateMachineTest {
 assertEquals(11L, replicaState.logEndOffset)
   }
 
-  @Test
-  def testFencedOffsetResetAfterMovedToRemoteTier(): Unit = {
+  @ParameterizedTest
+  @ArgumentsSource(classOf[TierStateMachineTest.Params])
+  def testFencedOffsetResetAfterMovedToRemoteTier(truncateOnFetch: Boolean, 
useFutureLog: Boolean): Unit = {
 val partition = new TopicPartition("topic", 0)
 var isErrorHandled = false
 val mockLeaderEndpoint = new MockLeaderEndPoint(truncateOnFetch = 
truncateOnFetch, version = version)
-val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint) {
+val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint, 
useFutureLog) {

Review Comment:
   Unfortunately, you're correct! Removed the `useFutureLog`. I'll think about
how to write a unit test for it. Thanks!






Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]

2024-04-25 Thread via GitHub


vamossagar12 commented on PR #13801:
URL: https://github.com/apache/kafka/pull/13801#issuecomment-2076983151

   Thanks Chris! I ran through the scenarios in the test and I can see that it
handles the cases correctly. Regarding `cancel`, I don't see the future
returned from `set` being cancelled explicitly, so we can live without
implementations of `cancel` and `isCancelled`.
   Also, I have now updated the tests so that I control when a record should be
returned, throw an error, or time out. `MockProducer` provided some great
hooks to do the same. I added a couple more tests which even cover the
timeout scenario, where a timeout is thrown until all futures return
promptly (with an error or otherwise). Let me know how this is looking now. Thanks!
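   For anyone following along, one way such hooks can be driven (a generic sketch, not this PR's actual test code):
   ```java
   // autoComplete=false keeps every send pending until the test resolves it explicitly.
   MockProducer<byte[], byte[]> producer =
       new MockProducer<>(false, new ByteArraySerializer(), new ByteArraySerializer());
   Future<RecordMetadata> future = producer.send(new ProducerRecord<>("offsets", "k".getBytes(), "v".getBytes()));
   producer.completeNext();                        // resolve the oldest pending send successfully
   // producer.errorNext(new TimeoutException());  // ...or fail it to exercise the error path
   ```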





Re: [PR] KAFKA:-16399: Add JBOD support in tiered storage [kafka]

2024-04-25 Thread via GitHub


showuon commented on code in PR #15690:
URL: https://github.com/apache/kafka/pull/15690#discussion_r1579328199


##
core/src/main/java/kafka/server/TierStateMachine.java:
##
@@ -40,7 +92,72 @@ public interface TierStateMachine {
  */
 PartitionFetchState start(TopicPartition topicPartition,
   PartitionFetchState currentFetchState,
-  PartitionData fetchPartitionData) throws 
Exception;
+  PartitionData fetchPartitionData) throws 
Exception {
+OffsetAndEpoch epochAndLeaderLocalStartOffset = 
leader.fetchEarliestLocalOffset(topicPartition, 
currentFetchState.currentLeaderEpoch());
+int epoch = epochAndLeaderLocalStartOffset.leaderEpoch();
+long leaderLocalStartOffset = epochAndLeaderLocalStartOffset.offset();
+
+long offsetToFetch;
+
replicaMgr.brokerTopicStats().topicStats(topicPartition.topic()).buildRemoteLogAuxStateRequestRate().mark();
+
replicaMgr.brokerTopicStats().allTopicsStats().buildRemoteLogAuxStateRequestRate().mark();
+
+try {
+offsetToFetch = buildRemoteLogAuxState(topicPartition, 
currentFetchState.currentLeaderEpoch(), leaderLocalStartOffset, epoch, 
fetchPartitionData.logStartOffset());
+} catch (RemoteStorageException e) {
+
replicaMgr.brokerTopicStats().topicStats(topicPartition.topic()).failedBuildRemoteLogAuxStateRate().mark();
+
replicaMgr.brokerTopicStats().allTopicsStats().failedBuildRemoteLogAuxStateRate().mark();
+throw e;
+}
+
+OffsetAndEpoch fetchLatestOffsetResult = 
leader.fetchLatestOffset(topicPartition, 
currentFetchState.currentLeaderEpoch());
+long leaderEndOffset = fetchLatestOffsetResult.offset();
+
+long initialLag = leaderEndOffset - offsetToFetch;
+
+return PartitionFetchState.apply(currentFetchState.topicId(), 
offsetToFetch, Option.apply(initialLag), currentFetchState.currentLeaderEpoch(),
+Fetching$.MODULE$, 
replicaMgr.localLogOrException(topicPartition).latestEpoch());

Review Comment:
   I've checked, and because we've done truncation earlier, we'll return the
fetching state. In the fetching state, the `latestEpoch` is ignored; the
`latestEpoch` is only used when doing truncation. But I still changed it to
the correct one in case we use it in the future.






Re: [PR] KAFKA:-16399: Add JBOD support in tiered storage [kafka]

2024-04-25 Thread via GitHub


showuon commented on code in PR #15690:
URL: https://github.com/apache/kafka/pull/15690#discussion_r1579328510


##
storage/src/test/java/org/apache/kafka/tiered/storage/actions/AlterLogDirAction.java:
##
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.actions;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionReplica;
+import org.apache.kafka.test.TestUtils;
+import org.apache.kafka.tiered.storage.TieredStorageTestAction;
+import org.apache.kafka.tiered.storage.TieredStorageTestContext;
+import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage;
+
+import java.io.File;
+import java.io.PrintStream;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+
+public final class AlterLogDirAction implements TieredStorageTestAction {
+
+private final TopicPartition topicPartition;
+private final int brokerId;
+
+public AlterLogDirAction(TopicPartition topicPartition,
+ int brokerId) {
+this.topicPartition = topicPartition;
+this.brokerId = brokerId;
+}
+
+@Override
+public void doExecute(TieredStorageTestContext context) throws 
InterruptedException, ExecutionException {
+Optional localStorage = 
context.localStorages().stream().filter(storage -> 
storage.getBrokerId().intValue() == brokerId).findFirst();
+if (!localStorage.isPresent()) {
+throw new IllegalArgumentException("cannot find local storage for 
this topic partition:" + topicPartition + " in this broker id:" + brokerId);
+}
+
+Optional sourceDir = 
localStorage.get().getBrokerStorageDirectory().stream().filter(dir -> 
localStorage.get().isTopicPartitionFileExistInDir(topicPartition, 
dir)).findFirst();
+Optional targetDir = 
localStorage.get().getBrokerStorageDirectory().stream().filter(dir -> 
!localStorage.get().isTopicPartitionFileExistInDir(topicPartition, 
dir)).findFirst();
+if (!sourceDir.isPresent()) {
+throw new IllegalArgumentException("No log dir with topic 
partition:" + topicPartition + " in this broker id:" + brokerId);
+}
+
+if (!targetDir.isPresent()) {
+throw new IllegalArgumentException("No log dir without topic 
partition:" + topicPartition + " in this broker id:" + brokerId);
+}
+
+// build alterReplicaLogDirs request content to move from sourceDir to 
targetDir
+Map logDirs = 
Collections.singletonMap(new TopicPartitionReplica(topicPartition.topic(), 
topicPartition.partition(), brokerId), targetDir.get().getAbsolutePath());
+
+context.admin().alterReplicaLogDirs(logDirs);

Review Comment:
   Nice suggestion. Updated.






Re: [PR] KAFKA:-16399: Add JBOD support in tiered storage [kafka]

2024-04-25 Thread via GitHub


showuon commented on PR #15690:
URL: https://github.com/apache/kafka/pull/15690#issuecomment-2076987711

   @mimaison @soarez , PR updated. Please take a look when available. Thanks.





Re: [PR] KAFKA-10733: Clean up producer exceptions [kafka]

2024-04-25 Thread via GitHub


lucasbru closed pull request #13876: KAFKA-10733: Clean up producer exceptions
URL: https://github.com/apache/kafka/pull/13876





Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-04-25 Thread via GitHub


cadonna commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1579221285


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -769,7 +730,8 @@ public void commitAsync(OffsetCommitCallback callback) {
 public void commitAsync(Map offsets, 
OffsetCommitCallback callback) {
 acquireAndEnsureOpen();
 try {
-AsyncCommitEvent asyncCommitEvent = new AsyncCommitEvent(offsets);
+Timer timer = time.timer(Long.MAX_VALUE);
+AsyncCommitEvent asyncCommitEvent = new AsyncCommitEvent(offsets, 
timer);

Review Comment:
   Why do you use a timer here? The `asyncCommit()` does not throw any timeout 
exception, does it? If you need to pass the timer to the `CommitEvent` or 
further up the class hierarchy then you can create the timer in the constructor 
of `AsyncCommitEvent` or even further up the class hierarchy. 
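   
   A rough sketch of that suggestion (class and method names are simplified
   stand-ins, not the real event hierarchy): the event builds its own unbounded
   timer, so call sites such as commitAsync() never have to.
   
   ```java
   import org.apache.kafka.common.utils.Time;
   import org.apache.kafka.common.utils.Timer;
   
   class AsyncCommitEventSketch {
       private final Timer timer;
   
       AsyncCommitEventSketch(final Time time) {
           // Equivalent of the time.timer(Long.MAX_VALUE) created at the call site.
           this.timer = time.timer(Long.MAX_VALUE);
       }
   
       Timer timer() {
           return timer;
       }
   }
   ```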



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableBackgroundEvent.java:
##
@@ -27,19 +30,34 @@
 public abstract class CompletableBackgroundEvent extends BackgroundEvent 
implements CompletableEvent {
 
 private final CompletableFuture future;
+private final long deadlineMs;
 
-protected CompletableBackgroundEvent(final Type type) {
+protected CompletableBackgroundEvent(final Type type, final Timer timer) {
 super(type);
 this.future = new CompletableFuture<>();
+Objects.requireNonNull(timer);
+
+long currentTimeMs = timer.currentTimeMs();
+long remainingMs = timer.remainingMs();
+
+if (currentTimeMs > Long.MAX_VALUE - remainingMs)
+this.deadlineMs = Long.MAX_VALUE;
+else
+this.deadlineMs = currentTimeMs + remainingMs;

Review Comment:
   Same questions as above.
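   
   For readers following along, the hunk above amounts to this self-contained
   clamped-deadline computation (restated here purely for illustration):
   
   ```java
   public class DeadlineSketch {
       static long deadlineMs(long currentTimeMs, long remainingMs) {
           // Saturating add: avoid overflow when remainingMs is effectively unbounded.
           return currentTimeMs > Long.MAX_VALUE - remainingMs
               ? Long.MAX_VALUE
               : currentTimeMs + remainingMs;
       }
   
       public static void main(String[] args) {
           System.out.println(deadlineMs(1_000L, 5_000L));         // 6000
           System.out.println(deadlineMs(1_000L, Long.MAX_VALUE)); // clamped to Long.MAX_VALUE
       }
   }
   ```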



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/EventProcessor.java:
##
@@ -16,120 +16,31 @@
  */
 package org.apache.kafka.clients.consumer.internals.events;
 
-import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.internals.IdempotentCloser;
-import org.apache.kafka.common.utils.LogContext;
-import org.slf4j.Logger;
-
-import java.io.Closeable;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Objects;
-import java.util.Optional;
 import java.util.concurrent.BlockingQueue;
 
 /**
- * An {@link EventProcessor} is the means by which events produced by 
thread A are
- * processed by thread B. By definition, threads A 
and B run in parallel to
- * each other, so a mechanism is needed with which to receive and process the 
events from the other thread. That
- * communication channel is formed around {@link BlockingQueue a shared queue} 
into which thread A
- * enqueues events and thread B reads and processes those events.
+ * An {@code EventProcessor} is the means by which events are 
processed, the meaning of which is left
+ * intentionally loose. This is in large part to keep the {@code 
EventProcessor} focused on what it means to process
+ * the events, and not linking itself too closely with the rest of 
the surrounding application.
+ *
+ * 
+ *
+ * The {@code EventProcessor} is envisaged as a stateless service that acts as 
a conduit, receiving an event and
+ * dispatching to another block of code to process. The semantic meaning of 
each event is different, so the
+ * {@code EventProcessor} will need to interact with other parts of the system 
that maintain state. The
+ * implementation should not be concerned with the mechanism by which an event 
arrived for processing. While the
+ * events are shuffled around the consumer subsystem by means of {@link 
BlockingQueue shared queues}, it should
+ * be considered an anti-pattern to need to know how it arrived or what 
happens after it is processed.
  */
-public abstract class EventProcessor implements Closeable {
-
-private final Logger log;
-private final BlockingQueue eventQueue;
-private final IdempotentCloser closer;
-
-protected EventProcessor(final LogContext logContext, final 
BlockingQueue eventQueue) {
-this.log = logContext.logger(EventProcessor.class);
-this.eventQueue = eventQueue;
-this.closer = new IdempotentCloser();
-}
-
-public abstract boolean process();
-
-protected abstract void process(T event);
-
-@Override
-public void close() {
-closer.close(this::closeInternal, () -> log.warn("The event processor 
was already closed"));
-}
-
-protected interface ProcessHandler {
-
-void onProcess(T event, Optional error);
-}
+public interface EventProcessor extends AutoCloseable {
 
 /**
- * Drains all available events from the queue, and then processes them in 
order. If any errors are thrown while
- * processing the individual events

Re: [PR] KAFKA-16465: Fix consumer sys test revocation validation [kafka]

2024-04-25 Thread via GitHub


lucasbru commented on code in PR #15778:
URL: https://github.com/apache/kafka/pull/15778#discussion_r1579343899


##
tests/kafkatest/tests/client/consumer_test.py:
##
@@ -242,16 +242,15 @@ def test_static_consumer_bounce(self, clean_shutdown, 
static_membership, bounce_
 self.rolling_bounce_consumers(consumer, keep_alive=num_keep_alive, 
num_bounces=num_bounces)
 
 num_revokes_after_bounce = consumer.num_revokes_for_alive() - 
num_revokes_before_bounce
-
-check_condition = num_revokes_after_bounce != 0
+
 # under static membership, the live consumer shall not revoke any 
current running partitions,
 # since there is no global rebalance being triggered.
 if static_membership:
-check_condition = num_revokes_after_bounce == 0
-
-assert check_condition, \
-"Total revoked count %d does not match the expectation of having 0 
revokes as %d" % \
-(num_revokes_after_bounce, check_condition)
+assert num_revokes_after_bounce == 0, \
+"Unexpected revocation triggered when bouncing static member. 
Expecting 0 but had %d revocations" % num_revokes_after_bounce
+elif consumer.is_eager():
+assert num_revokes_after_bounce != 0, \

Review Comment:
   No, lets not remove a variation.
   
   I'm just thinking about the CONSUMER protocol case. If a dynamic client can 
pass this test with `num_revokes_after_bounce = 0`, how meaningful is it really 
to check that static clients have `num_revokes_after_bounce = 0`. I could write 
a consumer that ignores the static membership configuration completely, and 
still pass this test, right?
   
   The test description writes
   ```
   In order to make
   sure the behavior of static members are different from dynamic ones, 
we take both static and dynamic
   membership into this test suite.
   ```
   
   But in the `CONSUMER` protocol, it seems the behavior isn't all that 
different, at least if we only look at `num_revokes_after_bounce`.
   
   So maybe we should instead try to get the set of partitions and check that 
it didn't change?
   
   Let me know if I'm understanding something wrong. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16565: IncrementalAssignmentConsumerEventHandler throws error when attempting to remove a partition that isn't assigned [kafka]

2024-04-25 Thread via GitHub


lucasbru commented on PR #15737:
URL: https://github.com/apache/kafka/pull/15737#issuecomment-2077011152

   Do I understand it correctly that there is no functional change here, just 
logging?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16608: Honour interrupted thread state on KafkaConsumer.poll [kafka]

2024-04-25 Thread via GitHub


AndrewJSchofield opened a new pull request, #15803:
URL: https://github.com/apache/kafka/pull/15803

   The contract of KafkaConsumer.poll(Duration) says that it throws 
InterruptException "if the calling thread is interrupted before or while this 
function is called". The new KafkaConsumer implementation was not doing this if 
the thread was interrupted before the poll was called, specifically with a very 
short timeout. If it ever waited for records, it did check the thread state. If 
it did not wait for records because of a short timeout, it did not.
   
   Some of the log messages in the code erroneously mentioned timeouts, 
when they really meant interruption.
   
   Also adds a test for this specific scenario.
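   
   A minimal plain-Java sketch of the contract at issue (helper names are
   illustrative, not the consumer internals; the real consumer surfaces this as
   Kafka's unchecked InterruptException): a pending interrupt must be honoured
   even when the timeout is too short to ever block waiting for records.
   
   ```java
   import java.time.Duration;
   
   public class InterruptAwarePollSketch {
       static void poll(Duration timeout) throws InterruptedException {
           // Check the flag up front, so a zero/near-zero timeout cannot skip
           // the check that otherwise happens while waiting for records.
           if (Thread.interrupted()) {
               throw new InterruptedException("interrupted before poll");
           }
           // ... fetch records within 'timeout' ...
       }
   
       public static void main(String[] args) throws InterruptedException {
           Thread.currentThread().interrupt();
           poll(Duration.ZERO); // throws, as the contract requires
       }
   }
   ```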
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]

2024-04-25 Thread via GitHub


showuon commented on code in PR #15516:
URL: https://github.com/apache/kafka/pull/15516#discussion_r1579334152


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##
@@ -189,7 +189,7 @@ public RecordAccumulator(LogContext logContext,
  BufferPool bufferPool) {
 this(logContext,
 batchSize,
-compression,
+compression,

Review Comment:
   nit: wrong indent



##
clients/src/main/java/org/apache/kafka/common/compress/Compression.java:
##
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.compress;
+
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+public interface Compression {
+
+/**
+ * The compression type for this compression codec
+ */
+CompressionType type();
+
+/**
+ * Wrap bufferStream with an OutputStream that will compress data with 
this CompressionType.
+ * Note: Unlike {@link #wrapForInput}, this cannot take {@link 
ByteBuffer}s directly.
+ * Currently, MemoryRecordsBuilder writes to the underlying buffer in the 
given {@link ByteBufferOutputStream} after the compressed data has been written.
+ * In the event that the buffer needs to be expanded while writing the 
data, access to the underlying buffer needs to be preserved.
+ */
+OutputStream wrapForOutput(ByteBufferOutputStream bufferStream, byte 
messageVersion);
+
+/**
+ * Wrap buffer with an InputStream that will decompress data with this 
CompressionType.
+ *
+ * @param buffer The {@link ByteBuffer} instance holding the data to 
decompress.
+ * @param messageVersion The record format version to use.
+ * @param decompressionBufferSupplier The supplier of ByteBuffer(s) used 
for decompression if supported.
+ * For small record batches, allocating a potentially large buffer (64 KB 
for LZ4)

Review Comment:
   miss `@return`
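   
   As a side note for readers, a rough usage sketch of the new API (assuming the
   builder exposes level(int) and build(), as the test below suggests; everything
   else is taken from the interface above):
   
   ```java
   import java.io.OutputStream;
   import java.nio.charset.StandardCharsets;
   
   import org.apache.kafka.common.compress.Compression;
   import org.apache.kafka.common.compress.GzipCompression;
   import org.apache.kafka.common.record.RecordBatch;
   import org.apache.kafka.common.utils.ByteBufferOutputStream;
   
   public class CompressionUsageSketch {
       public static void main(String[] args) throws Exception {
           GzipCompression gzip = Compression.gzip()
               .level(GzipCompression.DEFAULT_LEVEL)
               .build();
           ByteBufferOutputStream bufferStream = new ByteBufferOutputStream(512);
           try (OutputStream out = gzip.wrapForOutput(bufferStream, RecordBatch.MAGIC_VALUE_V2)) {
               out.write("data".getBytes(StandardCharsets.UTF_8));
           }
           System.out.println("compressed bytes: " + bufferStream.buffer().position());
       }
   }
   ```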



##
clients/src/test/java/org/apache/kafka/common/compress/GzipCompressionTest.java:
##
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.compress;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class GzipCompressionTest {
+
+@Test
+public void testCompressionDecompression() throws IOException {
+GzipCompression.Builder builder = Compression.gzip();
+byte[] data = "data".getBytes(StandardCharsets.UTF_8);
+
+for (byte magic : Arrays.asList(RecordBatch.MAGIC_VALUE_V0, 
RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)) {
+for (int level : Arrays.asList(GzipCompression.MIN_LEVEL, 
GzipCompression.DEFAULT_LEVEL, GzipCompressi

Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]

2024-04-25 Thread via GitHub


dajac commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1579345622


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1336,6 +1422,233 @@ private 
CoordinatorResult consumerGr
 return new CoordinatorResult<>(records, response);
 }
 
+/**
+ * Handle a JoinGroupRequest to a ConsumerGroup.
+ *
+ * @param group  The group to join.
+ * @param contextThe request context.
+ * @param requestThe actual JoinGroup request.
+ * @param responseFuture The join group response future.
+ *
+ * @return The result that contains records to append if the join group 
phase completes.
+ */
+private CoordinatorResult consumerGroupJoin(
+ConsumerGroup group,
+RequestContext context,
+JoinGroupRequestData request,
+CompletableFuture responseFuture
+) throws ApiException {
+final long currentTimeMs = time.milliseconds();
+final List records = new ArrayList<>();
+final String groupId = request.groupId();
+String memberId = request.memberId();
+final String instanceId = request.groupInstanceId();
+final JoinGroupRequestProtocolCollection protocols = 
request.protocols();
+
+final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+final ConsumerPartitionAssignor.Subscription subscription = 
deserializeSubscription(protocols);
+List 
ownedTopicPartitions = null;
+
+if (!validateClassicGroupSessionTimeout(memberId, 
request.sessionTimeoutMs(), responseFuture)) {
+return EMPTY_RESULT;
+}
+throwIfConsumerGroupIsFull(group, memberId);
+// TODO: need to throw an exception if group is dead?
+
+// Get or create the member.
+if (isUnknownMember) memberId = Uuid.randomUuid().toString();
+ConsumerGroupMember member;
+ConsumerGroupMember.Builder updatedMemberBuilder;
+boolean staticMemberReplaced = false;
+boolean newMemberCreated = false;
+if (instanceId == null) {
+if (isUnknownMember && 
JoinGroupRequest.requiresKnownMemberId(context.apiVersion())) {
+// If member id required, send back a response to call for 
another join group request with allocated member id.
+log.info("Dynamic member with unknown member id joins group 
{}. " +
+"Created a new member id {} and requesting the member 
to rejoin with this id.",
+group.groupId(), memberId);
+
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(memberId)
+.setErrorCode(Errors.MEMBER_ID_REQUIRED.code())
+);
+return EMPTY_RESULT;
+} else {
+// A dynamic member (re-)joins.
+throwIfClassicProtocolIsNotSupported(group, memberId, 
request.protocolType(), protocols);
+newMemberCreated = !group.hasMember(memberId);
+
+member = group.getOrMaybeCreateMember(memberId, true);
+if (!newMemberCreated) ownedTopicPartitions = 
validateGenerationIdAndGetOwnedPartition(member, subscription);

Review Comment:
   What's the reason for doing this only if the member is not new?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1336,6 +1422,233 @@ private 
CoordinatorResult consumerGr
 return new CoordinatorResult<>(records, response);
 }
 
+/**
+ * Handle a JoinGroupRequest to a ConsumerGroup.
+ *
+ * @param group  The group to join.
+ * @param contextThe request context.
+ * @param requestThe actual JoinGroup request.
+ * @param responseFuture The join group response future.
+ *
+ * @return The result that contains records to append if the join group 
phase completes.
+ */
+private CoordinatorResult consumerGroupJoin(
+ConsumerGroup group,
+RequestContext context,
+JoinGroupRequestData request,
+CompletableFuture responseFuture
+) throws ApiException {
+final long currentTimeMs = time.milliseconds();
+final List records = new ArrayList<>();
+final String groupId = request.groupId();
+String memberId = request.memberId();
+final String instanceId = request.groupInstanceId();
+final JoinGroupRequestProtocolCollection protocols = 
request.protocols();
+
+final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+final ConsumerPartitionAssignor.Subscription subscription = 
deserializeSubscription(protocols);

Review Comment:
   I think that we should rather do this after the request/group/member 
validations.



##
group-coordinator/src/main/java/org/apache/kafka/coordin

[PR] MINOR: Remove unnecessary version from excluded dependencies of clients [kafka]

2024-04-25 Thread via GitHub


tinaselenge opened a new pull request, #15804:
URL: https://github.com/apache/kafka/pull/15804

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16619) Unnecessary controller warning : "Loaded ZK migration state of NONE"

2024-04-25 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-16619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

F Méthot updated KAFKA-16619:
-
Summary: Unnecessary controller warning : "Loaded ZK migration state of 
NONE"  (was: Unnecessary warning : "Loaded ZK migration state of NONE")

> Unnecessary controller warning : "Loaded ZK migration state of NONE"
> 
>
> Key: KAFKA-16619
> URL: https://issues.apache.org/jira/browse/KAFKA-16619
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller
>Affects Versions: 3.6.2
>Reporter: F Méthot
>Priority: Trivial
>
> When we launch a fresh cluster of Kafka and Kraft Controller, no zookeeper 
> involved.
> We get this warning in the log:
> [2024-04-15 03:44:33,881] WARN [QuorumController id=3] Performing controller 
> activation. Loaded ZK migration state of NONE. 
> (org.apache.kafka.controller.QuorumController)
>  
> Our project has no business with Zookeeper, seeing this message prompted us 
> to investigate and spend time looking up this warning to find an explanation.
> We have that setting
> {_}zookeeper.metadata.migration.enable{_}=false
> and we still get that warning.
> In future version, to avoid further confusion this message should not be 
> showed when zookeeper is not involved at all .



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16619) Unnecessary controller warning : "Loaded ZK migration state of NONE"

2024-04-25 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-16619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

F Méthot updated KAFKA-16619:
-
Description: 
When we launch a fresh cluster of Kafka and Kraft Controller, no zookeeper 
involved.

We get this warning in the controller log:

[2024-04-15 03:44:33,881] WARN [QuorumController id=3] Performing controller 
activation. Loaded ZK migration state of NONE. 
(org.apache.kafka.controller.QuorumController)

 

Our project has no business with Zookeeper, seeing this message prompted us to 
investigate and spend time looking up this warning to find an explanation.

We have that setting

{_}zookeeper.metadata.migration.enable{_}=false

and we still get that warning.

In a future version, to avoid further confusion, this message should not be shown 
when ZooKeeper is not involved at all.

  was:
When we launch a fresh cluster of Kafka and Kraft Controller, no zookeeper 
involved.

We get this warning in the log:

[2024-04-15 03:44:33,881] WARN [QuorumController id=3] Performing controller 
activation. Loaded ZK migration state of NONE. 
(org.apache.kafka.controller.QuorumController)

 

Our project has no business with Zookeeper, seeing this message prompted us to 
investigate and spend time looking up this warning to find an explanation.

We have that setting

{_}zookeeper.metadata.migration.enable{_}=false

and we still get that warning.

In future version, to avoid further confusion this message should not be showed 
when zookeeper is not involved at all .


> Unnecessary controller warning : "Loaded ZK migration state of NONE"
> 
>
> Key: KAFKA-16619
> URL: https://issues.apache.org/jira/browse/KAFKA-16619
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller
>Affects Versions: 3.6.2
>Reporter: F Méthot
>Priority: Trivial
>
> When we launch a fresh cluster of Kafka and Kraft Controller, no zookeeper 
> involved.
> We get this warning in the controller log:
> [2024-04-15 03:44:33,881] WARN [QuorumController id=3] Performing controller 
> activation. Loaded ZK migration state of NONE. 
> (org.apache.kafka.controller.QuorumController)
>  
> Our project has no business with Zookeeper, seeing this message prompted us 
> to investigate and spend time looking up this warning to find an explanation.
> We have that setting
> {_}zookeeper.metadata.migration.enable{_}=false
> and we still get that warning.
> In future version, to avoid further confusion this message should not be 
> showed when zookeeper is not involved at all .



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16619) Unnecessary warning : "Loaded ZK migration state of NONE"

2024-04-25 Thread Jira
F Méthot created KAFKA-16619:


 Summary: Unnecessary warning : "Loaded ZK migration state of NONE"
 Key: KAFKA-16619
 URL: https://issues.apache.org/jira/browse/KAFKA-16619
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 3.6.2
Reporter: F Méthot


When we launch a fresh cluster of Kafka and Kraft Controller, no zookeeper 
involved.

We get this warning in the log:

[2024-04-15 03:44:33,881] WARN [QuorumController id=3] Performing controller 
activation. Loaded ZK migration state of NONE. 
(org.apache.kafka.controller.QuorumController)

 

Our project has no business with Zookeeper, seeing this message prompted us to 
investigate and spend time looking up this warning to find an explanation.

We have that setting

{_}zookeeper.metadata.migration.enable{_}=false

and we still get that warning.

In a future version, to avoid further confusion, this message should not be shown 
when ZooKeeper is not involved at all.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16620) Kraft quorum cannot be formed if all controllers are restarted at the same time

2024-04-25 Thread Gantigmaa Selenge (Jira)
Gantigmaa Selenge created KAFKA-16620:
-

 Summary: Kraft quorum cannot be formed if all controllers are 
restarted at the same time
 Key: KAFKA-16620
 URL: https://issues.apache.org/jira/browse/KAFKA-16620
 Project: Kafka
  Issue Type: Bug
Reporter: Gantigmaa Selenge


Controller quorum cannot seem to form at all after accidentally restarting all 
controller nodes at the same time in a test environment. This is reproducible, 
happens almost every time when restarting all controller nodes of the cluster. 

Started a cluster with 3 controller nodes and 3 broker nodes. After restarting 
the controller nodes, one of them becomes the active controller but resigns due 
to fetch timeout. The quorum leadership bounces off like this between the nodes 
indefinitely. 
The controller.quorum.fetch.timeout.ms was set to the default of 2 seconds. 
Logs from an active controller:
```
2024-04-17 14:00:48,250 INFO [QuorumController id=0] Becoming the active 
controller at epoch 34, next write offset 1116. 
(org.apache.kafka.controller.QuorumController) 
[quorum-controller-0-event-handler]
2024-04-17 14:00:48,250 WARN [QuorumController id=0] Performing controller 
activation. Loaded ZK migration state of NONE. 
(org.apache.kafka.controller.QuorumController) 
[quorum-controller-0-event-handler]
2024-04-17 14:00:48,701 INFO [RaftManager id=0] Node 1 disconnected. 
(org.apache.kafka.clients.NetworkClient) [kafka-0-raft-outbound-request-thread]
2024-04-17 14:00:48,701 WARN [RaftManager id=0] Connection to node 1 
(my-cluster-controller-1.my-cluster-kafka-brokers.roller.svc.cluster.local/10.244.0.68:9090)
 could not be established. Node may not be available. 
(org.apache.kafka.clients.NetworkClient) [kafka-0-raft-outbound-request-thread]
2024-04-17 14:00:48,776 DEBUG [UnifiedLog partition=__cluster_metadata-0, 
dir=/var/lib/kafka/data/kafka-log0] Flushing log up to offset 1117 
(exclusive)with recovery point 1117, last flushed: 1713362448239,  current 
time: 1713362448776,unflushed: 1 (kafka.log.UnifiedLog) [kafka-0-raft-io-thread]
2024-04-17 14:00:49,277 DEBUG [UnifiedLog partition=__cluster_metadata-0, 
dir=/var/lib/kafka/data/kafka-log0] Flushing log up to offset 1118 
(exclusive)with recovery point 1118, last flushed: 1713362448777,  current 
time: 
...
2024-04-17 14:01:35,934 DEBUG [UnifiedLog partition=__cluster_metadata-0, 
dir=/var/lib/kafka/data/kafka-log0] Flushing log up to offset 1200 
(exclusive)with recovery point 1200, last flushed: 1713362489371,  current 
time: 1713362495934,unflushed: 1 (kafka.log.UnifiedLog) [kafka-0-raft-io-thread]
2024-04-17 14:01:36,121 INFO [RaftManager id=0] Did not receive fetch request 
from the majority of the voters within 3000ms. Current fetched voters are []. 
(org.apache.kafka.raft.LeaderState) [kafka-0-raft-io-thread]
2024-04-17 14:01:36,223 WARN [QuorumController id=0] Renouncing the leadership 
due to a metadata log event. We were the leader at epoch 34, but in the new 
epoch 35, the leader is (none). Reverting to last stable offset 1198. 
(org.apache.kafka.controller.QuorumController) 
[quorum-controller-0-event-handler]
2024-04-17 14:01:36,223 INFO [QuorumController id=0] 
failAll(NotControllerException): failing writeNoOpRecord(152156824). 
(org.apache.kafka.deferred.DeferredEventQueue) 
[quorum-controller-0-event-handler]
2024-04-17 14:01:36,223 INFO [QuorumController id=0] writeNoOpRecord: event 
failed with NotControllerException in 6291037 microseconds. 
(org.apache.kafka.controller.QuorumController) 
[quorum-controller-0-event-handler]
```
Logs from the follower:
```
024-04-17 14:00:48,242 INFO [RaftManager id=2] Completed transition to 
FollowerState(fetchTimeoutMs=2000, epoch=34, leaderId=0, voters=[0, 1, 2], 
highWatermark=Optional[LogOffsetMetadata(offset=1113, 
metadata=Optional.empty)], fetchingSnapshot=Optional.empty) from 
Voted(epoch=34, votedId=0, voters=[0, 1, 2], electionTimeoutMs=1794) 
(org.apache.kafka.raft.QuorumState) [kafka-2-raft-io-thread]
2024-04-17 14:00:48,242 INFO [QuorumController id=2] In the new epoch 34, the 
leader is 0. (org.apache.kafka.controller.QuorumController) 
[quorum-controller-2-event-handler]
2024-04-17 14:00:48,247 DEBUG [UnifiedLog partition=__cluster_metadata-0, 
dir=/var/lib/kafka/data/kafka-log2] Flushing log up to offset 1116 
(exclusive)with recovery point 1116, last flushed: 1713362442238,  current 
time: 1713362448247,unflushed: 2 (kafka.log.UnifiedLog) [kafka-2-raft-io-thread]
2024-04-17 14:00:48,777 DEBUG [UnifiedLog partition=__cluster_metadata-0, 
dir=/var/lib/kafka/data/kafka-log2] Flushing log up to offset 1117 
(exclusive)with recovery point 1117, last flushed: 1713362448249,  current 
time: 1713362448777,unflushed: 1 (kafka.log.UnifiedLog) [kafka-2-raft-io-thread]
2024-04-17 14:00:49,278 DEBUG [UnifiedLog partition=__cluster_metadata-0, 
dir=/var/lib/kafka/data/kafka-log2] Flushing log up to offset 111

[jira] [Updated] (KAFKA-16620) Kraft quorum cannot be formed if all controllers are restarted at the same time

2024-04-25 Thread Gantigmaa Selenge (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16620?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gantigmaa Selenge updated KAFKA-16620:
--
Description: 
Controller quorum cannot seem to form at all after accidentally restarting all 
controller nodes at the same time in a test environment. This is reproducible, 
happens almost every time when restarting all controller nodes of the cluster.

Started a cluster with 3 controller nodes and 3 broker nodes. After restarting 
the controller nodes, one of them becomes the active controller but resigns due 
to fetch timeout. The quorum leadership bounces off like this between the nodes 
indefinitely. 
The controller.quorum.fetch.timeout.ms was set to the default of 2 seconds. 
Logs from an active controller:
```
2024-04-17 14:00:48,250 INFO [QuorumController id=0] Becoming the active 
controller at epoch 34, next write offset 1116. 
(org.apache.kafka.controller.QuorumController) 
[quorum-controller-0-event-handler]
2024-04-17 14:00:48,250 WARN [QuorumController id=0] Performing controller 
activation. Loaded ZK migration state of NONE. 
(org.apache.kafka.controller.QuorumController) 
[quorum-controller-0-event-handler]
2024-04-17 14:00:48,701 INFO [RaftManager id=0] Node 1 disconnected. 
(org.apache.kafka.clients.NetworkClient) [kafka-0-raft-outbound-request-thread]
2024-04-17 14:00:48,701 WARN [RaftManager id=0] Connection to node 1 
(my-cluster-controller-1.my-cluster-kafka-brokers.roller.svc.cluster.local/10.244.0.68:9090)
 could not be established. Node may not be available. 
(org.apache.kafka.clients.NetworkClient) [kafka-0-raft-outbound-request-thread]
2024-04-17 14:00:48,776 DEBUG [UnifiedLog partition=__cluster_metadata-0, 
dir=/var/lib/kafka/data/kafka-log0] Flushing log up to offset 1117 
(exclusive)with recovery point 1117, last flushed: 1713362448239, current time: 
1713362448776,unflushed: 1 (kafka.log.UnifiedLog) [kafka-0-raft-io-thread]
2024-04-17 14:00:49,277 DEBUG [UnifiedLog partition=__cluster_metadata-0, 
dir=/var/lib/kafka/data/kafka-log0] Flushing log up to offset 1118 
(exclusive)with recovery point 1118, last flushed: 1713362448777, current time: 
...
2024-04-17 14:01:35,934 DEBUG [UnifiedLog partition=__cluster_metadata-0, 
dir=/var/lib/kafka/data/kafka-log0] Flushing log up to offset 1200 
(exclusive)with recovery point 1200, last flushed: 1713362489371, current time: 
1713362495934,unflushed: 1 (kafka.log.UnifiedLog) [kafka-0-raft-io-thread]
2024-04-17 14:01:36,121 INFO [RaftManager id=0] Did not receive fetch request 
from the majority of the voters within 3000ms. Current fetched voters are []. 
(org.apache.kafka.raft.LeaderState) [kafka-0-raft-io-thread]
2024-04-17 14:01:36,223 WARN [QuorumController id=0] Renouncing the leadership 
due to a metadata log event. We were the leader at epoch 34, but in the new 
epoch 35, the leader is (none). Reverting to last stable offset 1198. 
(org.apache.kafka.controller.QuorumController) 
[quorum-controller-0-event-handler]
2024-04-17 14:01:36,223 INFO [QuorumController id=0] 
failAll(NotControllerException): failing writeNoOpRecord(152156824). 
(org.apache.kafka.deferred.DeferredEventQueue) 
[quorum-controller-0-event-handler]
2024-04-17 14:01:36,223 INFO [QuorumController id=0] writeNoOpRecord: event 
failed with NotControllerException in 6291037 microseconds. 
(org.apache.kafka.controller.QuorumController) 
[quorum-controller-0-event-handler]
```
Logs from the follower:
```
024-04-17 14:00:48,242 INFO [RaftManager id=2] Completed transition to 
FollowerState(fetchTimeoutMs=2000, epoch=34, leaderId=0, voters=[0, 1, 2], 
highWatermark=Optional[LogOffsetMetadata(offset=1113, 
metadata=Optional.empty)], fetchingSnapshot=Optional.empty) from 
Voted(epoch=34, votedId=0, voters=[0, 1, 2], electionTimeoutMs=1794) 
(org.apache.kafka.raft.QuorumState) [kafka-2-raft-io-thread]
2024-04-17 14:00:48,242 INFO [QuorumController id=2] In the new epoch 34, the 
leader is 0. (org.apache.kafka.controller.QuorumController) 
[quorum-controller-2-event-handler]
2024-04-17 14:00:48,247 DEBUG [UnifiedLog partition=__cluster_metadata-0, 
dir=/var/lib/kafka/data/kafka-log2] Flushing log up to offset 1116 
(exclusive)with recovery point 1116, last flushed: 1713362442238, current time: 
1713362448247,unflushed: 2 (kafka.log.UnifiedLog) [kafka-2-raft-io-thread]
2024-04-17 14:00:48,777 DEBUG [UnifiedLog partition=__cluster_metadata-0, 
dir=/var/lib/kafka/data/kafka-log2] Flushing log up to offset 1117 
(exclusive)with recovery point 1117, last flushed: 1713362448249, current time: 
1713362448777,unflushed: 1 (kafka.log.UnifiedLog) [kafka-2-raft-io-thread]
2024-04-17 14:00:49,278 DEBUG [UnifiedLog partition=__cluster_metadata-0, 
dir=/var/lib/kafka/data/kafka-log2] Flushing log up to offset 1118 
(exclusive)with recovery point 1118, last flushed: 1713362448811, current time: 
1713362449278,unflushed
...
2024-04-17 14:01:29,371 DEBUG [UnifiedLog partition=__cl

[jira] [Assigned] (KAFKA-16620) Kraft quorum cannot be formed if all controllers are restarted at the same time

2024-04-25 Thread Gantigmaa Selenge (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16620?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gantigmaa Selenge reassigned KAFKA-16620:
-

Assignee: Gantigmaa Selenge

> Kraft quorum cannot be formed if all controllers are restarted at the same 
> time
> ---
>
> Key: KAFKA-16620
> URL: https://issues.apache.org/jira/browse/KAFKA-16620
> Project: Kafka
>  Issue Type: Bug
>Reporter: Gantigmaa Selenge
>Assignee: Gantigmaa Selenge
>Priority: Major
>
> Controller quorum cannot seem to form at all after accidentally restarting 
> all controller nodes at the same time in a test environment. This is 
> reproducible, happens almost everytime when restarting all controller nodes 
> of the cluster.
> Started a cluster with 3 controller nodes and 3 broker nodes. After 
> restarting the controller nodes, one of them becomes the active controller 
> but resigns due to fetch timeout. The quorum leadership bounces off like this 
> between the nodes indefinitely. 
> The controller.quorum.fetch.timeout.ms was set to the default of 2 seconds.
> Logs from an active controller:
> {code:java}
> 2024-04-17 14:00:48,250 INFO [QuorumController id=0] Becoming the active 
> controller at epoch 34, next write offset 1116. 
> (org.apache.kafka.controller.QuorumController) 
> [quorum-controller-0-event-handler]
> 2024-04-17 14:00:48,250 WARN [QuorumController id=0] Performing controller 
> activation. Loaded ZK migration state of NONE. 
> (org.apache.kafka.controller.QuorumController) 
> [quorum-controller-0-event-handler]
> 2024-04-17 14:00:48,701 INFO [RaftManager id=0] Node 1 disconnected. 
> (org.apache.kafka.clients.NetworkClient) 
> [kafka-0-raft-outbound-request-thread]
> 2024-04-17 14:00:48,701 WARN [RaftManager id=0] Connection to node 1 
> (my-cluster-controller-1.my-cluster-kafka-brokers.roller.svc.cluster.local/10.244.0.68:9090)
>  could not be established. Node may not be available. 
> (org.apache.kafka.clients.NetworkClient) 
> [kafka-0-raft-outbound-request-thread]
> 2024-04-17 14:00:48,776 DEBUG [UnifiedLog partition=__cluster_metadata-0, 
> dir=/var/lib/kafka/data/kafka-log0] Flushing log up to offset 1117 
> (exclusive)with recovery point 1117, last flushed: 1713362448239, current 
> time: 1713362448776,unflushed: 1 (kafka.log.UnifiedLog) 
> [kafka-0-raft-io-thread]
> 2024-04-17 14:00:49,277 DEBUG [UnifiedLog partition=__cluster_metadata-0, 
> dir=/var/lib/kafka/data/kafka-log0] Flushing log up to offset 1118 
> (exclusive)with recovery point 1118, last flushed: 1713362448777, current 
> time: 
> ...
> 2024-04-17 14:01:35,934 DEBUG [UnifiedLog partition=__cluster_metadata-0, 
> dir=/var/lib/kafka/data/kafka-log0] Flushing log up to offset 1200 
> (exclusive)with recovery point 1200, last flushed: 1713362489371, current 
> time: 1713362495934,unflushed: 1 (kafka.log.UnifiedLog) 
> [kafka-0-raft-io-thread]
> 2024-04-17 14:01:36,121 INFO [RaftManager id=0] Did not receive fetch request 
> from the majority of the voters within 3000ms. Current fetched voters are []. 
> (org.apache.kafka.raft.LeaderState) [kafka-0-raft-io-thread]
> 2024-04-17 14:01:36,223 WARN [QuorumController id=0] Renouncing the 
> leadership due to a metadata log event. We were the leader at epoch 34, but 
> in the new epoch 35, the leader is (none). Reverting to last stable offset 
> 1198. (org.apache.kafka.controller.QuorumController) 
> [quorum-controller-0-event-handler]
> 2024-04-17 14:01:36,223 INFO [QuorumController id=0] 
> failAll(NotControllerException): failing writeNoOpRecord(152156824). 
> (org.apache.kafka.deferred.DeferredEventQueue) 
> [quorum-controller-0-event-handler]
> 2024-04-17 14:01:36,223 INFO [QuorumController id=0] writeNoOpRecord: event 
> failed with NotControllerException in 6291037 microseconds. 
> (org.apache.kafka.controller.QuorumController) 
> [quorum-controller-0-event-handler]{code}
> Logs from the follower:
> {code:java}
> 024-04-17 14:00:48,242 INFO [RaftManager id=2] Completed transition to 
> FollowerState(fetchTimeoutMs=2000, epoch=34, leaderId=0, voters=[0, 1, 2], 
> highWatermark=Optional[LogOffsetMetadata(offset=1113, 
> metadata=Optional.empty)], fetchingSnapshot=Optional.empty) from 
> Voted(epoch=34, votedId=0, voters=[0, 1, 2], electionTimeoutMs=1794) 
> (org.apache.kafka.raft.QuorumState) [kafka-2-raft-io-thread]
> 2024-04-17 14:00:48,242 INFO [QuorumController id=2] In the new epoch 34, the 
> leader is 0. (org.apache.kafka.controller.QuorumController) 
> [quorum-controller-2-event-handler]
> 2024-04-17 14:00:48,247 DEBUG [UnifiedLog partition=__cluster_metadata-0, 
> dir=/var/lib/kafka/data/kafka-log2] Flushing log up to offset 1116 
> (exclusive)with recovery point 1116, last flushed: 1713362442238, current 
> time: 1713362448247,unflushed: 2 (kafka.log.UnifiedL

[jira] [Assigned] (KAFKA-16620) Kraft quorum cannot be formed if all controllers are restarted at the same time

2024-04-25 Thread Gantigmaa Selenge (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16620?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gantigmaa Selenge reassigned KAFKA-16620:
-

Assignee: Luke Chen  (was: Gantigmaa Selenge)

> Kraft quorum cannot be formed if all controllers are restarted at the same 
> time
> ---
>
> Key: KAFKA-16620
> URL: https://issues.apache.org/jira/browse/KAFKA-16620
> Project: Kafka
>  Issue Type: Bug
>Reporter: Gantigmaa Selenge
>Assignee: Luke Chen
>Priority: Major
>
> Controller quorum cannot seem to form at all after accidentally restarting 
> all controller nodes at the same time in a test environment. This is 
> reproducible, happens almost everytime when restarting all controller nodes 
> of the cluster.
> Started a cluster with 3 controller nodes and 3 broker nodes. After 
> restarting the controller nodes, one of them becomes the active controller 
> but resigns due to fetch timeout. The quorum leadership bounces off like this 
> between the nodes indefinitely. 
> The controller.quorum.fetch.timeout.ms was set to the default of 2 seconds.
> Logs from an active controller:
> {code:java}
> 2024-04-17 14:00:48,250 INFO [QuorumController id=0] Becoming the active 
> controller at epoch 34, next write offset 1116. 
> (org.apache.kafka.controller.QuorumController) 
> [quorum-controller-0-event-handler]
> 2024-04-17 14:00:48,250 WARN [QuorumController id=0] Performing controller 
> activation. Loaded ZK migration state of NONE. 
> (org.apache.kafka.controller.QuorumController) 
> [quorum-controller-0-event-handler]
> 2024-04-17 14:00:48,701 INFO [RaftManager id=0] Node 1 disconnected. 
> (org.apache.kafka.clients.NetworkClient) 
> [kafka-0-raft-outbound-request-thread]
> 2024-04-17 14:00:48,701 WARN [RaftManager id=0] Connection to node 1 
> (my-cluster-controller-1.my-cluster-kafka-brokers.roller.svc.cluster.local/10.244.0.68:9090)
>  could not be established. Node may not be available. 
> (org.apache.kafka.clients.NetworkClient) 
> [kafka-0-raft-outbound-request-thread]
> 2024-04-17 14:00:48,776 DEBUG [UnifiedLog partition=__cluster_metadata-0, 
> dir=/var/lib/kafka/data/kafka-log0] Flushing log up to offset 1117 
> (exclusive)with recovery point 1117, last flushed: 1713362448239, current 
> time: 1713362448776,unflushed: 1 (kafka.log.UnifiedLog) 
> [kafka-0-raft-io-thread]
> 2024-04-17 14:00:49,277 DEBUG [UnifiedLog partition=__cluster_metadata-0, 
> dir=/var/lib/kafka/data/kafka-log0] Flushing log up to offset 1118 
> (exclusive)with recovery point 1118, last flushed: 1713362448777, current 
> time: 
> ...
> 2024-04-17 14:01:35,934 DEBUG [UnifiedLog partition=__cluster_metadata-0, 
> dir=/var/lib/kafka/data/kafka-log0] Flushing log up to offset 1200 
> (exclusive)with recovery point 1200, last flushed: 1713362489371, current 
> time: 1713362495934,unflushed: 1 (kafka.log.UnifiedLog) 
> [kafka-0-raft-io-thread]
> 2024-04-17 14:01:36,121 INFO [RaftManager id=0] Did not receive fetch request 
> from the majority of the voters within 3000ms. Current fetched voters are []. 
> (org.apache.kafka.raft.LeaderState) [kafka-0-raft-io-thread]
> 2024-04-17 14:01:36,223 WARN [QuorumController id=0] Renouncing the 
> leadership due to a metadata log event. We were the leader at epoch 34, but 
> in the new epoch 35, the leader is (none). Reverting to last stable offset 
> 1198. (org.apache.kafka.controller.QuorumController) 
> [quorum-controller-0-event-handler]
> 2024-04-17 14:01:36,223 INFO [QuorumController id=0] 
> failAll(NotControllerException): failing writeNoOpRecord(152156824). 
> (org.apache.kafka.deferred.DeferredEventQueue) 
> [quorum-controller-0-event-handler]
> 2024-04-17 14:01:36,223 INFO [QuorumController id=0] writeNoOpRecord: event 
> failed with NotControllerException in 6291037 microseconds. 
> (org.apache.kafka.controller.QuorumController) 
> [quorum-controller-0-event-handler]{code}
> Logs from the follower:
> {code:java}
> 024-04-17 14:00:48,242 INFO [RaftManager id=2] Completed transition to 
> FollowerState(fetchTimeoutMs=2000, epoch=34, leaderId=0, voters=[0, 1, 2], 
> highWatermark=Optional[LogOffsetMetadata(offset=1113, 
> metadata=Optional.empty)], fetchingSnapshot=Optional.empty) from 
> Voted(epoch=34, votedId=0, voters=[0, 1, 2], electionTimeoutMs=1794) 
> (org.apache.kafka.raft.QuorumState) [kafka-2-raft-io-thread]
> 2024-04-17 14:00:48,242 INFO [QuorumController id=2] In the new epoch 34, the 
> leader is 0. (org.apache.kafka.controller.QuorumController) 
> [quorum-controller-2-event-handler]
> 2024-04-17 14:00:48,247 DEBUG [UnifiedLog partition=__cluster_metadata-0, 
> dir=/var/lib/kafka/data/kafka-log2] Flushing log up to offset 1116 
> (exclusive)with recovery point 1116, last flushed: 1713362442238, current 
> time: 1713362448247,unflushed: 2 (kafka.lo

[jira] [Updated] (KAFKA-16620) Kraft quorum cannot be formed if all controllers are restarted at the same time

2024-04-25 Thread Gantigmaa Selenge (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16620?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gantigmaa Selenge updated KAFKA-16620:
--
Description: 
Controller quorum cannot seem to form at all after accidentally restarting all 
controller nodes at the same time in a test environment. This is reproducible, 
happens almost every time when restarting all controller nodes of the cluster.

Started a cluster with 3 controller nodes and 3 broker nodes. After restarting 
the controller nodes, one of them becomes the active controller but resigns due 
to fetch timeout. The quorum leadership bounces off like this between the nodes 
indefinitely. 
The controller.quorum.fetch.timeout.ms was set to the default of 2 seconds.


Logs from an active controller:
{code:java}
2024-04-17 14:00:48,250 INFO [QuorumController id=0] Becoming the active 
controller at epoch 34, next write offset 1116. 
(org.apache.kafka.controller.QuorumController) 
[quorum-controller-0-event-handler]
2024-04-17 14:00:48,250 WARN [QuorumController id=0] Performing controller 
activation. Loaded ZK migration state of NONE. 
(org.apache.kafka.controller.QuorumController) 
[quorum-controller-0-event-handler]
2024-04-17 14:00:48,701 INFO [RaftManager id=0] Node 1 disconnected. 
(org.apache.kafka.clients.NetworkClient) [kafka-0-raft-outbound-request-thread]
2024-04-17 14:00:48,701 WARN [RaftManager id=0] Connection to node 1 
(my-cluster-controller-1.my-cluster-kafka-brokers.roller.svc.cluster.local/10.244.0.68:9090)
 could not be established. Node may not be available. 
(org.apache.kafka.clients.NetworkClient) [kafka-0-raft-outbound-request-thread]
2024-04-17 14:00:48,776 DEBUG [UnifiedLog partition=__cluster_metadata-0, 
dir=/var/lib/kafka/data/kafka-log0] Flushing log up to offset 1117 
(exclusive)with recovery point 1117, last flushed: 1713362448239, current time: 
1713362448776,unflushed: 1 (kafka.log.UnifiedLog) [kafka-0-raft-io-thread]
2024-04-17 14:00:49,277 DEBUG [UnifiedLog partition=__cluster_metadata-0, 
dir=/var/lib/kafka/data/kafka-log0] Flushing log up to offset 1118 
(exclusive)with recovery point 1118, last flushed: 1713362448777, current time: 
...
2024-04-17 14:01:35,934 DEBUG [UnifiedLog partition=__cluster_metadata-0, 
dir=/var/lib/kafka/data/kafka-log0] Flushing log up to offset 1200 
(exclusive)with recovery point 1200, last flushed: 1713362489371, current time: 
1713362495934,unflushed: 1 (kafka.log.UnifiedLog) [kafka-0-raft-io-thread]
2024-04-17 14:01:36,121 INFO [RaftManager id=0] Did not receive fetch request 
from the majority of the voters within 3000ms. Current fetched voters are []. 
(org.apache.kafka.raft.LeaderState) [kafka-0-raft-io-thread]
2024-04-17 14:01:36,223 WARN [QuorumController id=0] Renouncing the leadership 
due to a metadata log event. We were the leader at epoch 34, but in the new 
epoch 35, the leader is (none). Reverting to last stable offset 1198. 
(org.apache.kafka.controller.QuorumController) 
[quorum-controller-0-event-handler]
2024-04-17 14:01:36,223 INFO [QuorumController id=0] 
failAll(NotControllerException): failing writeNoOpRecord(152156824). 
(org.apache.kafka.deferred.DeferredEventQueue) 
[quorum-controller-0-event-handler]
2024-04-17 14:01:36,223 INFO [QuorumController id=0] writeNoOpRecord: event 
failed with NotControllerException in 6291037 microseconds. 
(org.apache.kafka.controller.QuorumController) 
[quorum-controller-0-event-handler]{code}

Logs from the follower:


{code:java}
024-04-17 14:00:48,242 INFO [RaftManager id=2] Completed transition to 
FollowerState(fetchTimeoutMs=2000, epoch=34, leaderId=0, voters=[0, 1, 2], 
highWatermark=Optional[LogOffsetMetadata(offset=1113, 
metadata=Optional.empty)], fetchingSnapshot=Optional.empty) from 
Voted(epoch=34, votedId=0, voters=[0, 1, 2], electionTimeoutMs=1794) 
(org.apache.kafka.raft.QuorumState) [kafka-2-raft-io-thread]
2024-04-17 14:00:48,242 INFO [QuorumController id=2] In the new epoch 34, the 
leader is 0. (org.apache.kafka.controller.QuorumController) 
[quorum-controller-2-event-handler]
2024-04-17 14:00:48,247 DEBUG [UnifiedLog partition=__cluster_metadata-0, 
dir=/var/lib/kafka/data/kafka-log2] Flushing log up to offset 1116 
(exclusive)with recovery point 1116, last flushed: 1713362442238, current time: 
1713362448247,unflushed: 2 (kafka.log.UnifiedLog) [kafka-2-raft-io-thread]
2024-04-17 14:00:48,777 DEBUG [UnifiedLog partition=__cluster_metadata-0, 
dir=/var/lib/kafka/data/kafka-log2] Flushing log up to offset 1117 
(exclusive)with recovery point 1117, last flushed: 1713362448249, current time: 
1713362448777,unflushed: 1 (kafka.log.UnifiedLog) [kafka-2-raft-io-thread]
2024-04-17 14:00:49,278 DEBUG [UnifiedLog partition=__cluster_metadata-0, 
dir=/var/lib/kafka/data/kafka-log2] Flushing log up to offset 1118 
(exclusive)with recovery point 1118, last flushed: 1713362448811, current time: 
1713362449278,unflushed
...
2024-04-17 14:01:29,371 DEBUG [Uni

[jira] [Updated] (KAFKA-16620) KRaft quorum cannot be formed if all controllers are restarted at the same time

2024-04-25 Thread Gantigmaa Selenge (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16620?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gantigmaa Selenge updated KAFKA-16620:
--
Summary: KRaft quorum cannot be formed if all controllers are restarted at 
the same time  (was: Kraft quorum cannot be formed if all controllers are 
restarted at the same time)

> KRaft quorum cannot be formed if all controllers are restarted at the same 
> time
> ---
>
> Key: KAFKA-16620
> URL: https://issues.apache.org/jira/browse/KAFKA-16620
> Project: Kafka
>  Issue Type: Bug
>Reporter: Gantigmaa Selenge
>Assignee: Luke Chen
>Priority: Major
>
> Controller quorum cannot seem to form at all after accidentally restarting 
> all controller nodes at the same time in a test environment. This is 
> reproducible, happens almost everytime when restarting all controller nodes 
> of the cluster.
> Started a cluster with 3 controller nodes and 3 broker nodes. After 
> restarting the controller nodes, one of them becomes the active controller 
> but resigns due to fetch timeout. The quorum leadership bounces off like this 
> between the nodes indefinitely. 
> The controller.quorum.fetch.timeout.ms was set to the default of 2 seconds.
> Logs from an active controller:
> {code:java}
> 2024-04-17 14:00:48,250 INFO [QuorumController id=0] Becoming the active 
> controller at epoch 34, next write offset 1116. 
> (org.apache.kafka.controller.QuorumController) 
> [quorum-controller-0-event-handler]
> 2024-04-17 14:00:48,250 WARN [QuorumController id=0] Performing controller 
> activation. Loaded ZK migration state of NONE. 
> (org.apache.kafka.controller.QuorumController) 
> [quorum-controller-0-event-handler]
> 2024-04-17 14:00:48,701 INFO [RaftManager id=0] Node 1 disconnected. 
> (org.apache.kafka.clients.NetworkClient) 
> [kafka-0-raft-outbound-request-thread]
> 2024-04-17 14:00:48,701 WARN [RaftManager id=0] Connection to node 1 
> (my-cluster-controller-1.my-cluster-kafka-brokers.roller.svc.cluster.local/10.244.0.68:9090)
>  could not be established. Node may not be available. 
> (org.apache.kafka.clients.NetworkClient) 
> [kafka-0-raft-outbound-request-thread]
> 2024-04-17 14:00:48,776 DEBUG [UnifiedLog partition=__cluster_metadata-0, 
> dir=/var/lib/kafka/data/kafka-log0] Flushing log up to offset 1117 
> (exclusive)with recovery point 1117, last flushed: 1713362448239, current 
> time: 1713362448776,unflushed: 1 (kafka.log.UnifiedLog) 
> [kafka-0-raft-io-thread]
> 2024-04-17 14:00:49,277 DEBUG [UnifiedLog partition=__cluster_metadata-0, 
> dir=/var/lib/kafka/data/kafka-log0] Flushing log up to offset 1118 
> (exclusive)with recovery point 1118, last flushed: 1713362448777, current 
> time: 
> ...
> 2024-04-17 14:01:35,934 DEBUG [UnifiedLog partition=__cluster_metadata-0, 
> dir=/var/lib/kafka/data/kafka-log0] Flushing log up to offset 1200 
> (exclusive)with recovery point 1200, last flushed: 1713362489371, current 
> time: 1713362495934,unflushed: 1 (kafka.log.UnifiedLog) 
> [kafka-0-raft-io-thread]
> 2024-04-17 14:01:36,121 INFO [RaftManager id=0] Did not receive fetch request 
> from the majority of the voters within 3000ms. Current fetched voters are []. 
> (org.apache.kafka.raft.LeaderState) [kafka-0-raft-io-thread]
> 2024-04-17 14:01:36,223 WARN [QuorumController id=0] Renouncing the 
> leadership due to a metadata log event. We were the leader at epoch 34, but 
> in the new epoch 35, the leader is (none). Reverting to last stable offset 
> 1198. (org.apache.kafka.controller.QuorumController) 
> [quorum-controller-0-event-handler]
> 2024-04-17 14:01:36,223 INFO [QuorumController id=0] 
> failAll(NotControllerException): failing writeNoOpRecord(152156824). 
> (org.apache.kafka.deferred.DeferredEventQueue) 
> [quorum-controller-0-event-handler]
> 2024-04-17 14:01:36,223 INFO [QuorumController id=0] writeNoOpRecord: event 
> failed with NotControllerException in 6291037 microseconds. 
> (org.apache.kafka.controller.QuorumController) 
> [quorum-controller-0-event-handler]{code}
> Logs from the follower:
> {code:java}
> 024-04-17 14:00:48,242 INFO [RaftManager id=2] Completed transition to 
> FollowerState(fetchTimeoutMs=2000, epoch=34, leaderId=0, voters=[0, 1, 2], 
> highWatermark=Optional[LogOffsetMetadata(offset=1113, 
> metadata=Optional.empty)], fetchingSnapshot=Optional.empty) from 
> Voted(epoch=34, votedId=0, voters=[0, 1, 2], electionTimeoutMs=1794) 
> (org.apache.kafka.raft.QuorumState) [kafka-2-raft-io-thread]
> 2024-04-17 14:00:48,242 INFO [QuorumController id=2] In the new epoch 34, the 
> leader is 0. (org.apache.kafka.controller.QuorumController) 
> [quorum-controller-2-event-handler]
> 2024-04-17 14:00:48,247 DEBUG [UnifiedLog partition=__cluster_metadata-0, 
> dir=/var/lib/kafka/data/kafka-log2] Flushing log up to offset 1

Re: [PR] KAFKA-16557: Fix toString of OffsetFetchRequestState [kafka]

2024-04-25 Thread via GitHub


lianetm commented on PR #15750:
URL: https://github.com/apache/kafka/pull/15750#issuecomment-2077166262

   Hey @phooq, having a `getDetails` that the inheritors override does achieve 
the clarity I was looking for, sounds good. 
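   
   For anyone skimming the thread, the shape being discussed is roughly the
   following (class and field names are placeholders, not the actual request
   states): the base toString() delegates to a details method that subclasses
   extend.
   
   ```java
   abstract class RequestStateSketch {
       protected String getDetails() {
           return "requestState=base";
       }
   
       @Override
       public final String toString() {
           return getClass().getSimpleName() + "{" + getDetails() + "}";
       }
   }
   
   class OffsetFetchRequestStateSketch extends RequestStateSketch {
       private final String partitions = "topic-0";
   
       @Override
       protected String getDetails() {
           return super.getDetails() + ", partitions=" + partitions;
       }
   }
   ```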


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16573: Specify node and store where serdes are needed [kafka]

2024-04-25 Thread via GitHub


AyoubOm commented on PR #15790:
URL: https://github.com/apache/kafka/pull/15790#issuecomment-2077173169

   ping @mjsax 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16621) Alter MirrorSourceConnector offsets dont work

2024-04-25 Thread yuzhou (Jira)
yuzhou created KAFKA-16621:
--

 Summary: Alter MirrorSourceConnector offsets dont work
 Key: KAFKA-16621
 URL: https://issues.apache.org/jira/browse/KAFKA-16621
 Project: Kafka
  Issue Type: Bug
  Components: connect
Reporter: yuzhou
 Attachments: image-2024-04-25-21-28-37-375.png

In the connect-offsets topic:

the offsets written by the connector use the key 
`\{"cluster":"A","partition":2,"topic":"topic"}`

after altering offsets, the key is 
`\{"partition":2,"topic":"topic","cluster":"A"}`

!image-2024-04-25-21-28-37-375.png!

In Worker.globalOffsetBackingStore.data, both keys exist, because they are 
different strings:

!image-2024-04-25-21-33-51-892.png!

So altering offsets is not successful, because getting offsets from 
globalOffsetBackingStore always returns the first one.
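
A minimal sketch of the mismatch (illustrative only, not code from MirrorMaker): the 
backing store keys on the serialized bytes, so the same logical source partition with 
a different JSON field order produces a second, distinct key.

{code:java}
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class OffsetKeyOrderDemo {
    public static void main(String[] args) {
        byte[] writtenByConnector =
            "{\"cluster\":\"A\",\"partition\":2,\"topic\":\"topic\"}"
                .getBytes(StandardCharsets.UTF_8);
        byte[] writtenByAlterOffsets =
            "{\"partition\":2,\"topic\":\"topic\",\"cluster\":\"A\"}"
                .getBytes(StandardCharsets.UTF_8);
        // Logically the same source partition, but different byte arrays,
        // so a byte-keyed offset store keeps both entries.
        System.out.println(Arrays.equals(writtenByConnector, writtenByAlterOffsets)); // false
    }
}
{code}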
 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16621) Alter MirrorSourceConnector offsets dont work

2024-04-25 Thread yuzhou (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuzhou updated KAFKA-16621:
---
Description: 
In the connect-offsets topic:

the offsets written by the connector use the key 
`\{"cluster":"A","partition":2,"topic":"topic"}`

after altering offsets, the key is 
`\{"partition":2,"topic":"topic","cluster":"A"}`

!image-2024-04-25-21-28-37-375.png!

In Worker.globalOffsetBackingStore.data, both keys exist, because they are 
different strings:

!image-2024-04-25-21-38-25-283.png!

So altering offsets is not successful, because getting offsets from 
globalOffsetBackingStore always returns the first one.
 

 

  was:
In connect-offsets topic:

the offsets wrote by connector, key is 
`\{"cluster":"A","partition":2,"topic":"topic"}`

after alter offsets, the key is  
`\{"partition":2,"topic":"topic","cluster":"A"}`

!image-2024-04-25-21-28-37-375.png!

in Worker.globalOffsetBackingStore.data, both two keys exist, because the are 
different strings:

!image-2024-04-25-21-38-03-531.png!

So alter offsets is not succussful, because when get offsets from 
globalOffsetBackingStore, always returns the first one.
 

 


> Alter MirrorSourceConnector offsets dont work
> -
>
> Key: KAFKA-16621
> URL: https://issues.apache.org/jira/browse/KAFKA-16621
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: yuzhou
>Priority: Major
> Attachments: image-2024-04-25-21-28-37-375.png
>
>
> In connect-offsets topic:
> the offsets wrote by connector, key is 
> `\{"cluster":"A","partition":2,"topic":"topic"}`
> after alter offsets, the key is  
> `\{"partition":2,"topic":"topic","cluster":"A"}`
> !image-2024-04-25-21-28-37-375.png!
> in Worker.globalOffsetBackingStore.data, both two keys exist, because the are 
> different strings:
> !image-2024-04-25-21-38-25-283.png!
> So alter offsets is not succussful, because when get offsets from 
> globalOffsetBackingStore, always returns the first one.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16621) Alter MirrorSourceConnector offsets dont work

2024-04-25 Thread yuzhou (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuzhou updated KAFKA-16621:
---
Description: 
In connect-offsets topic:

the offsets wrote by connector, key is 
`\{"cluster":"A","partition":2,"topic":"topic"}`

after alter offsets, the key is  
`\{"partition":2,"topic":"topic","cluster":"A"}`

!image-2024-04-25-21-28-37-375.png!

in Worker.globalOffsetBackingStore.data, both two keys exist, because the are 
different strings:

!image-2024-04-25-21-38-03-531.png!

So alter offsets is not succussful, because when get offsets from 
globalOffsetBackingStore, always returns the first one.
 

 

  was:
In connect-offsets topic:

the offsets wrote by connector, key is 
`\{"cluster":"A","partition":2,"topic":"topic"}`

after alter offsets, the key is  
`\{"partition":2,"topic":"topic","cluster":"A"}`

!image-2024-04-25-21-28-37-375.png!

in Worker.globalOffsetBackingStore.data, both two keys exist, because the are 
different strings:

!image-2024-04-25-21-33-51-892.png!

So alter offsets is not succussful, because when get offsets from 
globalOffsetBackingStore, always returns the first one.
 

 


> Alter MirrorSourceConnector offsets dont work
> -
>
> Key: KAFKA-16621
> URL: https://issues.apache.org/jira/browse/KAFKA-16621
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: yuzhou
>Priority: Major
> Attachments: image-2024-04-25-21-28-37-375.png
>
>
> In connect-offsets topic:
> the offsets wrote by connector, key is 
> `\{"cluster":"A","partition":2,"topic":"topic"}`
> after alter offsets, the key is  
> `\{"partition":2,"topic":"topic","cluster":"A"}`
> !image-2024-04-25-21-28-37-375.png!
> in Worker.globalOffsetBackingStore.data, both two keys exist, because the are 
> different strings:
> !image-2024-04-25-21-38-03-531.png!
> So alter offsets is not succussful, because when get offsets from 
> globalOffsetBackingStore, always returns the first one.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16621) Alter MirrorSourceConnector offsets dont work

2024-04-25 Thread yuzhou (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuzhou updated KAFKA-16621:
---
Description: 
In connect-offsets topic:

the offsets wrote by connector, key is 
`\{"cluster":"A","partition":2,"topic":"topic"}`

after alter offsets, the key is  
`\{"partition":2,"topic":"topic","cluster":"A"}`

!image-2024-04-25-21-28-37-375.png!

in Worker.globalOffsetBackingStore.data, both two keys exist, because the are 
different strings:

!image-2024-04-25-21-38-45-287.png!

So alter offsets is not succussful, because when get offsets from 
globalOffsetBackingStore, always returns the first one.
 

 

  was:
In connect-offsets topic:

the offsets wrote by connector, key is 
`\{"cluster":"A","partition":2,"topic":"topic"}`

after alter offsets, the key is  
`\{"partition":2,"topic":"topic","cluster":"A"}`

!image-2024-04-25-21-28-37-375.png!

in Worker.globalOffsetBackingStore.data, both two keys exist, because the are 
different strings:

!image-2024-04-25-21-38-25-283.png!

So alter offsets is not succussful, because when get offsets from 
globalOffsetBackingStore, always returns the first one.
 

 


> Alter MirrorSourceConnector offsets dont work
> -
>
> Key: KAFKA-16621
> URL: https://issues.apache.org/jira/browse/KAFKA-16621
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: yuzhou
>Priority: Major
> Attachments: image-2024-04-25-21-28-37-375.png
>
>
> In connect-offsets topic:
> the offsets wrote by connector, key is 
> `\{"cluster":"A","partition":2,"topic":"topic"}`
> after alter offsets, the key is  
> `\{"partition":2,"topic":"topic","cluster":"A"}`
> !image-2024-04-25-21-28-37-375.png!
> in Worker.globalOffsetBackingStore.data, both two keys exist, because the are 
> different strings:
> !image-2024-04-25-21-38-45-287.png!
> So alter offsets is not succussful, because when get offsets from 
> globalOffsetBackingStore, always returns the first one.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16621) Alter MirrorSourceConnector offsets dont work

2024-04-25 Thread yuzhou (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuzhou updated KAFKA-16621:
---
Description: 
In connect-offsets topic:

the offsets wrote by connector, key is 
`\{"cluster":"A","partition":2,"topic":"topic"}`

after alter offsets, the key is  
`\{"partition":2,"topic":"topic","cluster":"A"}`

!image-2024-04-25-21-28-37-375.png!

in Worker.globalOffsetBackingStore.data, both two keys exist, because the are 
different strings:

{"cluster":"A","partition":2,"topic":"topic"}> \{"offset":2}

{"partition":2,"topic":"topic","cluster":"A"}> \{"offset":3}

So alter offsets is not succussful, because when get offsets from 
globalOffsetBackingStore, always returns  \{"offset":2}
 

 

  was:
In connect-offsets topic:

the offsets wrote by connector, key is 
`\{"cluster":"A","partition":2,"topic":"topic"}`

after alter offsets, the key is  
`\{"partition":2,"topic":"topic","cluster":"A"}`

!image-2024-04-25-21-28-37-375.png!

in Worker.globalOffsetBackingStore.data, both two keys exist, because the are 
different strings:

{"cluster":"A","partition":2,"topic":"topic"}  > \{"offset":2}

{"partition":2,"topic":"topic","cluster":"A"}  > \{"offset":3}

So alter offsets is not succussful, because when get offsets from 
globalOffsetBackingStore, always returns  \{"offset":2}
 

 


> Alter MirrorSourceConnector offsets dont work
> -
>
> Key: KAFKA-16621
> URL: https://issues.apache.org/jira/browse/KAFKA-16621
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: yuzhou
>Priority: Major
> Attachments: image-2024-04-25-21-28-37-375.png
>
>
> In connect-offsets topic:
> the offsets wrote by connector, key is 
> `\{"cluster":"A","partition":2,"topic":"topic"}`
> after alter offsets, the key is  
> `\{"partition":2,"topic":"topic","cluster":"A"}`
> !image-2024-04-25-21-28-37-375.png!
> in Worker.globalOffsetBackingStore.data, both two keys exist, because the are 
> different strings:
> {"cluster":"A","partition":2,"topic":"topic"}> \{"offset":2}
> {"partition":2,"topic":"topic","cluster":"A"}> \{"offset":3}
> So alter offsets is not succussful, because when get offsets from 
> globalOffsetBackingStore, always returns  \{"offset":2}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16621) Alter MirrorSourceConnector offsets dont work

2024-04-25 Thread yuzhou (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuzhou updated KAFKA-16621:
---
Description: 
In connect-offsets topic:

the offsets wrote by connector, key is 
`\{"cluster":"A","partition":2,"topic":"topic"}`

after alter offsets, the key is  
`\{"partition":2,"topic":"topic","cluster":"A"}`

!image-2024-04-25-21-28-37-375.png!

in Worker.globalOffsetBackingStore.data, both two keys exist, because the are 
different strings:

{"cluster":"A","partition":2,"topic":"topic"}  -> \{"offset":2}

{"partition":2,"topic":"topic","cluster":"A"}  -> \{"offset":3}

So alter offsets is not succussful, because when get offsets from 
globalOffsetBackingStore, always returns  \{"offset":2}
 

 

  was:
In connect-offsets topic:

the offsets wrote by connector, key is 
`\{"cluster":"A","partition":2,"topic":"topic"}`

after alter offsets, the key is  
`\{"partition":2,"topic":"topic","cluster":"A"}`

!image-2024-04-25-21-28-37-375.png!

in Worker.globalOffsetBackingStore.data, both two keys exist, because the are 
different strings:

!image-2024-04-25-21-38-45-287.png!

So alter offsets is not succussful, because when get offsets from 
globalOffsetBackingStore, always returns the first one.
 

 


> Alter MirrorSourceConnector offsets dont work
> -
>
> Key: KAFKA-16621
> URL: https://issues.apache.org/jira/browse/KAFKA-16621
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: yuzhou
>Priority: Major
> Attachments: image-2024-04-25-21-28-37-375.png
>
>
> In connect-offsets topic:
> the offsets wrote by connector, key is 
> `\{"cluster":"A","partition":2,"topic":"topic"}`
> after alter offsets, the key is  
> `\{"partition":2,"topic":"topic","cluster":"A"}`
> !image-2024-04-25-21-28-37-375.png!
> in Worker.globalOffsetBackingStore.data, both two keys exist, because the are 
> different strings:
> {"cluster":"A","partition":2,"topic":"topic"}  -> \{"offset":2}
> {"partition":2,"topic":"topic","cluster":"A"}  -> \{"offset":3}
> So alter offsets is not succussful, because when get offsets from 
> globalOffsetBackingStore, always returns  \{"offset":2}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16621) Alter MirrorSourceConnector offsets dont work

2024-04-25 Thread yuzhou (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuzhou updated KAFKA-16621:
---
Description: 
In connect-offsets topic:

the offsets wrote by connector, key is 
`\{"cluster":"A","partition":2,"topic":"topic"}`

after alter offsets, the key is  
`\{"partition":2,"topic":"topic","cluster":"A"}`

!image-2024-04-25-21-28-37-375.png!

in Worker.globalOffsetBackingStore.data, both two keys exist, because the are 
different strings:

{"cluster":"A","partition":2,"topic":"topic"}  > \{"offset":2}

{"partition":2,"topic":"topic","cluster":"A"}  > \{"offset":3}

So alter offsets is not succussful, because when get offsets from 
globalOffsetBackingStore, always returns  \{"offset":2}
 

 

  was:
In connect-offsets topic:

the offsets wrote by connector, key is 
`\{"cluster":"A","partition":2,"topic":"topic"}`

after alter offsets, the key is  
`\{"partition":2,"topic":"topic","cluster":"A"}`

!image-2024-04-25-21-28-37-375.png!

in Worker.globalOffsetBackingStore.data, both two keys exist, because the are 
different strings:

{"cluster":"A","partition":2,"topic":"topic"}  -> \{"offset":2}

{"partition":2,"topic":"topic","cluster":"A"}  -> \{"offset":3}

So alter offsets is not succussful, because when get offsets from 
globalOffsetBackingStore, always returns  \{"offset":2}
 

 


> Alter MirrorSourceConnector offsets dont work
> -
>
> Key: KAFKA-16621
> URL: https://issues.apache.org/jira/browse/KAFKA-16621
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: yuzhou
>Priority: Major
> Attachments: image-2024-04-25-21-28-37-375.png
>
>
> In connect-offsets topic:
> the offsets wrote by connector, key is 
> `\{"cluster":"A","partition":2,"topic":"topic"}`
> after alter offsets, the key is  
> `\{"partition":2,"topic":"topic","cluster":"A"}`
> !image-2024-04-25-21-28-37-375.png!
> in Worker.globalOffsetBackingStore.data, both two keys exist, because the are 
> different strings:
> {"cluster":"A","partition":2,"topic":"topic"}  > \{"offset":2}
> {"partition":2,"topic":"topic","cluster":"A"}  > \{"offset":3}
> So alter offsets is not succussful, because when get offsets from 
> globalOffsetBackingStore, always returns  \{"offset":2}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16621) Alter MirrorSourceConnector offsets dont work

2024-04-25 Thread yuzhou (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuzhou updated KAFKA-16621:
---
Description: 
In connect-offsets topic:

the offsets wrote by connector, key is 
`\{"cluster":"A","partition":2,"topic":"topic"}`

after alter offsets, the key is  
`\{"partition":2,"topic":"topic","cluster":"A"}`

!image-2024-04-25-21-28-37-375.png!

in Worker.globalOffsetBackingStore.data, both two keys exist, because the are 
different strings:

{"cluster":"A","partition":2,"topic":"topic"}               {"offset":2}

{"partition":2,"topic":"topic","cluster":"A"}               \{"offset":3}

So alter offsets is not succussful, because when get offsets from 
globalOffsetBackingStore, always returns  \{"offset":2}
 

 

  was:
In connect-offsets topic:

the offsets wrote by connector, key is 
`\{"cluster":"A","partition":2,"topic":"topic"}`

after alter offsets, the key is  
`\{"partition":2,"topic":"topic","cluster":"A"}`

!image-2024-04-25-21-28-37-375.png!

in Worker.globalOffsetBackingStore.data, both two keys exist, because the are 
different strings:

{"cluster":"A","partition":2,"topic":"topic"}  -- \{"offset":2}

{"partition":2,"topic":"topic","cluster":"A"} -- \{"offset":3}

So alter offsets is not succussful, because when get offsets from 
globalOffsetBackingStore, always returns  \{"offset":2}
 

 


> Alter MirrorSourceConnector offsets dont work
> -
>
> Key: KAFKA-16621
> URL: https://issues.apache.org/jira/browse/KAFKA-16621
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: yuzhou
>Priority: Major
> Attachments: image-2024-04-25-21-28-37-375.png
>
>
> In connect-offsets topic:
> the offsets wrote by connector, key is 
> `\{"cluster":"A","partition":2,"topic":"topic"}`
> after alter offsets, the key is  
> `\{"partition":2,"topic":"topic","cluster":"A"}`
> !image-2024-04-25-21-28-37-375.png!
> in Worker.globalOffsetBackingStore.data, both two keys exist, because the are 
> different strings:
> {"cluster":"A","partition":2,"topic":"topic"}               {"offset":2}
> {"partition":2,"topic":"topic","cluster":"A"}               \{"offset":3}
> So alter offsets is not succussful, because when get offsets from 
> globalOffsetBackingStore, always returns  \{"offset":2}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16621) Alter MirrorSourceConnector offsets dont work

2024-04-25 Thread yuzhou (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuzhou updated KAFKA-16621:
---
Description: 
In connect-offsets topic:

the offsets wrote by connector, key is 
`\{"cluster":"A","partition":2,"topic":"topic"}`

after alter offsets, the key is  
`\{"partition":2,"topic":"topic","cluster":"A"}`

!image-2024-04-25-21-28-37-375.png!

in Worker.globalOffsetBackingStore.data, both two keys exist, because the are 
different strings:

{"cluster":"A","partition":2,"topic":"topic"} \{"offset":2}

{"partition":2,"topic":"topic","cluster":"A"} \{"offset":3}

So alter offsets is not succussful, because when get offsets from 
globalOffsetBackingStore, always returns  \{"offset":2}
 

 

  was:
In connect-offsets topic:

the offsets wrote by connector, key is 
`\{"cluster":"A","partition":2,"topic":"topic"}`

after alter offsets, the key is  
`\{"partition":2,"topic":"topic","cluster":"A"}`

!image-2024-04-25-21-28-37-375.png!

in Worker.globalOffsetBackingStore.data, both two keys exist, because the are 
different strings:

{"cluster":"A","partition":2,"topic":"topic"}\{"offset":2}

{"partition":2,"topic":"topic","cluster":"A"} {"offset":3}

So alter offsets is not succussful, because when get offsets from 
globalOffsetBackingStore, always returns  \{"offset":2}
 

 


> Alter MirrorSourceConnector offsets dont work
> -
>
> Key: KAFKA-16621
> URL: https://issues.apache.org/jira/browse/KAFKA-16621
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: yuzhou
>Priority: Major
> Attachments: image-2024-04-25-21-28-37-375.png
>
>
> In connect-offsets topic:
> the offsets wrote by connector, key is 
> `\{"cluster":"A","partition":2,"topic":"topic"}`
> after alter offsets, the key is  
> `\{"partition":2,"topic":"topic","cluster":"A"}`
> !image-2024-04-25-21-28-37-375.png!
> in Worker.globalOffsetBackingStore.data, both two keys exist, because the are 
> different strings:
> {"cluster":"A","partition":2,"topic":"topic"} \{"offset":2}
> {"partition":2,"topic":"topic","cluster":"A"} \{"offset":3}
> So alter offsets is not succussful, because when get offsets from 
> globalOffsetBackingStore, always returns  \{"offset":2}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16621) Alter MirrorSourceConnector offsets dont work

2024-04-25 Thread yuzhou (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuzhou updated KAFKA-16621:
---
Description: 
In connect-offsets topic:

the offsets wrote by connector, key is 
`\{"cluster":"A","partition":2,"topic":"topic"}`

after alter offsets, the key is  
`\{"partition":2,"topic":"topic","cluster":"A"}`

!image-2024-04-25-21-28-37-375.png!

in Worker.globalOffsetBackingStore.data, both two keys exist, because the are 
different strings:

{"cluster":"A","partition":2,"topic":"topic"}\{"offset":2}

{"partition":2,"topic":"topic","cluster":"A"} {"offset":3}

So alter offsets is not succussful, because when get offsets from 
globalOffsetBackingStore, always returns  \{"offset":2}
 

 

  was:
In connect-offsets topic:

the offsets wrote by connector, key is 
`\{"cluster":"A","partition":2,"topic":"topic"}`

after alter offsets, the key is  
`\{"partition":2,"topic":"topic","cluster":"A"}`

!image-2024-04-25-21-28-37-375.png!

in Worker.globalOffsetBackingStore.data, both two keys exist, because the are 
different strings:

{"cluster":"A","partition":2,"topic":"topic"}               {"offset":2}

{"partition":2,"topic":"topic","cluster":"A"}               \{"offset":3}

So alter offsets is not succussful, because when get offsets from 
globalOffsetBackingStore, always returns  \{"offset":2}
 

 


> Alter MirrorSourceConnector offsets dont work
> -
>
> Key: KAFKA-16621
> URL: https://issues.apache.org/jira/browse/KAFKA-16621
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: yuzhou
>Priority: Major
> Attachments: image-2024-04-25-21-28-37-375.png
>
>
> In connect-offsets topic:
> the offsets wrote by connector, key is 
> `\{"cluster":"A","partition":2,"topic":"topic"}`
> after alter offsets, the key is  
> `\{"partition":2,"topic":"topic","cluster":"A"}`
> !image-2024-04-25-21-28-37-375.png!
> in Worker.globalOffsetBackingStore.data, both two keys exist, because the are 
> different strings:
> {"cluster":"A","partition":2,"topic":"topic"}\{"offset":2}
> {"partition":2,"topic":"topic","cluster":"A"} {"offset":3}
> So alter offsets is not succussful, because when get offsets from 
> globalOffsetBackingStore, always returns  \{"offset":2}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16621) Alter MirrorSourceConnector offsets dont work

2024-04-25 Thread yuzhou (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuzhou updated KAFKA-16621:
---
Description: 
In connect-offsets topic:

the offsets wrote by connector, key is 
`\{"cluster":"A","partition":2,"topic":"topic"}`

after alter offsets, the key is  
`\{"partition":2,"topic":"topic","cluster":"A"}`

!image-2024-04-25-21-28-37-375.png!

in Worker.globalOffsetBackingStore.data, both two keys exist, because the are 
different strings:

{"cluster":"A","partition":2,"topic":"topic"}  -- \{"offset":2}

{"partition":2,"topic":"topic","cluster":"A"} -- \{"offset":3}

So alter offsets is not succussful, because when get offsets from 
globalOffsetBackingStore, always returns  \{"offset":2}
 

 

  was:
In connect-offsets topic:

the offsets wrote by connector, key is 
`\{"cluster":"A","partition":2,"topic":"topic"}`

after alter offsets, the key is  
`\{"partition":2,"topic":"topic","cluster":"A"}`

!image-2024-04-25-21-28-37-375.png!

in Worker.globalOffsetBackingStore.data, both two keys exist, because the are 
different strings:

{"cluster":"A","partition":2,"topic":"topic"}> \{"offset":2}

{"partition":2,"topic":"topic","cluster":"A"}> \{"offset":3}

So alter offsets is not succussful, because when get offsets from 
globalOffsetBackingStore, always returns  \{"offset":2}
 

 


> Alter MirrorSourceConnector offsets dont work
> -
>
> Key: KAFKA-16621
> URL: https://issues.apache.org/jira/browse/KAFKA-16621
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: yuzhou
>Priority: Major
> Attachments: image-2024-04-25-21-28-37-375.png
>
>
> In connect-offsets topic:
> the offsets wrote by connector, key is 
> `\{"cluster":"A","partition":2,"topic":"topic"}`
> after alter offsets, the key is  
> `\{"partition":2,"topic":"topic","cluster":"A"}`
> !image-2024-04-25-21-28-37-375.png!
> in Worker.globalOffsetBackingStore.data, both two keys exist, because the are 
> different strings:
> {"cluster":"A","partition":2,"topic":"topic"}  -- \{"offset":2}
> {"partition":2,"topic":"topic","cluster":"A"} -- \{"offset":3}
> So alter offsets is not succussful, because when get offsets from 
> globalOffsetBackingStore, always returns  \{"offset":2}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16621) Alter MirrorSourceConnector offsets dont work

2024-04-25 Thread yuzhou (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuzhou updated KAFKA-16621:
---
Description: 
In connect-offsets topic:

the offsets wrote by connector, key is 
`\{"cluster":"A","partition":2,"topic":"topic"}`

after alter offsets, the key is  
`\{"partition":2,"topic":"topic","cluster":"A"}`

!image-2024-04-25-21-28-37-375.png!

in Worker.globalOffsetBackingStore.data, both two keys exist, because the are 
different strings:

{"cluster":"A","partition":2,"topic":"topic"} {"offset":2}

{"partition":2,"topic":"topic","cluster":"A"} {"offset":3}

So alter offsets is not succussful, because when get offsets from 
globalOffsetBackingStore, always returns  {"offset":2}
 

 

  was:
In connect-offsets topic:

the offsets wrote by connector, key is 
`\{"cluster":"A","partition":2,"topic":"topic"}`

after alter offsets, the key is  
`\{"partition":2,"topic":"topic","cluster":"A"}`

!image-2024-04-25-21-28-37-375.png!

in Worker.globalOffsetBackingStore.data, both two keys exist, because the are 
different strings:

{"cluster":"A","partition":2,"topic":"topic"} \{"offset":2}

{"partition":2,"topic":"topic","cluster":"A"} \{"offset":3}

So alter offsets is not succussful, because when get offsets from 
globalOffsetBackingStore, always returns  \{"offset":2}
 

 


> Alter MirrorSourceConnector offsets dont work
> -
>
> Key: KAFKA-16621
> URL: https://issues.apache.org/jira/browse/KAFKA-16621
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: yuzhou
>Priority: Major
> Attachments: image-2024-04-25-21-28-37-375.png
>
>
> In connect-offsets topic:
> the offsets wrote by connector, key is 
> `\{"cluster":"A","partition":2,"topic":"topic"}`
> after alter offsets, the key is  
> `\{"partition":2,"topic":"topic","cluster":"A"}`
> !image-2024-04-25-21-28-37-375.png!
> in Worker.globalOffsetBackingStore.data, both two keys exist, because the are 
> different strings:
> {"cluster":"A","partition":2,"topic":"topic"} {"offset":2}
> {"partition":2,"topic":"topic","cluster":"A"} {"offset":3}
> So alter offsets is not succussful, because when get offsets from 
> globalOffsetBackingStore, always returns  {"offset":2}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16621) Alter MirrorSourceConnector offsets dont work

2024-04-25 Thread yuzhou (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuzhou updated KAFKA-16621:
---
Description: 
In connect-offsets topic:

the offsets wrote by connector, key is 
`\{"cluster":"A","partition":2,"topic":"topic"}`

after alter offsets, the key is  
`\{"partition":2,"topic":"topic","cluster":"A"}`

!image-2024-04-25-21-28-37-375.png!

in Worker.globalOffsetBackingStore.data, both two keys exist, because the are 
different strings:

{"cluster":"A","partition":2,"topic":"topic"} {"offset":2}


{"partition":2,"topic":"topic","cluster":"A"} {"offset":3}

So alter offsets is not succussful, because when get offsets from 
globalOffsetBackingStore, always returns  {"offset":2}
 

 

  was:
In connect-offsets topic:

the offsets wrote by connector, key is 
`\{"cluster":"A","partition":2,"topic":"topic"}`

after alter offsets, the key is  
`\{"partition":2,"topic":"topic","cluster":"A"}`

!image-2024-04-25-21-28-37-375.png!

in Worker.globalOffsetBackingStore.data, both two keys exist, because the are 
different strings:

{"cluster":"A","partition":2,"topic":"topic"} {"offset":2}

{"partition":2,"topic":"topic","cluster":"A"} {"offset":3}

So alter offsets is not succussful, because when get offsets from 
globalOffsetBackingStore, always returns  {"offset":2}
 

 


> Alter MirrorSourceConnector offsets dont work
> -
>
> Key: KAFKA-16621
> URL: https://issues.apache.org/jira/browse/KAFKA-16621
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: yuzhou
>Priority: Major
> Attachments: image-2024-04-25-21-28-37-375.png
>
>
> In connect-offsets topic:
> the offsets wrote by connector, key is 
> `\{"cluster":"A","partition":2,"topic":"topic"}`
> after alter offsets, the key is  
> `\{"partition":2,"topic":"topic","cluster":"A"}`
> !image-2024-04-25-21-28-37-375.png!
> in Worker.globalOffsetBackingStore.data, both two keys exist, because the are 
> different strings:
> {"cluster":"A","partition":2,"topic":"topic"} {"offset":2}
> {"partition":2,"topic":"topic","cluster":"A"} {"offset":3}
> So alter offsets is not succussful, because when get offsets from 
> globalOffsetBackingStore, always returns  {"offset":2}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16621) Alter MirrorSourceConnector offsets dont work

2024-04-25 Thread yuzhou (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuzhou updated KAFKA-16621:
---
Description: 
In connect-offsets topic:

the offsets wrote by connector, key is 
`\{"cluster":"A","partition":2,"topic":"topic"}`

after alter offsets, the key is  
`\{"partition":2,"topic":"topic","cluster":"A"}`

!image-2024-04-25-21-28-37-375.png!

in Worker.globalOffsetBackingStore.data, both two keys exist, because the are 
different strings:

{"cluster":"A","partition":2,"topic":"topic"} {"offset":2}


\{"partition":2,"topic":"topic","cluster":"A"} {"offset":3}

So alter offsets is not succussful, because when get offsets from 
globalOffsetBackingStore, always returns  {"offset":2}
 

 

  was:
In connect-offsets topic:

the offsets wrote by connector, key is 
`\{"cluster":"A","partition":2,"topic":"topic"}`

after alter offsets, the key is  
`\{"partition":2,"topic":"topic","cluster":"A"}`

!image-2024-04-25-21-28-37-375.png!

in Worker.globalOffsetBackingStore.data, both two keys exist, because the are 
different strings:

{"cluster":"A","partition":2,"topic":"topic"} {"offset":2}


{"partition":2,"topic":"topic","cluster":"A"} {"offset":3}

So alter offsets is not succussful, because when get offsets from 
globalOffsetBackingStore, always returns  {"offset":2}
 

 


> Alter MirrorSourceConnector offsets dont work
> -
>
> Key: KAFKA-16621
> URL: https://issues.apache.org/jira/browse/KAFKA-16621
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: yuzhou
>Priority: Major
> Attachments: image-2024-04-25-21-28-37-375.png
>
>
> In connect-offsets topic:
> the offsets wrote by connector, key is 
> `\{"cluster":"A","partition":2,"topic":"topic"}`
> after alter offsets, the key is  
> `\{"partition":2,"topic":"topic","cluster":"A"}`
> !image-2024-04-25-21-28-37-375.png!
> in Worker.globalOffsetBackingStore.data, both two keys exist, because the are 
> different strings:
> {"cluster":"A","partition":2,"topic":"topic"} {"offset":2}
> \{"partition":2,"topic":"topic","cluster":"A"} {"offset":3}
> So alter offsets is not succussful, because when get offsets from 
> globalOffsetBackingStore, always returns  {"offset":2}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16621) Alter MirrorSourceConnector offsets dont work

2024-04-25 Thread yuzhou (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuzhou updated KAFKA-16621:
---
Description: 
In connect-offsets topic:

the offsets wrote by connector, key is 
`\{"cluster":"A","partition":2,"topic":"topic"}`

after alter offsets, the key is  
`\{"partition":2,"topic":"topic","cluster":"A"}`

!image-2024-04-25-21-28-37-375.png!

in Worker.globalOffsetBackingStore.data, both two keys exist, because the are 
different strings:

{"cluster":"A","partition":2,"topic":"topic"}{"offset":2}

 

{"partition":2,"topic":"topic","cluster":"A"}{"offset":3}

So alter offsets is not succussful, because when get offsets from 
globalOffsetBackingStore, always returns 

{"offset":2}

 

 

  was:
In connect-offsets topic:

the offsets wrote by connector, key is 
`\{"cluster":"A","partition":2,"topic":"topic"}`

after alter offsets, the key is  
`\{"partition":2,"topic":"topic","cluster":"A"}`

!image-2024-04-25-21-28-37-375.png!

in Worker.globalOffsetBackingStore.data, both two keys exist, because the are 
different strings:

{"cluster":"A","partition":2,"topic":"topic"} \{"offset":2}

 

{"partition":2,"topic":"topic","cluster":"A"} \{"offset":3}

So alter offsets is not succussful, because when get offsets from 
globalOffsetBackingStore, always returns 

{"offset":2}

 

 


> Alter MirrorSourceConnector offsets dont work
> -
>
> Key: KAFKA-16621
> URL: https://issues.apache.org/jira/browse/KAFKA-16621
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: yuzhou
>Priority: Major
> Attachments: image-2024-04-25-21-28-37-375.png
>
>
> In connect-offsets topic:
> the offsets wrote by connector, key is 
> `\{"cluster":"A","partition":2,"topic":"topic"}`
> after alter offsets, the key is  
> `\{"partition":2,"topic":"topic","cluster":"A"}`
> !image-2024-04-25-21-28-37-375.png!
> in Worker.globalOffsetBackingStore.data, both two keys exist, because the are 
> different strings:
> {"cluster":"A","partition":2,"topic":"topic"}{"offset":2}
>  
> {"partition":2,"topic":"topic","cluster":"A"}{"offset":3}
> So alter offsets is not succussful, because when get offsets from 
> globalOffsetBackingStore, always returns 
> {"offset":2}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16621) Alter MirrorSourceConnector offsets dont work

2024-04-25 Thread yuzhou (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuzhou updated KAFKA-16621:
---
Description: 
In connect-offsets topic:

the offsets wrote by connector, key is 
`\{"cluster":"A","partition":2,"topic":"topic"}`

after alter offsets, the key is  
`\{"partition":2,"topic":"topic","cluster":"A"}`

!image-2024-04-25-21-28-37-375.png!

in Worker.globalOffsetBackingStore.data, both two keys exist, because the are 
different strings:

{"cluster":"A","partition":2,"topic":"topic"} \{"offset":2}

 

{"partition":2,"topic":"topic","cluster":"A"} \{"offset":3}

So alter offsets is not succussful, because when get offsets from 
globalOffsetBackingStore, always returns 

{"offset":2}

 

 

  was:
In connect-offsets topic:

the offsets wrote by connector, key is 
`\{"cluster":"A","partition":2,"topic":"topic"}`

after alter offsets, the key is  
`\{"partition":2,"topic":"topic","cluster":"A"}`

!image-2024-04-25-21-28-37-375.png!

in Worker.globalOffsetBackingStore.data, both two keys exist, because the are 
different strings:

{"cluster":"A","partition":2,"topic":"topic"} {"offset":2}


\{"partition":2,"topic":"topic","cluster":"A"} {"offset":3}

So alter offsets is not succussful, because when get offsets from 
globalOffsetBackingStore, always returns  {"offset":2}
 

 


> Alter MirrorSourceConnector offsets dont work
> -
>
> Key: KAFKA-16621
> URL: https://issues.apache.org/jira/browse/KAFKA-16621
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: yuzhou
>Priority: Major
> Attachments: image-2024-04-25-21-28-37-375.png
>
>
> In connect-offsets topic:
> the offsets wrote by connector, key is 
> `\{"cluster":"A","partition":2,"topic":"topic"}`
> after alter offsets, the key is  
> `\{"partition":2,"topic":"topic","cluster":"A"}`
> !image-2024-04-25-21-28-37-375.png!
> in Worker.globalOffsetBackingStore.data, both two keys exist, because the are 
> different strings:
> {"cluster":"A","partition":2,"topic":"topic"} \{"offset":2}
>  
> {"partition":2,"topic":"topic","cluster":"A"} \{"offset":3}
> So alter offsets is not succussful, because when get offsets from 
> globalOffsetBackingStore, always returns 
> {"offset":2}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16621) Alter MirrorSourceConnector offsets dont work

2024-04-25 Thread yuzhou (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuzhou updated KAFKA-16621:
---
Description: 
In connect-offsets topic:

the offset written by the connector has the key
`\{"cluster":"A","partition":2,"topic":"topic"}`

after altering offsets, the key is
`\{"partition":2,"topic":"topic","cluster":"A"}`

!image-2024-04-25-21-28-37-375.png!

In Worker.globalOffsetBackingStore.data, both keys exist, because they are
different strings:

{"cluster":"A","partition":2,"topic":"topic"} -> {"offset":2}

{"partition":2,"topic":"topic","cluster":"A"} -> {"offset":3}

So altering offsets is not successful, because getting offsets from
globalOffsetBackingStore always returns {"offset":2}

 

 

  was:
In connect-offsets topic:

the offsets wrote by connector, key is 
`\{"cluster":"A","partition":2,"topic":"topic"}`

after alter offsets, the key is  
`\{"partition":2,"topic":"topic","cluster":"A"}`

!image-2024-04-25-21-28-37-375.png!

in Worker.globalOffsetBackingStore.data, both two keys exist, because the are 
different strings:

{"cluster":"A","partition":2,"topic":"topic"}{"offset":2}

 

{"partition":2,"topic":"topic","cluster":"A"}{"offset":3}

So alter offsets is not succussful, because when get offsets from 
globalOffsetBackingStore, always returns 

{"offset":2}

 

 


> Alter MirrorSourceConnector offsets dont work
> -
>
> Key: KAFKA-16621
> URL: https://issues.apache.org/jira/browse/KAFKA-16621
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: yuzhou
>Priority: Major
> Attachments: image-2024-04-25-21-28-37-375.png
>
>
> In connect-offsets topic:
> the offset written by the connector has the key
> `\{"cluster":"A","partition":2,"topic":"topic"}`
> after altering offsets, the key is
> `\{"partition":2,"topic":"topic","cluster":"A"}`
> !image-2024-04-25-21-28-37-375.png!
> In Worker.globalOffsetBackingStore.data, both keys exist, because they are
> different strings:
> {"cluster":"A","partition":2,"topic":"topic"} -> {"offset":2}
>
> {"partition":2,"topic":"topic","cluster":"A"} -> {"offset":3}
> So altering offsets is not successful, because getting offsets from
> globalOffsetBackingStore always returns {"offset":2}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-9800: Exponential backoff for Kafka clients - KIP-580 [kafka]

2024-04-25 Thread via GitHub


AndrewJSchofield commented on code in PR #14111:
URL: https://github.com/apache/kafka/pull/14111#discussion_r1579542525


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java:
##
@@ -384,6 +389,32 @@ int attempts() {
 return attempts.get();
 }
 
+/*
+ * Returns whether the leader Node has changed since the last attempt.
+ * @param node The Node currently thought of as the leader, which might be 
null.
+ * @return true if the leader has changed, otherwise false
+ */
+boolean hasLeaderChanged(Node latestLeader) {
+boolean leaderChanged = false;
+if (latestLeader != null) {
+// If we don't know the leader yet, we have not yet attempted to 
send to the leader
+if (currentLeader == null) {
+currentLeader = latestLeader;
+} else {
+// If the leader's node id has changed, this counts as a 
leader change
+if (currentLeader.id() != latestLeader.id()) {

Review Comment:
   Yes, I believe so. If you look at the current 
`RecordAccumulator.shouldBackoff`, it knows when the leader changed and changes 
behaviour accordingly.
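   
   As a rough sketch of that idea (illustrative only; the name and signature below are hypothetical, not the actual `RecordAccumulator` code): a batch honours its retry backoff unless the leader has changed since the last attempt, in which case it may be retried immediately.
   
   ```java
   // Illustrative sketch only; not the actual RecordAccumulator logic.
   final class BackoffSketch {
       static boolean shouldBackOff(boolean leaderChanged, long lastAttemptMs,
                                    long retryBackoffMs, long nowMs) {
           if (leaderChanged) {
               return false;                                 // new leader: retry right away
           }
           return nowMs - lastAttemptMs < retryBackoffMs;    // otherwise wait out the backoff
       }
   }
   ```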



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: skip 'zinc' phase from gradle dependency-check plugin [kafka]

2024-04-25 Thread via GitHub


raboof commented on PR #15054:
URL: https://github.com/apache/kafka/pull/15054#issuecomment-2077285239

   > please ask a committer for review
   
   Perhaps @jlprat? :innocent: 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: skip 'zinc' phase from gradle dependency-check plugin [kafka]

2024-04-25 Thread via GitHub


jlprat merged PR #15054:
URL: https://github.com/apache/kafka/pull/15054


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16528: Client HB timing fix [kafka]

2024-04-25 Thread via GitHub


lianetm commented on code in PR #15698:
URL: https://github.com/apache/kafka/pull/15698#discussion_r1579555219


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -164,6 +165,23 @@ public void testHeartbeatOnStartup() {
 assertEquals(0, result2.unsentRequests.size());
 }
 
+@Test
+public void testSuccessfulHeartbeatTiming() {
+mockStableMember();
+NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
+assertEquals(0, result.unsentRequests.size(),
+"No heartbeat should be sent while interval has not expired");
+long currentTimeMs = time.milliseconds();
+
assertEquals(heartbeatRequestState.timeToNextHeartbeatMs(currentTimeMs), 
result.timeUntilNextPollMs);
+assertNextHeartbeatTiming(DEFAULT_HEARTBEAT_INTERVAL_MS);
+
+result = heartbeatRequestManager.poll(time.milliseconds());
+assertEquals(1, result.unsentRequests.size(), "A heartbeat should be 
sent when interval expires");
+NetworkClientDelegate.UnsentRequest inflightReq = 
result.unsentRequests.get(0);
+inflightReq.handler().onComplete(createHeartbeatResponse(inflightReq, 
Errors.NONE));
+assertNextHeartbeatTiming(DEFAULT_HEARTBEAT_INTERVAL_MS);

Review Comment:
   This relates to your comment above. You're right that we did not have a 
check to ensure the reset happened on the send and not on the response, so I 
added above the check for `timeToNextHeartbeatMs` that would fail if the 
timer is not reset on the send, with a specific message for it. That check 
covers it, but I also added the steps to advance the timer just a bit, 
check that no HB is sent, and check that the time is updated with the 
difference, as you suggested. All done. 



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -164,6 +165,23 @@ public void testHeartbeatOnStartup() {
 assertEquals(0, result2.unsentRequests.size());
 }
 
+@Test
+public void testSuccessfulHeartbeatTiming() {
+mockStableMember();
+NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
+assertEquals(0, result.unsentRequests.size(),
+"No heartbeat should be sent while interval has not expired");
+long currentTimeMs = time.milliseconds();
+
assertEquals(heartbeatRequestState.timeToNextHeartbeatMs(currentTimeMs), 
result.timeUntilNextPollMs);
+assertNextHeartbeatTiming(DEFAULT_HEARTBEAT_INTERVAL_MS);
+
+result = heartbeatRequestManager.poll(time.milliseconds());
+assertEquals(1, result.unsentRequests.size(), "A heartbeat should be 
sent when interval expires");

Review Comment:
   Sure! I missed it. Added. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16528: Client HB timing fix [kafka]

2024-04-25 Thread via GitHub


lianetm commented on code in PR #15698:
URL: https://github.com/apache/kafka/pull/15698#discussion_r1579555839


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -469,19 +469,33 @@ public void resetTimer() {
 this.heartbeatTimer.reset(heartbeatIntervalMs);
 }
 
+/**
+ * Check if a heartbeat request should be sent on the current time. A 
heartbeat should be
+ * sent if the heartbeat timer has expired, backoff has expired, and 
there is no request
+ * in-flight.
+ */
 @Override
 public boolean canSendRequest(final long currentTimeMs) {
 update(currentTimeMs);
 return heartbeatTimer.isExpired() && 
super.canSendRequest(currentTimeMs);
 }
 
-public long nextHeartbeatMs(final long currentTimeMs) {
+public long timeToNextHeartbeatMs(final long currentTimeMs) {
 if (heartbeatTimer.remainingMs() == 0) {

Review Comment:
   They achieve the same here, and totally agree that `isExpired` is more 
readable, fixed. (Sensible "since we're here..." to me too btw)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16528: Client HB timing fix [kafka]

2024-04-25 Thread via GitHub


lianetm commented on PR #15698:
URL: https://github.com/apache/kafka/pull/15698#issuecomment-2077313831

   Thanks for the helpful comments @cadonna , all addressed. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Various cleanups in core [kafka]

2024-04-25 Thread via GitHub


mimaison commented on PR #15786:
URL: https://github.com/apache/kafka/pull/15786#issuecomment-2077391342

   Quite a few failures in the last CI run, but they seem to be flaky tests since 
none of the failures happened on all platforms. Merging to trunk.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Various cleanups in core [kafka]

2024-04-25 Thread via GitHub


mimaison merged PR #15786:
URL: https://github.com/apache/kafka/pull/15786


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]

2024-04-25 Thread via GitHub


dajac merged PR #15717:
URL: https://github.com/apache/kafka/pull/15717


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-16568) Add JMH Benchmarks for assignor performance testing

2024-04-25 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16568?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-16568.
-
Fix Version/s: 3.8.0
   Resolution: Fixed

> Add JMH Benchmarks for assignor performance testing 
> 
>
> Key: KAFKA-16568
> URL: https://issues.apache.org/jira/browse/KAFKA-16568
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ritika Reddy
>Assignee: Ritika Reddy
>Priority: Major
> Fix For: 3.8.0
>
>
> The 3 benchmarks that are being used to test the performance and efficiency 
> of the consumer group rebalance process.
>  * Client Assignors (assign method)
>  * Server Assignors (assign method)
>  * Target Assignment Builder (build method)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Remove unused parameters in KafkaConfig [kafka]

2024-04-25 Thread via GitHub


johnnychhsu commented on PR #15788:
URL: https://github.com/apache/kafka/pull/15788#issuecomment-2077461634

   > @johnnychhsu Instead of removing them, could you make `MetadataLogConfig` 
use those help methods?
   > The reason these aren't used is because `MetadataLogConfig` signature uses 
`AbstractConfig` and not `KafkaConfig`. We should update `MetadataLogConfig` 
class and make it use these methods as @chia7712 mentioned.
   
   @chia7712 @OmniaGM just updated according to the suggestions, thanks for the 
review! 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Remove unused parameters in KafkaConfig [kafka]

2024-04-25 Thread via GitHub


OmniaGM commented on PR #15788:
URL: https://github.com/apache/kafka/pull/15788#issuecomment-2077482581

   Just rebase to fix the conflicts 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: Various cleanups in raft [kafka]

2024-04-25 Thread via GitHub


mimaison opened a new pull request, #15805:
URL: https://github.com/apache/kafka/pull/15805

   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: refine DeleteOffsetsConsumerGroupCommandIntegrationTest#produceRecord [kafka]

2024-04-25 Thread via GitHub


chia7712 commented on code in PR #15802:
URL: https://github.com/apache/kafka/pull/15802#discussion_r1579702459


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java:
##
@@ -91,65 +91,97 @@ public void testDeleteOffsetsNonExistingGroup() {
 
 @ClusterTest
 public void testDeleteOffsetsOfStableConsumerGroupWithTopicPartition() {
-for (Map consumerConfig : consumerConfigs) {
-createTopic(TOPIC);
-testWithConsumerGroup(TOPIC, 0, 0, 
Errors.GROUP_SUBSCRIBED_TO_TOPIC, true, consumerConfig);
-removeTopic(TOPIC);
+int idx = 0;
+for (Iterator> it = consumerConfigs.iterator(); 
it.hasNext(); idx++) {
+Map consumerConfig = it.next();
+String topic = TOPIC_PREFIX + idx;
+String group = GROUP_PREFIX + idx;
+createTopic(topic);
+testWithConsumerGroup(topic, topic, group, 0, 0, 
Errors.GROUP_SUBSCRIBED_TO_TOPIC, true, consumerConfig);
+removeTopic(topic);
 }
 }
 
 @ClusterTest
 public void testDeleteOffsetsOfStableConsumerGroupWithTopicOnly() {
-for (Map consumerConfig : consumerConfigs) {
-createTopic(TOPIC);
-testWithConsumerGroup(TOPIC, -1, 0, 
Errors.GROUP_SUBSCRIBED_TO_TOPIC, true, consumerConfig);
-removeTopic(TOPIC);
+int idx = 0;
+for (Iterator> it = consumerConfigs.iterator(); 
it.hasNext(); idx++) {
+Map consumerConfig = it.next();
+String topic = TOPIC_PREFIX + idx;

Review Comment:
   Maybe we can use the test case + protocol name as the unique name



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16622) Mirromaker2 first Checkpoint not emitted until consumer group fully catches up once

2024-04-25 Thread Edoardo Comar (Jira)
Edoardo Comar created KAFKA-16622:
-

 Summary: Mirromaker2 first Checkpoint not emitted until consumer 
group fully catches up once
 Key: KAFKA-16622
 URL: https://issues.apache.org/jira/browse/KAFKA-16622
 Project: Kafka
  Issue Type: Bug
  Components: mirrormaker
Affects Versions: 3.6.2, 3.7.0, 3.8.0
Reporter: Edoardo Comar
 Attachments: edo-connect-mirror-maker-sourcetarget.properties

We observed an excessively delayed emission of the MM2 Checkpoint record.
It only gets created when the source consumer reaches the end of a topic. This 
does not seem reasonable.

In a very simple setup :

Tested with a standalone single-process MirrorMaker2 mirroring between two 
single-node Kafka clusters (MirrorMaker config attached) with quick refresh 
intervals (e.g. 5 sec) and a small offset.lag.max (e.g. 10):

create a single topic in the source cluster
produce data to it (e.g. 1 records)
start a slow consumer - e.g. fetching 50 records/poll and pausing 1 sec between 
polls, committing after each poll

watch the Checkpoint topic in the target cluster

bin/kafka-console-consumer.sh --bootstrap-server localhost:9192 \
  --topic source.checkpoints.internal \
  --formatter org.apache.kafka.connect.mirror.formatters.CheckpointFormatter \
   --from-beginning

-> no record appears in the checkpoint topic until the consumer reaches the end 
of the topic (i.e. its consumer group lag gets down to 0).
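
For reference, a minimal sketch of the kind of slow consumer used in the repro (bootstrap address, topic and group id below are placeholders):

{code:java}
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class SlowConsumer {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "slow-group");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                consumer.commitSync();   // commit after each poll
                System.out.printf("fetched %d records%n", records.count());
                Thread.sleep(1000);      // pause so the group keeps lagging
            }
        }
    }
}
{code}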







--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: refine DeleteOffsetsConsumerGroupCommandIntegrationTest#produceRecord [kafka]

2024-04-25 Thread via GitHub


chia7712 commented on code in PR #15802:
URL: https://github.com/apache/kafka/pull/15802#discussion_r1579708062


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java:
##
@@ -169,20 +201,22 @@ private static ConsumerGroupCommand.ConsumerGroupService 
consumerGroupService(St
 );
 }
 
-private void testWithConsumerGroup(String inputTopic,
+private void testWithConsumerGroup(String inputTopicWithData,
+   String inputTopicForTest,
+   String inputGroup,
int inputPartition,
int expectedPartition,
Errors expectedError,
boolean isStable,
Map consumerConfig) {
-produceRecord();
+produceRecord(inputTopicWithData);

Review Comment:
   We can move `produceRecord();` out of `testWithConsumerGroup` to simplify 
the arguments of `testWithConsumerGroup`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16557: Fix toString of OffsetFetchRequestState [kafka]

2024-04-25 Thread via GitHub


phooq commented on PR #15750:
URL: https://github.com/apache/kafka/pull/15750#issuecomment-2077592945

   Thanks so much, @lianetm! Hey @kirktrue, on top of the current changes I have, 
I plan to rename the `toStringBase` method to `getDetails` for `RequestState` - 
does this look okay to you?
   
   Thanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16565: IncrementalAssignmentConsumerEventHandler throws error when attempting to remove a partition that isn't assigned [kafka]

2024-04-25 Thread via GitHub


kirktrue commented on code in PR #15737:
URL: https://github.com/apache/kafka/pull/15737#discussion_r1579722758


##
tests/kafkatest/services/verifiable_consumer.py:
##
@@ -140,22 +150,31 @@ class 
IncrementalAssignmentConsumerEventHandler(ConsumerEventHandler):
 def __init__(self, node, verify_offsets, idx):
 super().__init__(node, verify_offsets, idx)
 
-def handle_partitions_revoked(self, event):
+def handle_partitions_revoked(self, event, node, logger):
 self.revoked_count += 1
 self.state = ConsumerState.Rebalancing
 self.position = {}
+revoked = []
+
 for topic_partition in event["partitions"]:
-topic = topic_partition["topic"]
-partition = topic_partition["partition"]
-self.assignment.remove(TopicPartition(topic, partition))
+tp = _create_partition_from_dict(topic_partition)
+assert tp in self.assignment, \
+"Topic partition %s cannot be revoked from %s as it was not 
previously assigned to that consumer" % \
+(tp, node.account.hostname)

Review Comment:
   @lucasbru—this is the main functional change: ensure that an attempt to 
remove a partition from the local state verifies that it was previously 
assigned.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16555: Consumer's RequestState has incorrect logic to determine if inflight [kafka]

2024-04-25 Thread via GitHub


kirktrue commented on code in PR #15723:
URL: https://github.com/apache/kafka/pull/15723#discussion_r1579725251


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestState.java:
##
@@ -98,12 +93,11 @@ public boolean canSendRequest(final long currentTimeMs) {
  * is a request in-flight.
  */
 public boolean requestInFlight() {
-return this.lastSentMs > -1 && this.lastReceivedMs < this.lastSentMs;
+return requestInFlight;

Review Comment:
   @philipnee—I didn't make any changes to the name as `requestInFlight` was 
the existing method name. Are you OK to leave this for now?
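   For context, a minimal sketch of the flag-based tracking the diff switches to. This is a simplified stand-in, not the real RequestState; only the flag handling is shown:

// Simplified stand-in: the flag is raised on send and lowered on any completed
// attempt, so a new request sent in the same millisecond as the previous
// response is still reported as in flight, unlike the old timestamp comparison.
public class InFlightFlagSketch {
    private boolean requestInFlight = false;

    public void onSendAttempt() {
        requestInFlight = true;
    }

    public void onSuccessfulAttempt(long currentTimeMs) {
        requestInFlight = false;
    }

    public void onFailedAttempt(long currentTimeMs) {
        requestInFlight = false;
    }

    public boolean requestInFlight() {
        return requestInFlight;
    }

    public static void main(String[] args) {
        InFlightFlagSketch state = new InFlightFlagSketch();
        state.onSendAttempt();
        System.out.println(state.requestInFlight()); // true: request sent, no response yet
        state.onSuccessfulAttempt(236);
        System.out.println(state.requestInFlight()); // false once the response is recorded
        state.onSendAttempt();                       // new request in the same millisecond
        System.out.println(state.requestInFlight()); // still true with the boolean flag
    }
}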



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-04-25 Thread via GitHub


kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1579733831


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -769,7 +730,8 @@ public void commitAsync(OffsetCommitCallback callback) {
 public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
 acquireAndEnsureOpen();
 try {
-AsyncCommitEvent asyncCommitEvent = new AsyncCommitEvent(offsets);
+Timer timer = time.timer(Long.MAX_VALUE);
+AsyncCommitEvent asyncCommitEvent = new AsyncCommitEvent(offsets, 
timer);

Review Comment:
   Yeah, I went back and forth on this a few times 😉
   
   Ultimately I wanted to force the caller to be explicit about its timeout 
intention, vs. having it implicitly "hidden" away in the event hierarchy.
   
   Also, to create a `Timer` in the event constructor, we'd have to pass in a 
`Time` object (`time.timer(Long.MAX_VALUE)`), which seemed a bit obtuse, so 
🤷‍♂️ 
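   To make the trade-off concrete, a minimal sketch using simplified stand-in events rather than the real event hierarchy:

// Simplified stand-ins, not the real AsyncCommitEvent: either the caller builds
// the Timer (explicit timeout intent), or the event constructor hides the
// decision behind a Time parameter.
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;

public class ExplicitTimerSketch {

    // Approach taken in the PR: the caller states its timeout intention.
    static class ExplicitTimerEvent {
        final Timer timer;
        ExplicitTimerEvent(Timer timer) {
            this.timer = timer;
        }
    }

    // Alternative: the timeout decision is hidden inside the event, and every
    // constructor now needs a Time just to build the Timer.
    static class ImplicitTimerEvent {
        final Timer timer;
        ImplicitTimerEvent(Time time) {
            this.timer = time.timer(Long.MAX_VALUE);
        }
    }

    public static void main(String[] args) {
        Time time = Time.SYSTEM;
        ExplicitTimerEvent explicit = new ExplicitTimerEvent(time.timer(Long.MAX_VALUE));
        ImplicitTimerEvent implicit = new ImplicitTimerEvent(time);
        System.out.println(explicit.timer.remainingMs());
        System.out.println(implicit.timer.remainingMs());
    }
}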



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-04-25 Thread via GitHub


kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1579735209


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1270,6 +1230,20 @@ private void close(Duration timeout, boolean 
swallowException) {
 if (applicationEventHandler != null)
 closeQuietly(() -> 
applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), 
"Failed shutting down network thread", firstException);
 closeTimer.update();
+
+if (backgroundEventReaper != null && backgroundEventQueue != null) {

Review Comment:
   They're only `null` if there was an error in the constructor. The 
constructor's `finally` block calls `close()`, so we need to handle the case 
where the consumer wasn't fully constructed before it's closed.
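   A minimal sketch of the scenario, using the field names from the diff but a simplified stand-in class (and a plain catch block rather than the real constructor's error handling):

// Simplified stand-in: close() must tolerate a partially constructed object
// because the constructor's failure path invokes it before every field is set.
public class PartialConstructionCloseSketch implements AutoCloseable {
    private Object backgroundEventQueue;
    private Object backgroundEventReaper;

    public PartialConstructionCloseSketch(boolean failBeforeReaperIsCreated) {
        try {
            backgroundEventQueue = new Object();
            if (failBeforeReaperIsCreated) {
                throw new IllegalStateException("constructor failed part-way through");
            }
            backgroundEventReaper = new Object();
        } catch (RuntimeException e) {
            close(); // runs while backgroundEventReaper is still null
            throw e;
        }
    }

    @Override
    public void close() {
        // The null checks exist precisely for the partially constructed case.
        if (backgroundEventReaper != null && backgroundEventQueue != null) {
            System.out.println("reaping and draining remaining background events");
        }
    }

    public static void main(String[] args) {
        try {
            new PartialConstructionCloseSketch(true);
        } catch (IllegalStateException e) {
            System.out.println("constructor failed, but close() was still safe: " + e.getMessage());
        }
    }
}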



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16557: Fix toString of OffsetFetchRequestState [kafka]

2024-04-25 Thread via GitHub


kirktrue commented on PR #15750:
URL: https://github.com/apache/kafka/pull/15750#issuecomment-2077634416

   Thanks for the work on this @phooq!
   
   > I plan to, on top of the current changes I have, rename the `toStringBase` method to `getDetails`
   
   Renaming `toStringBase()` to `getDetails()` makes its purpose more vague, in 
my opinion 😞
   
   Keep in mind that the naming convention `toStringBase()` is used in 
`ApplicationEvent`, `BackgroundEvent`, and maybe elsewhere, too.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-04-25 Thread via GitHub


kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1579756071


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##
@@ -144,6 +166,12 @@ void runOnce() {
 .map(Optional::get)
 .map(rm -> rm.maximumTimeToWait(currentTimeMs))
 .reduce(Long.MAX_VALUE, Math::min);
+
+// "Complete" any events that have expired. This cleanup step should 
only be called after the network I/O
+// thread has made at least one call to poll. This is done to emulate 
the behavior of the legacy consumer's
+// handling of timeouts. The legacy consumer makes at least one 
attempt to satisfy any network requests
+// before checking if a timeout has expired.

Review Comment:
   I'm happy to reword the comment and clean it up, but the lines that follow 
that comment are the raison d'être of this change. It's very subtle and easy to 
miss, hence the call-out.
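   A minimal, self-contained sketch of that ordering; all names here are hypothetical rather than the real ConsumerNetworkThread API:

// Hypothetical names throughout: expired events are only reaped after the
// network poll, so each event gets at least one chance to complete before its
// timeout is enforced, mirroring the legacy consumer's behavior.
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class ReapAfterPollSketch {

    static final class PendingEvent {
        final String name;
        final long deadlineMs;
        boolean completed;

        PendingEvent(String name, long deadlineMs) {
            this.name = name;
            this.deadlineMs = deadlineMs;
        }
    }

    private final List<PendingEvent> pending = new ArrayList<>();

    void runOnce(long currentTimeMs) {
        pollNetwork(currentTimeMs);  // 1. let in-flight requests complete events
        reapExpired(currentTimeMs);  // 2. only then expire whatever is still pending
    }

    private void pollNetwork(long currentTimeMs) {
        // Stand-in for the network I/O; responses arriving here may mark
        // pending events as completed before the reaper ever sees them.
    }

    private void reapExpired(long currentTimeMs) {
        for (Iterator<PendingEvent> it = pending.iterator(); it.hasNext(); ) {
            PendingEvent event = it.next();
            if (!event.completed && event.deadlineMs <= currentTimeMs) {
                System.out.println("expiring " + event.name + " after one poll attempt");
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        ReapAfterPollSketch thread = new ReapAfterPollSketch();
        thread.pending.add(new PendingEvent("commit", 100L));
        thread.runOnce(200L); // the commit event still got one poll before expiring
    }
}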



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-04-25 Thread via GitHub


kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1579760563


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -1338,7 +1339,14 @@ private CompletableFuture 
enqueueConsumerRebalanceListenerCallback(Consume
  
Set partitions) {
 SortedSet sortedPartitions = new 
TreeSet<>(TOPIC_PARTITION_COMPARATOR);
 sortedPartitions.addAll(partitions);
-CompletableBackgroundEvent event = new 
ConsumerRebalanceListenerCallbackNeededEvent(methodName, sortedPartitions);
+
+// We don't yet have the concept of having an expiring callback, but 
we will likely want that eventually.
+Timer timer = time.timer(Long.MAX_VALUE);
+CompletableBackgroundEvent event = new 
ConsumerRebalanceListenerCallbackNeededEvent(
+methodName,
+sortedPartitions,
+timer
+);

Review Comment:
   I'll look into how to do this in a way that I don't find too ugly 😉 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: Various cleanups in metadata [kafka]

2024-04-25 Thread via GitHub


mimaison opened a new pull request, #15806:
URL: https://github.com/apache/kafka/pull/15806

   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: Various cleanups in generator [kafka]

2024-04-25 Thread via GitHub


mimaison opened a new pull request, #15807:
URL: https://github.com/apache/kafka/pull/15807

   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: Clean up TestUtils.scala [kafka]

2024-04-25 Thread via GitHub


m1a2st opened a new pull request, #15808:
URL: https://github.com/apache/kafka/pull/15808

   TestUtils.scala has some unused methods, and some methods are only used in one class, so I deleted the unused methods and moved the single-use methods into the classes that use them.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16557: Fix toString of OffsetFetchRequestState [kafka]

2024-04-25 Thread via GitHub


kirktrue commented on PR #15750:
URL: https://github.com/apache/kafka/pull/15750#issuecomment-2077701719

   What about `toStringDetails()`?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14585: Move StorageTool to tools [kafka]

2024-04-25 Thread via GitHub


mimaison commented on code in PR #14847:
URL: https://github.com/apache/kafka/pull/14847#discussion_r1579785296


##
metadata/src/main/java/org/apache/kafka/metadata/properties/MetaProperties.java:
##
@@ -47,7 +47,7 @@ public final class MetaProperties {
 /**
  * The property that specifies the node id. Replaces broker.id in V1.
  */
-static final String NODE_ID_PROP = "node.id";
+public static final String NODE_ID_PROP = "node.id";

Review Comment:
   It's really not nice that we have to do that. I wonder if we should wait for `NodeIdProp` from KafkaConfig to move instead.



##
tools/src/test/java/org/apache/kafka/tools/StorageToolTest.java:
##
@@ -0,0 +1,517 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.common.DirectoryId;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.metadata.UserScramCredentialRecord;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
+import org.apache.kafka.metadata.properties.MetaProperties;
+import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble;
+import org.apache.kafka.metadata.properties.MetaPropertiesVersion;
+import org.apache.kafka.metadata.properties.PropertiesUtils;
+import org.apache.kafka.raft.RaftConfig;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.config.ServerLogConfigs;
+import org.apache.kafka.storage.internals.log.LogConfig;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.InvalidKeyException;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.kafka.test.TestUtils.tempFile;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@ExtendWith(MockitoExtension.class)
+@Timeout(value = 40)
+public class StorageToolTest {
+private Properties newSelfManagedProperties() {
+Properties properties = new Properties();
+properties.setProperty(ServerLogConfigs.LOG_DIRS_CONFIG, 
"/tmp/foo,/tmp/bar");
+properties.setProperty(StorageTool.PROCESS_ROLES_CONFIG, "controller");
+properties.setProperty(MetaProperties.NODE_ID_PROP, "2");
+properties.setProperty(RaftConfig.QUORUM_VOTERS_CONFIG, 
"2@localhost:9092");
+return properties;
+}
+
+@Test
+public void testConfigToLogDirectories() {
+LogConfig config = new LogConfig(newSelfManagedProperties());
+assertEquals(new ArrayList<>(Arrays.asList("/tmp/bar", "/tmp/foo")), 
StorageTool.configToLogDirectories(config));
+}
+
+@Test
+public void testConfigToLogDirectoriesWithMetaLogDir() {
+Properties properties = newSelfManagedProperties();
+properties.setProperty(StorageTool.METADATA_LOG_DIR_CONFIG, 
"/tmp/baz");
+LogConfig config = new LogConfig(properties);
+assertEquals(new ArrayList<>(Arrays.asList("/tmp/bar", "/tmp/baz", 
"/tmp/foo")), StorageTool.configToLogDirectories(config));
+}
+
+@Test
+public void testInfoCommandOn

Re: [PR] MINOR: Clean up TestUtils.scala [kafka]

2024-04-25 Thread via GitHub


m1a2st commented on PR #15808:
URL: https://github.com/apache/kafka/pull/15808#issuecomment-2077728033

   @chia7712 please review this PR, thank you


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16623) KafkaAsyncConsumer system tests warn about revoking partitions that weren't previously assigned

2024-04-25 Thread Kirk True (Jira)
Kirk True created KAFKA-16623:
-

 Summary: KafkaAsyncConsumer system tests warn about revoking 
partitions that weren't previously assigned
 Key: KAFKA-16623
 URL: https://issues.apache.org/jira/browse/KAFKA-16623
 Project: Kafka
  Issue Type: Bug
  Components: clients, consumer, system tests
Affects Versions: 3.8.0
Reporter: Kirk True
Assignee: Kirk True
 Fix For: 3.8.0


When running system tests for the KafkaAsyncConsumer, we occasionally see this 
warning:

{noformat}
  File "/usr/lib/python3.7/threading.py", line 917, in _bootstrap_inner
self.run()
  File "/usr/lib/python3.7/threading.py", line 865, in run
self._target(*self._args, **self._kwargs)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/services/background_thread.py",
 line 38, in _protected_worker
self._worker(idx, node)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/verifiable_consumer.py",
 line 304, in _worker
handler.handle_partitions_revoked(event, node, self.logger)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/verifiable_consumer.py",
 line 163, in handle_partitions_revoked
(tp, node.account.hostname)
AssertionError: Topic partition TopicPartition(topic='test_topic', partition=0) 
cannot be revoked from worker20 as it was not previously assigned to that 
consumer
{noformat}

It is unclear what is causing this.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Remove unused parameters in KafkaConfig [kafka]

2024-04-25 Thread via GitHub


johnnychhsu commented on PR #15788:
URL: https://github.com/apache/kafka/pull/15788#issuecomment-208202

   Thanks for the prompt reply @OmniaGM! Just updated to resolve the conflict.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16555: Consumer's RequestState has incorrect logic to determine if inflight [kafka]

2024-04-25 Thread via GitHub


kirktrue commented on code in PR #15723:
URL: https://github.com/apache/kafka/pull/15723#discussion_r1579862658


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestStateTest.java:
##
@@ -48,4 +50,40 @@ public void testRequestStateSimple() {
 state.reset();
 assertTrue(state.canSendRequest(200));
 }
+
+@Test
+public void testTrackInflightOnSuccessfulAttempt() {
+testTrackInflight(RequestState::onSuccessfulAttempt);
+}
+
+@Test
+public void testTrackInflightOnFailedAttempt() {
+testTrackInflight(RequestState::onFailedAttempt);
+}
+
+private void testTrackInflight(BiConsumer 
onCompletedAttempt) {
+RequestState state = new RequestState(
+new LogContext(),
+this.getClass().getSimpleName(),
+100,
+2,
+1000,
+0);
+
+// This is just being paranoid...
+assertFalse(state.requestInFlight());
+
+// When we've sent a request, the flag should update from false to 
true.
+state.onSendAttempt();
+assertTrue(state.requestInFlight());
+
+// Now we've received the response.
+onCompletedAttempt.accept(state, 236);
+
+// When we've sent a second request with THE SAME TIMESTAMP as the 
previous response,

Review Comment:
   I added back the timestamp so we could use it in the `lastSentMs` value for 
debugging. So the comment should make sense again.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


