Re: [PR] MINOR: Fix the flaky TBRLMM `testInternalTopicExists` test [kafka]

2023-11-27 Thread via GitHub


showuon commented on code in PR #14840:
URL: https://github.com/apache/kafka/pull/14840#discussion_r1407350742


##
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java:
##
@@ -70,18 +70,20 @@ public TopicBasedRemoteLogMetadataManager topicBasedRlmm() {
 }
 
 @Test
-public void testInternalTopicExists() {
+public void testDoesTopicExist() {
 Properties adminConfig = 
remoteLogMetadataManagerHarness.adminClientConfig();
 ListenerName listenerName = 
remoteLogMetadataManagerHarness.listenerName();
 try (Admin admin = 
remoteLogMetadataManagerHarness.createAdminClient(listenerName, adminConfig)) {
-String topic = 
topicBasedRlmm().config().remoteLogMetadataTopicName();
+String topic = "test-topic-exist";
+remoteLogMetadataManagerHarness.createTopic(topic, 1, 1, new 
Properties(),

Review Comment:
   Yes, you're right, we'll make sure the metadata is propagated to all nodes 
for both KRaft and ZK modes. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15695: Update the local log start offset of a log after rebuilding the auxiliary state [kafka]

2023-11-27 Thread via GitHub


nikramakrishnan commented on code in PR #14649:
URL: https://github.com/apache/kafka/pull/14649#discussion_r1407348678


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1754,6 +1754,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 leaderEpochCache.foreach(_.clearAndFlush())
 producerStateManager.truncateFullyAndStartAt(newOffset)
 logStartOffset = logStartOffsetOpt.getOrElse(newOffset)
+if (remoteLogEnabled()) maybeIncrementLocalLogStartOffset(newOffset, 
LogStartOffsetIncrementReason.SegmentDeletion)

Review Comment:
   Thanks @kamalcph. I agree with that. I have updated the PR to update the 
local log start offset in `truncateFullyAndStartAt` and also in  
`buildRemoteLogAuxState` with the correct reason.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: Ensure that DisplayName is set in all parameterized tests [kafka]

2023-11-27 Thread via GitHub


dajac opened a new pull request, #14850:
URL: https://github.com/apache/kafka/pull/14850

   This is a follow-up to https://github.com/apache/kafka/pull/14687 as we 
found out that some parameterized tests do not include the test method name in 
their name. For the context, the JUnit XML report does not include the name of 
the method by default but only rely on the display name provided.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15489: resign leadership when no fetch from majority voters [kafka]

2023-11-27 Thread via GitHub


showuon commented on PR #14428:
URL: https://github.com/apache/kafka/pull/14428#issuecomment-1829267766

   > Hi @showuon ,
   > 
   > Thanks for the changes. They look good to me in general. One potential 
issue with this implementation is that the leader doesn't check that the 
fetching voters are making progress.
   > 
   > Just because the leader returned a successful response to FETCH and 
FETCH_SNAPSHOT doesn't mean that the followers were able to handle the response 
correctly.
   > 
   > For example, imagine the case where the log end offset (LEO) is at 1000 
and all of the followers are continuously fetching at offset 0 without ever 
increasing their fetch offset. This can happen if the followers encounter an 
error when processing the FETCH or FETCH_SNAPSHOT response.
   > 
   > In this scenario the leader will never be able to increase the HWM. I 
think that this scenario is specific to KRaft and doesn't exists in Raft 
because KRaft is pull vs Raft which is push.
   > 
   > What do you think? Do you agree? If so should we address this issue in 
this PR or create an issue for this and fix it in a future PR?
   
   Good catch! Yes, that's indeed a potential problem. This PR has been pending 
for a long time, let's focus on the current issue in this PR first. I've filed: 
[KAFKA-15911](https://issues.apache.org/jira/browse/KAFKA-15911) for the 
potential issue.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15911) KRaft quorum leader should make sure the follower fetch is making progress

2023-11-27 Thread Luke Chen (Jira)
Luke Chen created KAFKA-15911:
-

 Summary: KRaft quorum leader should make sure the follower fetch 
is making progress
 Key: KAFKA-15911
 URL: https://issues.apache.org/jira/browse/KAFKA-15911
 Project: Kafka
  Issue Type: Bug
  Components: kraft
Reporter: Luke Chen


Just because the leader returned a successful response to FETCH and 
FETCH_SNAPSHOT doesn't mean that the followers were able to handle the response 
correctly.

For example, imagine the case where the log end offset (LEO) is at 1000 and all 
of the followers are continuously fetching at offset 0 without ever increasing 
their fetch offset. This can happen if the followers encounter an error when 
processing the FETCH or FETCH_SNAPSHOT response.

In this scenario the leader will never be able to increase the HWM. I think 
that this scenario is specific to KRaft and doesn't exists in Raft because 
KRaft is pull vs Raft which is push.


https://github.com/apache/kafka/pull/14428#pullrequestreview-1751408695



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15489: resign leadership when no fetch from majority voters [kafka]

2023-11-27 Thread via GitHub


showuon commented on code in PR #14428:
URL: https://github.com/apache/kafka/pull/14428#discussion_r1407337495


##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -1990,7 +1995,7 @@ private long pollLeader(long currentTimeMs) {
 LeaderState state = quorum.leaderStateOrThrow();
 maybeFireLeaderChange(state);
 
-if (shutdown.get() != null || state.isResignRequested()) {
+if (shutdown.get() != null || state.isResignRequested() || 
state.hasMajorityFollowerFetchExpired(currentTimeMs)) {

Review Comment:
   Ah, good catch! Updated the `hasMajorityFollowerFetchExpired` to 
`timeUntilCheckQuorumExpires` now. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15489: resign leadership when no fetch from majority voters [kafka]

2023-11-27 Thread via GitHub


showuon commented on code in PR #14428:
URL: https://github.com/apache/kafka/pull/14428#discussion_r1407336829


##
raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java:
##
@@ -447,6 +452,38 @@ public void testDescribeQuorumWithObservers() {
 observerState);
 }
 
+@Test
+public void testMajorityFollowerFetchTimeoutExpiration() {
+int node1 = 1;
+int node2 = 2;
+int node3 = 3;
+int node4 = 4;
+int observer5 = 5;
+LeaderState state = newLeaderState(mkSet(localId, node1, node2, 
node3, node4), 0L);
+
assertFalse(state.hasMajorityFollowerFetchExpired(time.milliseconds()));
+int resignLeadershipTimeout = (int) (fetchTimeoutMs * 1.5);

Review Comment:
   Agree! If we need to update the `1.5` factor, we can change in one place. 
Updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15901: Client changes for registering telemetry and API calls (KIP-714) [kafka]

2023-11-27 Thread via GitHub


apoorvmittal10 commented on code in PR #14843:
URL: https://github.com/apache/kafka/pull/14843#discussion_r1407336676


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -4387,7 +4398,52 @@ public FenceProducersResult 
fenceProducers(Collection transactionalIds,
 
 @Override
 public Uuid clientInstanceId(Duration timeout) {

Review Comment:
   @AndrewJSchofield Please can you re-review this method change as I added 
this after your review. Also I am not sure if the changes in AdminClient is 
helpful for fetching `client instance id`. I cannot think of a valid use case 
where this method in AdminClient will be required.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15489: resign leadership when no fetch from majority voters [kafka]

2023-11-27 Thread via GitHub


showuon commented on code in PR #14428:
URL: https://github.com/apache/kafka/pull/14428#discussion_r1407334210


##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -1340,6 +1341,9 @@ private FetchSnapshotResponseData 
handleFetchSnapshotRequest(
 
 UnalignedRecords records = 
snapshot.slice(partitionSnapshot.position(), Math.min(data.maxBytes(), 
maxSnapshotSize));
 
+Optional> state = quorum.maybeLeaderState();
+state.ifPresent(s -> 
s.maybeResetMajorityFollowerFetchTimer(data.replicaId(), currentTimeMs));
+

Review Comment:
   Good point. Updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15489: resign leadership when no fetch from majority voters [kafka]

2023-11-27 Thread via GitHub


showuon commented on code in PR #14428:
URL: https://github.com/apache/kafka/pull/14428#discussion_r1407333526


##
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##
@@ -79,6 +86,39 @@ protected LeaderState(
 this.grantingVoters = Collections.unmodifiableSet(new 
HashSet<>(grantingVoters));
 this.log = logContext.logger(LeaderState.class);
 this.accumulator = Objects.requireNonNull(accumulator, "accumulator 
must be non-null");
+// use the 1.5x fetch timeout to tolerate some network transition time 
or other IO time.
+this.fetchTimeoutMs = (int) (fetchTimeoutMs * 1.5);
+this.fetchTimer = time.timer(fetchTimeoutMs);

Review Comment:
   Good suggestion. Updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]

2023-11-27 Thread via GitHub


apoorvmittal10 commented on PR #14620:
URL: https://github.com/apache/kafka/pull/14620#issuecomment-1829257829

   @xvrl @mjsax Please help closing the PR, do we need to address any concern 
in the PR or we can merge. If required we can have a follow up PR as well.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15143: Adding in MockFixedKeyProcessorContext and Test [kafka]

2023-11-27 Thread via GitHub


lihaosky commented on code in PR #14605:
URL: https://github.com/apache/kafka/pull/14605#discussion_r1407319712


##
streams/test-utils/src/test/java/org/apache/kafka/streams/test/MockFixedKeyProcessorContextTest.java:
##
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.test;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
+import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
+import org.apache.kafka.streams.processor.api.FixedKeyRecord;
+import org.apache.kafka.streams.processor.api.InternalFixedKeyRecordFactory;
+import org.apache.kafka.streams.processor.api.MockFixedKeyProcessorContext;
+import 
org.apache.kafka.streams.processor.api.MockProcessorContext.CapturedForward;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.is;
+
+public class MockFixedKeyProcessorContextTest {

Review Comment:
   Right. I think you can parameterize `MockProcessorContextAPITest` so common 
tests run for both processor context mock and add some tests for 
`FixedKeyProcessor` only (return directly for other processor context)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]

2023-11-27 Thread via GitHub


philipnee commented on code in PR #14842:
URL: https://github.com/apache/kafka/pull/14842#discussion_r1407198957


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##
@@ -269,60 +275,56 @@ void cleanup() {
  * completed in time.
  */
 // Visible for testing
-void maybeAutoCommitAndLeaveGroup(final Timer timer) {
+void maybeAutocommitOnClose(final Timer timer) {
 if (!requestManagers.coordinatorRequestManager.isPresent())
 return;
 
+if (!requestManagers.commitRequestManager.isPresent()) {
+log.error("Expecting a CommitRequestManager but the object was 
never initialized. Shutting down.");
+return;
+}
+
+if (!requestManagers.commitRequestManager.get().canAutoCommit()) {
+return;
+}
+
 ensureCoordinatorReady(timer);
-List tasks = closingRequests();
-networkClientDelegate.addAll(tasks);
+List autocommitRequest =
+
Collections.singletonList(requestManagers.commitRequestManager.get().commitAllConsumedPositions());
+networkClientDelegate.addAll(autocommitRequest);
 do {
 long currentTimeMs = timer.currentTimeMs();
 ensureCoordinatorReady(timer);
 networkClientDelegate.poll(timer.remainingMs(), currentTimeMs);
-} while (timer.notExpired() && !tasks.stream().allMatch(v -> 
v.future().isDone()));
+} while (timer.notExpired() && 
!autocommitRequest.get(0).future().isDone());
+}
+
+void maybeLeaveGroup(final Timer timer) {
+// TODO: Leave group upon closing the consumer
 }
 
 private void ensureCoordinatorReady(final Timer timer) {
-while (!coordinatorReady()) {
+while (!coordinatorReady() && timer.notExpired()) {
 findCoordinatorSync(timer);
 }
 }
 
 private boolean coordinatorReady() {
-CoordinatorRequestManager coordinatorRequestManager = 
requestManagers.coordinatorRequestManager.get();
+CoordinatorRequestManager coordinatorRequestManager = 
requestManagers.coordinatorRequestManager.orElseThrow(
+() -> new IllegalStateException("CoordinatorRequestManager 
uninitialized."));

Review Comment:
   Probably unnecessary - but adding this for a safety check wdyt?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]

2023-11-27 Thread via GitHub


philipnee commented on code in PR #14842:
URL: https://github.com/apache/kafka/pull/14842#discussion_r1407198120


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -210,17 +210,19 @@ public CompletableFuture 
maybeAutoCommitAllConsumed() {
 return maybeAutoCommit(subscriptions.allConsumed());
 }
 
+boolean canAutoCommit() {

Review Comment:
   Unhappy with the naming, but can't seem to find a better way to restructure 
this at the moment.  Didn't like it because there is an infinite variation of 
canXxxx().



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Fix the flaky TBRLMM `testInternalTopicExists` test [kafka]

2023-11-27 Thread via GitHub


kamalcph commented on code in PR #14840:
URL: https://github.com/apache/kafka/pull/14840#discussion_r1407187421


##
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java:
##
@@ -70,18 +70,20 @@ public TopicBasedRemoteLogMetadataManager topicBasedRlmm() {
 }
 
 @Test
-public void testInternalTopicExists() {
+public void testDoesTopicExist() {
 Properties adminConfig = 
remoteLogMetadataManagerHarness.adminClientConfig();
 ListenerName listenerName = 
remoteLogMetadataManagerHarness.listenerName();
 try (Admin admin = 
remoteLogMetadataManagerHarness.createAdminClient(listenerName, adminConfig)) {
-String topic = 
topicBasedRlmm().config().remoteLogMetadataTopicName();
+String topic = "test-topic-exist";
+remoteLogMetadataManagerHarness.createTopic(topic, 1, 1, new 
Properties(),

Review Comment:
   
[TestUtils#createTopicWithAdmin](https://sourcegraph.com/github.com/apache/kafka@fade3d10ea07eea5d6076b8fb1b68e2db5ffec48/-/blob/core/src/test/scala/unit/kafka/utils/TestUtils.scala?L497)
 waits for the metadata to be propagated to all the brokers before returning 
the call so the test will pass. Also, tried running the test repeatedly 100 
times to ensure that the test is not flaky. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]

2023-11-27 Thread via GitHub


philipnee commented on code in PR #14842:
URL: https://github.com/apache/kafka/pull/14842#discussion_r1406787063


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -154,7 +154,7 @@ public NetworkClientDelegate.PollResult poll(final long 
currentTimeMs) {
 if (!coordinatorRequestManager.coordinator().isPresent())
 return EMPTY;
 
-maybeAutoCommitAllConsumed();
+maybeAutoCommit();

Review Comment:
   we always commit allConsumed(), so there's no point to reinstate that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]

2023-11-27 Thread via GitHub


jeffkbkim opened a new pull request, #14849:
URL: https://github.com/apache/kafka/pull/14849

   After the new coordinator loads a __consumer_offsets partition, it logs the 
following exception when making a read operation (fetch/list groups, etc):
   
```
   java.lang.RuntimeException: No in-memory snapshot for epoch 740745. Snapshot 
epochs are:
   at 
org.apache.kafka.timeline.SnapshotRegistry.getSnapshot(SnapshotRegistry.java:178)
   at 
org.apache.kafka.timeline.SnapshottableHashTable.snapshottableIterator(SnapshottableHashTable.java:407)
   at 
org.apache.kafka.timeline.TimelineHashMap$ValueIterator.(TimelineHashMap.java:283)
   at 
org.apache.kafka.timeline.TimelineHashMap$Values.iterator(TimelineHashMap.java:271)
   ```

   This happens because we don't have a snapshot at the last updated high 
watermark after loading. We cannot generate a snapshot at the high watermark 
after loading all batches because it may contain records that have not yet been 
committed. We also don't know where the high watermark will advance up to so we 
need to generate a snapshot for each offset the loader observes to be greater 
than the current high watermark. Then once we add the high watermark listener 
and update the high watermark we can delete all of the older snapshots. 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15047: Roll active segment when it breaches the retention policy [kafka]

2023-11-27 Thread via GitHub


satishd merged PR #14766:
URL: https://github.com/apache/kafka/pull/14766


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15047: Roll active segment when it breaches the retention policy [kafka]

2023-11-27 Thread via GitHub


satishd commented on PR #14766:
URL: https://github.com/apache/kafka/pull/14766#issuecomment-1829038408

   Thanks @kamalcph for pointing out the JIRA to track the existing 
intermittent tiered storage related test failure. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15224: automating version change [kafka]

2023-11-27 Thread via GitHub


github-actions[bot] commented on PR #14229:
URL: https://github.com/apache/kafka/pull/14229#issuecomment-1829016302

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has  merge conflicts, please update it with the latest from 
trunk (or appropriate release branch)  If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15910) New group coordinator needs to generate snapshots while loading

2023-11-27 Thread Jeff Kim (Jira)
Jeff Kim created KAFKA-15910:


 Summary: New group coordinator needs to generate snapshots while 
loading
 Key: KAFKA-15910
 URL: https://issues.apache.org/jira/browse/KAFKA-15910
 Project: Kafka
  Issue Type: Sub-task
Reporter: Jeff Kim
Assignee: Jeff Kim


After the new coordinator loads a __consumer_offsets partition, it logs the 
following exception when making a read operation (fetch/list groups, etc):

 
{{{}java.lang.RuntimeException: No in-memory snapshot for epoch 740745. 
Snapshot epochs are:{}}}{{{}at 
org.apache.kafka.timeline.SnapshotRegistry.getSnapshot(SnapshotRegistry.java:178){}}}{{{}at
 
org.apache.kafka.timeline.SnapshottableHashTable.snapshottableIterator(SnapshottableHashTable.java:407){}}}{{{}at
 
org.apache.kafka.timeline.TimelineHashMap$ValueIterator.(TimelineHashMap.java:283){}}}{{{}at
 
org.apache.kafka.timeline.TimelineHashMap$Values.iterator(TimelineHashMap.java:271){}}}
{{...}}
 
This happens because we don't have a snapshot at the last updated high 
watermark after loading. We cannot generate a snapshot at the high watermark 
after loading all batches because it may contain records that have not yet been 
committed. We also don't know where the high watermark will advance up to so we 
need to generate a snapshot for each offset the loader observes to be greater 
than the current high watermark. Then once we add the high watermark listener 
and update the high watermark we can delete all of the snapshots prior. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15489: resign leadership when no fetch from majority voters [kafka]

2023-11-27 Thread via GitHub


jsancio commented on code in PR #14428:
URL: https://github.com/apache/kafka/pull/14428#discussion_r1406995366


##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -1990,7 +1995,7 @@ private long pollLeader(long currentTimeMs) {
 LeaderState state = quorum.leaderStateOrThrow();
 maybeFireLeaderChange(state);
 
-if (shutdown.get() != null || state.isResignRequested()) {
+if (shutdown.get() != null || state.isResignRequested() || 
state.hasMajorityFollowerFetchExpired(currentTimeMs)) {

Review Comment:
   In line 
[2014](https://github.com/apache/kafka/pull/14428/files#diff-1da15c51e641ea46ea5c86201ab8f21cfee9e7c575102a39c7bae0d5ffd7de39R2014)
 KRaft calculates the next time it should poll the leader.
   ```java
   return Math.min(timeUntilFlush, timeUntilSend);
   ```
   
   If you extend this to something like below, it should wake up the leader 
when the timer has expired.
   ```java
   return Math.min(timeUntilFlush, timeUntilSend, 
state.timeUntilCheckQuorumExpires(currentTimeMs));
   ```
   
   The other option is to extend `hasMajorityFollowerFetchExpired` to include 
this information by using `Timer::remainingMs` instead of `isExpired`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15489: resign leadership when no fetch from majority voters [kafka]

2023-11-27 Thread via GitHub


jsancio commented on code in PR #14428:
URL: https://github.com/apache/kafka/pull/14428#discussion_r1406995366


##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -1990,7 +1995,7 @@ private long pollLeader(long currentTimeMs) {
 LeaderState state = quorum.leaderStateOrThrow();
 maybeFireLeaderChange(state);
 
-if (shutdown.get() != null || state.isResignRequested()) {
+if (shutdown.get() != null || state.isResignRequested() || 
state.hasMajorityFollowerFetchExpired(currentTimeMs)) {

Review Comment:
   In line 
[2014](https://github.com/apache/kafka/pull/14428/files#diff-1da15c51e641ea46ea5c86201ab8f21cfee9e7c575102a39c7bae0d5ffd7de39R2014)
 KRaft calculates the next time it should poll the leader.
   ```java
   return Math.min(timeUntilFlush, timeUntilSend);
   ```
   
   If you extend this to something like below, it should wake up the leader 
when the timer has expired.
   ```java
   return Math.min(timeUntilFlush, timeUntilSend, 
state.timeUntilCheckQuorumExpires(currentTimeMs));
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15489: resign leadership when no fetch from majority voters [kafka]

2023-11-27 Thread via GitHub


jsancio commented on code in PR #14428:
URL: https://github.com/apache/kafka/pull/14428#discussion_r1406995366


##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -1990,7 +1995,7 @@ private long pollLeader(long currentTimeMs) {
 LeaderState state = quorum.leaderStateOrThrow();
 maybeFireLeaderChange(state);
 
-if (shutdown.get() != null || state.isResignRequested()) {
+if (shutdown.get() != null || state.isResignRequested() || 
state.hasMajorityFollowerFetchExpired(currentTimeMs)) {

Review Comment:
   In line 
[2014](https://github.com/apache/kafka/pull/14428/files#diff-1da15c51e641ea46ea5c86201ab8f21cfee9e7c575102a39c7bae0d5ffd7de39R2014)
 KRaft calculates the next time it should poll the leader.
   ```java
   return Math.min(timeUntilFlush, timeUntilSend);
   ```
   
   If you extend this to something like below, it should wake up the leader 
when the timer has expired.
   ```java
   return Math.min(timeUntilFlush, timeUntilSend, 
state.checkQuorumExpiry());
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15489: resign leadership when no fetch from majority voters [kafka]

2023-11-27 Thread via GitHub


jsancio commented on code in PR #14428:
URL: https://github.com/apache/kafka/pull/14428#discussion_r1406806920


##
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##
@@ -79,6 +86,39 @@ protected LeaderState(
 this.grantingVoters = Collections.unmodifiableSet(new 
HashSet<>(grantingVoters));
 this.log = logContext.logger(LeaderState.class);
 this.accumulator = Objects.requireNonNull(accumulator, "accumulator 
must be non-null");
+// use the 1.5x fetch timeout to tolerate some network transition time 
or other IO time.
+this.fetchTimeoutMs = (int) (fetchTimeoutMs * 1.5);
+this.fetchTimer = time.timer(fetchTimeoutMs);
+}
+
+// Check if the fetchTimer is expired because we didn't receive a valid 
fetch/fetchSnapshot request from the majority of
+// the voters within fetch timeout.
+public boolean hasMajorityFollowerFetchExpired(long currentTimeMs) {
+fetchTimer.update(currentTimeMs);
+boolean isExpired = fetchTimer.isExpired();
+if (isExpired) {
+log.info("Did not receive fetch request from the majority of the 
voters within {}ms. Current fetched voters are {}.",
+fetchTimeoutMs, fetchedVoters);
+}
+return isExpired;
+}
+
+// Reset the fetch timer if we've received fetch/fetchSnapshot request 
from the majority of the voter
+public void maybeResetMajorityFollowerFetchTimer(int id, long 
currentTimeMs) {
+updateFetchedVoters(id);
+// The majority number of the voters excluding the leader. Ex: 3 
voters, the value will be 1
+int majority = voterStates.size() / 2;
+if (fetchedVoters.size() >= majority) {
+fetchedVoters.clear();
+fetchTimer.update(currentTimeMs);
+fetchTimer.reset(fetchTimeoutMs);
+}
+}
+
+private void updateFetchedVoters(int id) {
+if (isVoter(id)) {

Review Comment:
   We should be defensive against this getting called with the local replica. 
Let's throw an `IllegalArgumentException`, if `id` is equal to the `localId`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15489: resign leadership when no fetch from majority voters [kafka]

2023-11-27 Thread via GitHub


jsancio commented on code in PR #14428:
URL: https://github.com/apache/kafka/pull/14428#discussion_r1406803592


##
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##
@@ -79,6 +86,39 @@ protected LeaderState(
 this.grantingVoters = Collections.unmodifiableSet(new 
HashSet<>(grantingVoters));
 this.log = logContext.logger(LeaderState.class);
 this.accumulator = Objects.requireNonNull(accumulator, "accumulator 
must be non-null");
+// use the 1.5x fetch timeout to tolerate some network transition time 
or other IO time.
+this.fetchTimeoutMs = (int) (fetchTimeoutMs * 1.5);
+this.fetchTimer = time.timer(fetchTimeoutMs);
+}
+
+// Check if the fetchTimer is expired because we didn't receive a valid 
fetch/fetchSnapshot request from the majority of
+// the voters within fetch timeout.
+public boolean hasMajorityFollowerFetchExpired(long currentTimeMs) {

Review Comment:
   See my other comment about check quorum and Alyssa's pre-vote KIP. If you 
agree, maybe we should call this `hasCheckQuorumFailed`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15489: resign leadership when no fetch from majority voters [kafka]

2023-11-27 Thread via GitHub


jsancio commented on code in PR #14428:
URL: https://github.com/apache/kafka/pull/14428#discussion_r1406771117


##
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##
@@ -79,6 +86,39 @@ protected LeaderState(
 this.grantingVoters = Collections.unmodifiableSet(new 
HashSet<>(grantingVoters));
 this.log = logContext.logger(LeaderState.class);
 this.accumulator = Objects.requireNonNull(accumulator, "accumulator 
must be non-null");
+// use the 1.5x fetch timeout to tolerate some network transition time 
or other IO time.
+this.fetchTimeoutMs = (int) (fetchTimeoutMs * 1.5);
+this.fetchTimer = time.timer(fetchTimeoutMs);
+}
+
+// Check if the fetchTimer is expired because we didn't receive a valid 
fetch/fetchSnapshot request from the majority of
+// the voters within fetch timeout.

Review Comment:
   Can we make this a Java doc comment. E.g.:
   ```java
   /**
* Check if the fetchTimer is expired because we didn't receive a valid 
fetch/fetchSnapshot request from
* the majority of the voters within fetch timeout.
*
* @param ...
* @return ...
*/
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15670: add "inter.broker.listener.name" config in KRaft controller config [kafka]

2023-11-27 Thread via GitHub


showuon commented on PR #14631:
URL: https://github.com/apache/kafka/pull/14631#issuecomment-1828916867

   @mumrah , call for review. Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-15870: Move new group coordinator metrics from Yammer to Metrics [kafka]

2023-11-27 Thread via GitHub


jeffkbkim opened a new pull request, #14848:
URL: https://github.com/apache/kafka/pull/14848

   Move the following metrics from Yammer to Kafka metrics to continue with the 
migration from yammer to kafka metrics. These are not exposed and newly created 
metrics so there are no compatibility issues.
   
* NumConsumerGroups (yammer) -> consumer-groups-size (kafka)
* NumConsumerGroupsEmpty (yammer) -> empty-consumer-groups-size (kafka)
* NumConsumerGroupsAssigning (yammer) -> assigning-consumer-groups-size 
(kafka)
* NumConsumerGroupsReconciling (yammer) -> reconciling-consumer-groups-size 
(kafka)
* NumConsumerGroupsStable (yammer) -> stable-consumer-groups-size (kafka)
* NumConsumerGroupsDead (yammer) -> dead-consumer-groups-size (kafka)
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15886: Always specify directories for new partition registrations [kafka]

2023-11-27 Thread via GitHub


soarez commented on code in PR #14820:
URL: https://github.com/apache/kafka/pull/14820#discussion_r1406925061


##
metadata/src/test/java/org/apache/kafka/metadata/util/MetadataFeatureUtil.java:
##
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.util;
+
+import org.apache.kafka.server.common.MetadataVersion;
+import org.mockito.internal.util.MockUtil;
+
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+public class MetadataFeatureUtil {

Review Comment:
   I agree, it is getting a bit annoying to have to mock MetadataVersion 
everywhere.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15886: Always specify directories for new partition registrations [kafka]

2023-11-27 Thread via GitHub


soarez commented on code in PR #14820:
URL: https://github.com/apache/kafka/pull/14820#discussion_r1406924620


##
metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java:
##
@@ -377,7 +386,7 @@ public ApiMessageAndVersion toRecord(Uuid topicId, int 
partitionId, ImageWriterO
 record.setDirectories(Uuid.toList(directories));
 } else {
 for (Uuid directory : directories) {
-if (!DirectoryId.UNASSIGNED.equals(directory)) {
+if (!DirectoryId.UNASSIGNED.equals(directory) && 
!DirectoryId.MIGRATING.equals(directory)) {

Review Comment:
   Just trying to be true to the semantics. If we end up downgrading metadata 
but all we have is `UNASSIGNED`, there isn't really any metadata loss there.
   
   I don't feel strongly about this, so let me know if you think this is wrong.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15886: Always specify directories for new partition registrations [kafka]

2023-11-27 Thread via GitHub


soarez commented on code in PR #14820:
URL: https://github.com/apache/kafka/pull/14820#discussion_r1406913694


##
metadata/src/main/java/org/apache/kafka/metadata/placement/DefaultDirProvider.java:
##
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.placement;
+
+import org.apache.kafka.common.Uuid;
+
+/**
+ * Provide the default directory for new partitions in a given broker.
+ */
+@FunctionalInterface
+public interface DefaultDirProvider {
+Uuid defaultDir(int brokerId);

Review Comment:
   I'll add documentation. But I don't think this should ever return 
`MIGRATING`, we simply do not call this provider unless the MetadataVersion 
supports direcotry assignments. The other result this might have is the single 
dir for a broker that supports JBOD but is registered with a single directory. 
Essentially this is so we can preset directory assignment to the only 
registered directory in a broker for new partitions or reassignments.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Fix flaky BrokerLifecycleManagerTest [kafka]

2023-11-27 Thread via GitHub


soarez commented on code in PR #14836:
URL: https://github.com/apache/kafka/pull/14836#discussion_r1406906907


##
core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala:
##
@@ -201,7 +201,7 @@ class BrokerLifecycleManagerTest {
 while (!future.isDone || context.mockClient.hasInFlightRequests) {
   context.poll()
   manager.eventQueue.wakeup()
-  context.time.sleep(100)
+  context.time.sleep(5)

Review Comment:
   It's true in theory, but in this unit test, it is not.
   
   I just tried with 6 heartbeats, and in 1,000 repetitions of the test, one of 
the runs was missing the last directory. On my laptop, with 7 heartbeats, 
10,000 test runs had no failures.
   
   I think this happens because of how `poll()` works: it's rapidly advancing 
the clock and notifying three separate systems - BrokerLifecycleManager, 
MockClient and MockChannelManager — signaling them using `Object.notify()`, 
which does not guarantee each of those threads will run straightaway, it's 
still up to the OS to schedule them onto the CPU.
   
   In practice, outside of unit tests, the delays in scheduling the 
BrokerLifecycleManager thread should be insignificant compared to the heartbeat 
interval. So I don't expect failures to be delayed for more than a heartbeat. 
   
   If the test proves to still be flaky even with 10 heartbeats, I think we can 
just increase the number. 



##
core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala:
##
@@ -201,7 +201,7 @@ class BrokerLifecycleManagerTest {
 while (!future.isDone || context.mockClient.hasInFlightRequests) {
   context.poll()
   manager.eventQueue.wakeup()
-  context.time.sleep(100)
+  context.time.sleep(5)

Review Comment:
   It's true in theory, but in this unit test, it is not.
   
   I just tried with 6 heartbeats, and in 1,000 repetitions of the test, one of 
the runs was missing the last directory. On my laptop, with 7 heartbeats, 
10,000 test runs had no failures.
   
   I think this happens because of how `poll()` works: it's rapidly advancing 
the clock and notifying three separate systems - BrokerLifecycleManager, 
MockClient and MockChannelManager — signaling them using `Object.notify()`, 
which does not guarantee each of those threads will run straightaway, it's 
still up to the OS to schedule them onto the CPU.
   
   In practice, outside of unit tests, the delays in scheduling the 
BrokerLifecycleManager thread should be insignificant compared to the heartbeat 
interval. So I don't expect failures to be delayed for more than a heartbeat. 
   
   If the test proves to still be flaky even with 10 heartbeats, I think we can 
just increase the number. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14438: Throw error when consumer configured with empty/whitespace-only group.id for AsyncKafkaConsumer [kafka]

2023-11-27 Thread via GitHub


kirktrue commented on PR #14768:
URL: https://github.com/apache/kafka/pull/14768#issuecomment-1828804792

   @mjsax would you be willing to review this PR? It's small and (hopefully!) 
straightforward. Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15046: Get rid of unnecessary fsyncs inside UnifiedLog.lock to stabilize performance [kafka]

2023-11-27 Thread via GitHub


ocadaruma commented on PR #14242:
URL: https://github.com/apache/kafka/pull/14242#issuecomment-1828798534

   @junrao Oh I misinterpreted as all green with only checking pipeline-view 
but I had to check tests view.
   
   I checked. Seems none of them are related to this change, and failures are 
due to the flakiness because all failed tests still succeeded on at least some 
JDK build.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Fix flaky BrokerLifecycleManagerTest [kafka]

2023-11-27 Thread via GitHub


junrao commented on code in PR #14836:
URL: https://github.com/apache/kafka/pull/14836#discussion_r1406880863


##
core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala:
##
@@ -201,7 +201,7 @@ class BrokerLifecycleManagerTest {
 while (!future.isDone || context.mockClient.hasInFlightRequests) {
   context.poll()
   manager.eventQueue.wakeup()
-  context.time.sleep(100)
+  context.time.sleep(5)

Review Comment:
   Thanks for the explanation, @soarez.  There is a heartbeat request after the 
initial registration. Each `manager.propagateDirectoryFailure` could trigger a 
separate heartbeat request. So, is it true that after the 4th heartbeat, each 
heart request is guaranteed to include all three failed dirs?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15908) Remove deprecated Consumer API poll(long timeout)

2023-11-27 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15908:
--
Fix Version/s: 4.0.0

> Remove deprecated Consumer API poll(long timeout)
> -
>
> Key: KAFKA-15908
> URL: https://issues.apache.org/jira/browse/KAFKA-15908
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
> Fix For: 4.0.0
>
>
> Per 
> [KIP-266|https://cwiki.apache.org/confluence/display/KAFKA/KIP-266%3A+Fix+consumer+indefinite+blocking+behavior],
>  the {{Consumer.poll(long timeout)}} method was deprecated back in 2.0.0. 
> In 3.7, there are two implementations, each with different behavior:
> * The {{LegacyKafkaConsumer}} implementation will continue to work but will 
> log a warning about its removal
> * The {{AsyncKafkaConsumer}} implementation will throw an error.
> In 4.0, the `poll` method that takes a single `long` timeout will be removed 
> altogether.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14438) Throw error when consumer configured with empty/whitespace-only group.id for AsyncKafkaConsumer

2023-11-27 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-14438:
--
Fix Version/s: 3.7.0
   (was: 4.0.0)

> Throw error when consumer configured with empty/whitespace-only group.id for 
> AsyncKafkaConsumer
> ---
>
> Key: KAFKA-14438
> URL: https://issues.apache.org/jira/browse/KAFKA-14438
> Project: Kafka
>  Issue Type: Task
>  Components: consumer
>Reporter: Philip Nee
>Assignee: Kirk True
>Priority: Blocker
>  Labels: kip-848-client-support, kip-848-e2e, kip-848-preview
> Fix For: 3.7.0
>
>
> Currently, a warning message is logged upon using an empty consumer groupId. 
> In the next major release, we should drop the support of empty ("") consumer 
> groupId.
> cc [~hachikuji]
> See 
> [KIP-289|https://cwiki.apache.org/confluence/display/KAFKA/KIP-289%3A+Improve+the+default+group+id+behavior+in+KafkaConsumer]
>  for more detail.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15907) Remove previously deprecated Consumer features from 4.0

2023-11-27 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15907?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15907:
--
Fix Version/s: 4.0.0

> Remove previously deprecated Consumer features from 4.0
> ---
>
> Key: KAFKA-15907
> URL: https://issues.apache.org/jira/browse/KAFKA-15907
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
> Fix For: 4.0.0
>
>
> This Jira serves as the main collection of APIs, logic, etc. that were 
> previously marked as "deprecated" by other KIPs. With 4.0, we will be 
> updating the code to remove the deprecated features.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15909) Throw error when consumer configured with empty/whitespace-only group.id for LegacyKafkaConsumer

2023-11-27 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15909?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15909:
--
Summary: Throw error when consumer configured with empty/whitespace-only 
group.id for LegacyKafkaConsumer  (was: Remove support for empty "group.id" for 
"generic" group protocol)

> Throw error when consumer configured with empty/whitespace-only group.id for 
> LegacyKafkaConsumer
> 
>
> Key: KAFKA-15909
> URL: https://issues.apache.org/jira/browse/KAFKA-15909
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
> Fix For: 4.0.0
>
>
> Per 
> [KIP-289|https://cwiki.apache.org/confluence/display/KAFKA/KIP-289%3A+Improve+the+default+group+id+behavior+in+KafkaConsumer],
>  the use of an empty value for {{group.id}} configuration was deprecated back 
> in 2.2.0.
> In 3.7, the {{AsyncKafkaConsumer}} implementation will throw an error (see 
> KAFKA-14438).
> This task is to update the {{LegacyKafkaConsumer}} implementation to throw an 
> error in 4.0.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14438) Throw error when consumer configured with empty/whitespace-only group.id for AsyncKafkaConsumer

2023-11-27 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-14438:
--
Summary: Throw error when consumer configured with empty/whitespace-only 
group.id for AsyncKafkaConsumer  (was: Throw error when consumer configured 
with empty/whitespace-only group.id)

> Throw error when consumer configured with empty/whitespace-only group.id for 
> AsyncKafkaConsumer
> ---
>
> Key: KAFKA-14438
> URL: https://issues.apache.org/jira/browse/KAFKA-14438
> Project: Kafka
>  Issue Type: Task
>  Components: consumer
>Reporter: Philip Nee
>Assignee: Kirk True
>Priority: Blocker
>  Labels: kip-848-client-support, kip-848-e2e, kip-848-preview
> Fix For: 4.0.0
>
>
> Currently, a warning message is logged upon using an empty consumer groupId. 
> In the next major release, we should drop the support of empty ("") consumer 
> groupId.
> cc [~hachikuji]
> See 
> [KIP-289|https://cwiki.apache.org/confluence/display/KAFKA/KIP-289%3A+Improve+the+default+group+id+behavior+in+KafkaConsumer]
>  for more detail.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15909) Remove support for empty "group.id" for "generic" group protocol

2023-11-27 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15909?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15909:
--
Description: 
Per 
[KIP-289|https://cwiki.apache.org/confluence/display/KAFKA/KIP-289%3A+Improve+the+default+group+id+behavior+in+KafkaConsumer],
 the use of an empty value for {{group.id}} configuration was deprecated back 
in 2.2.0.

In 3.7, the {{AsyncKafkaConsumer}} implementation will throw an error (see 
KAFKA-14438).

This task is to update the {{LegacyKafkaConsumer}} implementation to throw an 
error in 4.0.

  was:
Per KIP-289, the use of an empty value for {{group.id}} configuration was 
deprecated back in 2.2.0.

In 3.7, the {{AsyncKafkaConsumer}} implementation will throw an error (see 
KAFKA-14438).

This task is to update the {{LegacyKafkaConsumer}} implementation to throw an 
error in 4.0.


> Remove support for empty "group.id" for "generic" group protocol
> 
>
> Key: KAFKA-15909
> URL: https://issues.apache.org/jira/browse/KAFKA-15909
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
> Fix For: 4.0.0
>
>
> Per 
> [KIP-289|https://cwiki.apache.org/confluence/display/KAFKA/KIP-289%3A+Improve+the+default+group+id+behavior+in+KafkaConsumer],
>  the use of an empty value for {{group.id}} configuration was deprecated back 
> in 2.2.0.
> In 3.7, the {{AsyncKafkaConsumer}} implementation will throw an error (see 
> KAFKA-14438).
> This task is to update the {{LegacyKafkaConsumer}} implementation to throw an 
> error in 4.0.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15886: Always specify directories for new partition registrations [kafka]

2023-11-27 Thread via GitHub


cmccabe commented on code in PR #14820:
URL: https://github.com/apache/kafka/pull/14820#discussion_r1406862990


##
metadata/src/test/java/org/apache/kafka/metadata/util/MetadataFeatureUtil.java:
##
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.util;
+
+import org.apache.kafka.server.common.MetadataVersion;
+import org.mockito.internal.util.MockUtil;
+
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+public class MetadataFeatureUtil {

Review Comment:
   I've been thinking about this more, and I think maybe we should just claim a 
metadata version number. It's time.
   
   Maybe even move KIP-966 up a notch since we may finish before they do. I 
guess let's discuss offline.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15909) Remove support for empty "group.id" for "generic" group protocol

2023-11-27 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15909?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15909:
--
Description: 
Per KIP-289, the use of an empty value for {{group.id}} configuration was 
deprecated back in 2.2.0.

In 3.7, the {{AsyncKafkaConsumer}} implementation will throw an error (see 
KAFKA-14438).

This task is to update the {{LegacyKafkaConsumer}} implementation to throw an 
error in 4.0.

  was:
Per KIP-266, the {{Consumer.poll(long timeout)}} method was deprecated back in 
2.0.0. 

In 3.7, there are two implementations, each with different behavior:

* The {{LegacyKafkaConsumer}} implementation will continue to work but will log 
a warning about its removal
* The {{AsyncKafkaConsumer}} implementation will throw an error.

In 4.0, the `poll` method that takes a single `long` timeout will be removed 
altogether.


> Remove support for empty "group.id" for "generic" group protocol
> 
>
> Key: KAFKA-15909
> URL: https://issues.apache.org/jira/browse/KAFKA-15909
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
> Fix For: 4.0.0
>
>
> Per KIP-289, the use of an empty value for {{group.id}} configuration was 
> deprecated back in 2.2.0.
> In 3.7, the {{AsyncKafkaConsumer}} implementation will throw an error (see 
> KAFKA-14438).
> This task is to update the {{LegacyKafkaConsumer}} implementation to throw an 
> error in 4.0.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Fix flaky BrokerLifecycleManagerTest [kafka]

2023-11-27 Thread via GitHub


soarez commented on code in PR #14836:
URL: https://github.com/apache/kafka/pull/14836#discussion_r1406861254


##
core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala:
##
@@ -214,28 +214,20 @@ class BrokerLifecycleManagerTest {
 ctx.controllerNodeProvider.node.set(controllerNode)
 
 val registration = prepareResponse(ctx, new BrokerRegistrationResponse(new 
BrokerRegistrationResponseData().setBrokerEpoch(1000)))
-val heartbeats = Seq.fill(6)(prepareResponse[BrokerHeartbeatRequest](ctx, 
new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData(
-
 manager.start(() => ctx.highestMetadataOffset.get(),
   ctx.mockChannelManager, ctx.clusterId, ctx.advertisedListeners,
   Collections.emptyMap(), OptionalLong.empty())
 poll(ctx, manager, registration)
 
 
manager.propagateDirectoryFailure(Uuid.fromString("h3sC4Yk-Q9-fd0ntJTocCA"))
-poll(ctx, manager, heartbeats(0)).data()
-val dirs1 = poll(ctx, manager, heartbeats(1)).data().offlineLogDirs()
-
 
manager.propagateDirectoryFailure(Uuid.fromString("ej8Q9_d2Ri6FXNiTxKFiow"))
-poll(ctx, manager, heartbeats(2)).data()
-val dirs2 = poll(ctx, manager, heartbeats(3)).data().offlineLogDirs()
-
 
manager.propagateDirectoryFailure(Uuid.fromString("1iF76HVNRPqC7Y4r6647eg"))
-poll(ctx, manager, heartbeats(4)).data()
-val dirs3 = poll(ctx, manager, heartbeats(5)).data().offlineLogDirs()
-
-assertEquals(Set("h3sC4Yk-Q9-fd0ntJTocCA").map(Uuid.fromString), 
dirs1.asScala.toSet)
-assertEquals(Set("h3sC4Yk-Q9-fd0ntJTocCA", 
"ej8Q9_d2Ri6FXNiTxKFiow").map(Uuid.fromString), dirs2.asScala.toSet)
-assertEquals(Set("h3sC4Yk-Q9-fd0ntJTocCA", "ej8Q9_d2Ri6FXNiTxKFiow", 
"1iF76HVNRPqC7Y4r6647eg").map(Uuid.fromString), dirs3.asScala.toSet)
+val latestHeartbeat = Seq.fill(10)(
+  prepareResponse[BrokerHeartbeatRequest](ctx, new 
BrokerHeartbeatResponse(new BrokerHeartbeatResponseData()))
+).map(poll(ctx, manager, _)).last

Review Comment:
   I need to update the KIP. Last week in a disucssion with @cmccabe and 
@pprovenzano, we realized that because of overload mode for heartbeats, it will 
be easier to handle failed log directories if the broker always sends the 
accumulated list. Hence #14770



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15886: Always specify directories for new partition registrations [kafka]

2023-11-27 Thread via GitHub


cmccabe commented on code in PR #14820:
URL: https://github.com/apache/kafka/pull/14820#discussion_r1406861113


##
metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java:
##
@@ -377,7 +386,7 @@ public ApiMessageAndVersion toRecord(Uuid topicId, int 
partitionId, ImageWriterO
 record.setDirectories(Uuid.toList(directories));
 } else {
 for (Uuid directory : directories) {
-if (!DirectoryId.UNASSIGNED.equals(directory)) {
+if (!DirectoryId.UNASSIGNED.equals(directory) && 
!DirectoryId.MIGRATING.equals(directory)) {

Review Comment:
   Hmm. Why do we need to special-case UNASSIGNED here? If we're pre-JBOD, we 
should only have MIGRATING, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15886: Always specify directories for new partition registrations [kafka]

2023-11-27 Thread via GitHub


cmccabe commented on code in PR #14820:
URL: https://github.com/apache/kafka/pull/14820#discussion_r1406859690


##
metadata/src/main/java/org/apache/kafka/metadata/placement/DefaultDirProvider.java:
##
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.placement;
+
+import org.apache.kafka.common.Uuid;
+
+/**
+ * Provide the default directory for new partitions in a given broker.
+ */
+@FunctionalInterface
+public interface DefaultDirProvider {
+Uuid defaultDir(int brokerId);

Review Comment:
   So for a broker in JBOD mode (i.e. it has multiple directories), 
`defaultDir` would return `UNASSIGNED`, right? And when we're using an older 
MetadataVersion, we'll always get back `MIGRATING`, right?
   
   It would be good to document that in the JavaDoc.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15886: Always specify directories for new partition registrations [kafka]

2023-11-27 Thread via GitHub


cmccabe commented on code in PR #14820:
URL: https://github.com/apache/kafka/pull/14820#discussion_r1406858333


##
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##
@@ -405,7 +415,7 @@ private void completeReassignmentIfNeeded() {
 targetAdding = Collections.emptyList();
 }
 
-public Optional build() {
+public Optional build(DefaultDirProvider 
defaultDirProvider) {

Review Comment:
   Please don't add arguments to build(). If you want to have a way to set the 
default directory, then have a `Builder.setDefaultDirProvider` function.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15886: Always specify directories for new partition registrations [kafka]

2023-11-27 Thread via GitHub


cmccabe commented on code in PR #14820:
URL: https://github.com/apache/kafka/pull/14820#discussion_r1406858333


##
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##
@@ -405,7 +415,7 @@ private void completeReassignmentIfNeeded() {
 targetAdding = Collections.emptyList();
 }
 
-public Optional build() {
+public Optional build(DefaultDirProvider 
defaultDirProvider) {

Review Comment:
   Please don't add arguments to build(). If you want to have a way to set the 
default directory, then have a `setDefaultDirProvider` function.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15886: Always specify directories for new partition registrations [kafka]

2023-11-27 Thread via GitHub


cmccabe commented on code in PR #14820:
URL: https://github.com/apache/kafka/pull/14820#discussion_r1406856346


##
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##
@@ -123,6 +127,7 @@ public PartitionChangeBuilder(
 this.targetElr = Replicas.toList(partition.elr);
 this.targetLastKnownElr = Replicas.toList(partition.lastKnownElr);
 this.targetLeaderRecoveryState = partition.leaderRecoveryState;
+this.targetDirectories = 
DirectoryId.createAssignmentMap(partition.replicas, partition.directories);

Review Comment:
   One thing I'm trying to understand is why we should use a map here rather 
than a list of the same length as the replicas list. This also applies to other 
uses of `createAssignmentMap` -- is there ever a place where we would be better 
off with a map rather than just an array of the right size?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Fix flaky BrokerLifecycleManagerTest [kafka]

2023-11-27 Thread via GitHub


soarez commented on code in PR #14836:
URL: https://github.com/apache/kafka/pull/14836#discussion_r1406855449


##
core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala:
##
@@ -201,7 +201,7 @@ class BrokerLifecycleManagerTest {
 while (!future.isDone || context.mockClient.hasInFlightRequests) {
   context.poll()
   manager.eventQueue.wakeup()
-  context.time.sleep(100)
+  context.time.sleep(5)

Review Comment:
   No. This can mitigate it, but it cannot prevent the race condition entirely.
   
   AssignmentsManager has its own event loop thread that is batching and 
sending the accumulated failed directories. It's a bit tricky to predict the 
content of each request, so instead I opted to only assert after a few 
heartbeats.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15890: Consumer.poll with long timeout unaware of assigned partitions [kafka]

2023-11-27 Thread via GitHub


AndrewJSchofield commented on code in PR #14835:
URL: https://github.com/apache/kafka/pull/14835#discussion_r1406853731


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -417,4 +394,127 @@ private void updateHeartbeatIntervalMs(final long 
heartbeatIntervalMs) {
 this.heartbeatTimer.updateAndReset(heartbeatIntervalMs);
 }
 }
+
+/**
+ * Builds the heartbeat requests correctly, ensuring that all information 
is sent according to
+ * the protocol, but subsequent requests do not send information which has 
not changed. This
+ * is important to ensure that reconciliation completes successfully.
+ */
+static class HeartbeatState {
+private final SubscriptionState subscriptions;
+private final MembershipManager membershipManager;
+private final int rebalanceTimeoutMs;
+
+// Fields of ConsumerHeartbeatRequest sent in the most recent request
+private String sentInstanceId;
+private int sentRebalanceTimeoutMs;
+private TreeSet sentSubscribedTopicNames;
+// private String sentSubscribedTopicRegex;
+private String sentServerAssignor;
+private TreeSet sentTopicPartitions;

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15909) Remove support for empty "group.id" for "generic" group protocol

2023-11-27 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15909?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15909:
--
Summary: Remove support for empty "group.id" for "generic" group protocol  
(was: Remove support for empty "group.id")

> Remove support for empty "group.id" for "generic" group protocol
> 
>
> Key: KAFKA-15909
> URL: https://issues.apache.org/jira/browse/KAFKA-15909
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
> Fix For: 4.0.0
>
>
> Per KIP-266, the {{Consumer.poll(long timeout)}} method was deprecated back 
> in 2.0.0. 
> In 3.7, there are two implementations, each with different behavior:
> * The {{LegacyKafkaConsumer}} implementation will continue to work but will 
> log a warning about its removal
> * The {{AsyncKafkaConsumer}} implementation will throw an error.
> In 4.0, the `poll` method that takes a single `long` timeout will be removed 
> altogether.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15909) Remove support for empty "group.id"

2023-11-27 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15909?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15909:
--
Fix Version/s: 4.0.0

> Remove support for empty "group.id"
> ---
>
> Key: KAFKA-15909
> URL: https://issues.apache.org/jira/browse/KAFKA-15909
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
> Fix For: 4.0.0
>
>
> Per KIP-266, the {{Consumer.poll(long timeout)}} method was deprecated back 
> in 2.0.0. 
> In 3.7, there are two implementations, each with different behavior:
> * The {{LegacyKafkaConsumer}} implementation will continue to work but will 
> log a warning about its removal
> * The {{AsyncKafkaConsumer}} implementation will throw an error.
> In 4.0, the `poll` method that takes a single `long` timeout will be removed 
> altogether.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15908) Remove deprecated Consumer API poll(long timeout)

2023-11-27 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15908:
--
Description: 
Per 
[KIP-266|https://cwiki.apache.org/confluence/display/KAFKA/KIP-266%3A+Fix+consumer+indefinite+blocking+behavior],
 the {{Consumer.poll(long timeout)}} method was deprecated back in 2.0.0. 

In 3.7, there are two implementations, each with different behavior:

* The {{LegacyKafkaConsumer}} implementation will continue to work but will log 
a warning about its removal
* The {{AsyncKafkaConsumer}} implementation will throw an error.

In 4.0, the `poll` method that takes a single `long` timeout will be removed 
altogether.

  was:
Per KIP-266, the {{Consumer.poll(long timeout)}} method was deprecated back in 
2.0.0. 

In 3.7, there are two implementations, each with different behavior:

* The {{LegacyKafkaConsumer}} implementation will continue to work but will log 
a warning about its removal
* The {{AsyncKafkaConsumer}} implementation will throw an error.

In 4.0, the `poll` method that takes a single `long` timeout will be removed 
altogether.


> Remove deprecated Consumer API poll(long timeout)
> -
>
> Key: KAFKA-15908
> URL: https://issues.apache.org/jira/browse/KAFKA-15908
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>
> Per 
> [KIP-266|https://cwiki.apache.org/confluence/display/KAFKA/KIP-266%3A+Fix+consumer+indefinite+blocking+behavior],
>  the {{Consumer.poll(long timeout)}} method was deprecated back in 2.0.0. 
> In 3.7, there are two implementations, each with different behavior:
> * The {{LegacyKafkaConsumer}} implementation will continue to work but will 
> log a warning about its removal
> * The {{AsyncKafkaConsumer}} implementation will throw an error.
> In 4.0, the `poll` method that takes a single `long` timeout will be removed 
> altogether.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15909) Remove support for empty "group.id"

2023-11-27 Thread Kirk True (Jira)
Kirk True created KAFKA-15909:
-

 Summary: Remove support for empty "group.id"
 Key: KAFKA-15909
 URL: https://issues.apache.org/jira/browse/KAFKA-15909
 Project: Kafka
  Issue Type: Sub-task
  Components: clients, consumer
Reporter: Kirk True
Assignee: Kirk True


Per KIP-266, the {{Consumer.poll(long timeout)}} method was deprecated back in 
2.0.0. 

In 3.7, there are two implementations, each with different behavior:

* The {{LegacyKafkaConsumer}} implementation will continue to work but will log 
a warning about its removal
* The {{AsyncKafkaConsumer}} implementation will throw an error.

In 4.0, the `poll` method that takes a single `long` timeout will be removed 
altogether.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15908) Remove deprecated Consumer API poll(long timeout)

2023-11-27 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15908:
--
Component/s: clients
 consumer

> Remove deprecated Consumer API poll(long timeout)
> -
>
> Key: KAFKA-15908
> URL: https://issues.apache.org/jira/browse/KAFKA-15908
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>
> Per KIP-266, the {{Consumer.poll(long timeout)}} method was deprecated back 
> in 2.0.0. 
> In 3.7, there are two implementations, each with different behavior:
> * The {{LegacyKafkaConsumer}} implementation will continue to work but will 
> log a warning about its removal
> * The {{AsyncKafkaConsumer}} implementation will throw an error.
> In 4.0, the `poll` method that takes a single `long` timeout will be removed 
> altogether.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15908) Remove deprecated Consumer API poll(long timeout)

2023-11-27 Thread Kirk True (Jira)
Kirk True created KAFKA-15908:
-

 Summary: Remove deprecated Consumer API poll(long timeout)
 Key: KAFKA-15908
 URL: https://issues.apache.org/jira/browse/KAFKA-15908
 Project: Kafka
  Issue Type: Sub-task
Reporter: Kirk True
Assignee: Kirk True


Per KIP-266, the {{Consumer.poll(long timeout)}} method was deprecated back in 
2.0.0. 

In 3.7, there are two implementations, each with different behavior:

* The {{LegacyKafkaConsumer}} implementation will continue to work but will log 
a warning about its removal
* The {{AsyncKafkaConsumer}} implementation will throw an error.

In 4.0, the `poll` method that takes a single `long` timeout will be removed 
altogether.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15886: Always specify directories for new partition registrations [kafka]

2023-11-27 Thread via GitHub


cmccabe commented on PR #14820:
URL: https://github.com/apache/kafka/pull/14820#issuecomment-1828735267

   @soarez : `BrokerLifecycleManagerTest.testAlwaysSendsAccumulatedOfflineDirs` 
seems to be failing -- can you update this with the latest from trunk?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15896) Flaky test: shouldQuerySpecificStalePartitionStores() – org.apache.kafka.streams.integration.StoreQueryIntegrationTest

2023-11-27 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15896:

Labels: flaky-test  (was: )

> Flaky test: shouldQuerySpecificStalePartitionStores() – 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest
> --
>
> Key: KAFKA-15896
> URL: https://issues.apache.org/jira/browse/KAFKA-15896
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Apoorv Mittal
>Priority: Major
>  Labels: flaky-test
>
> Flaky test: 
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14699/21/tests/
>  
>  
> {code:java}
> Error
> org.apache.kafka.streams.errors.InvalidStateStorePartitionException: The 
> specified partition 1 for store source-table does not 
> exist.Stacktraceorg.apache.kafka.streams.errors.InvalidStateStorePartitionException:
>  The specified partition 1 for store source-table does not exist.  at 
> app//org.apache.kafka.streams.state.internals.WrappingStoreProvider.stores(WrappingStoreProvider.java:63)
> at 
> app//org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore.get(CompositeReadOnlyKeyValueStore.java:53)
>  at 
> app//org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQuerySpecificStalePartitionStores(StoreQueryIntegrationTest.java:347)
>   at 
> java.base@21.0.1/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
>  at java.base@21.0.1/java.lang.reflect.Method.invoke(Method.java:580)at 
> app//org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728)
>   at 
> app//org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
>at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
>  at 
> app//org.junit.jupiter.engine.extension.SameThreadTimeoutInvocation.proceed(SameThreadTimeoutInvocation.java:45)
>  at 
> app//org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
> at 
> app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
>   at 
> app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86)
>at 
> app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
> at 
> app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
>  at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
> at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
>at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
> at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
> at 
> app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92)
>  {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15896) Flaky test: shouldQuerySpecificStalePartitionStores() – org.apache.kafka.streams.integration.StoreQueryIntegrationTest

2023-11-27 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15896:

Component/s: streams
 unit tests

> Flaky test: shouldQuerySpecificStalePartitionStores() – 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest
> --
>
> Key: KAFKA-15896
> URL: https://issues.apache.org/jira/browse/KAFKA-15896
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Apoorv Mittal
>Priority: Major
>
> Flaky test: 
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14699/21/tests/
>  
>  
> {code:java}
> Error
> org.apache.kafka.streams.errors.InvalidStateStorePartitionException: The 
> specified partition 1 for store source-table does not 
> exist.Stacktraceorg.apache.kafka.streams.errors.InvalidStateStorePartitionException:
>  The specified partition 1 for store source-table does not exist.  at 
> app//org.apache.kafka.streams.state.internals.WrappingStoreProvider.stores(WrappingStoreProvider.java:63)
> at 
> app//org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore.get(CompositeReadOnlyKeyValueStore.java:53)
>  at 
> app//org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQuerySpecificStalePartitionStores(StoreQueryIntegrationTest.java:347)
>   at 
> java.base@21.0.1/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
>  at java.base@21.0.1/java.lang.reflect.Method.invoke(Method.java:580)at 
> app//org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728)
>   at 
> app//org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
>at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
>  at 
> app//org.junit.jupiter.engine.extension.SameThreadTimeoutInvocation.proceed(SameThreadTimeoutInvocation.java:45)
>  at 
> app//org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
> at 
> app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
>   at 
> app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86)
>at 
> app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
> at 
> app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
>  at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
> at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
>at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
> at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
> at 
> app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92)
>  {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15489: resign leadership when no fetch from majority voters [kafka]

2023-11-27 Thread via GitHub


jsancio commented on code in PR #14428:
URL: https://github.com/apache/kafka/pull/14428#discussion_r1406768273


##
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##
@@ -79,6 +86,39 @@ protected LeaderState(
 this.grantingVoters = Collections.unmodifiableSet(new 
HashSet<>(grantingVoters));
 this.log = logContext.logger(LeaderState.class);
 this.accumulator = Objects.requireNonNull(accumulator, "accumulator 
must be non-null");
+// use the 1.5x fetch timeout to tolerate some network transition time 
or other IO time.
+this.fetchTimeoutMs = (int) (fetchTimeoutMs * 1.5);
+this.fetchTimer = time.timer(fetchTimeoutMs);

Review Comment:
   Since this is not set to the fetch timeout maybe we can call this 
`checkQuorumTimeoutMs` and `checkQuorumTimer`. I am suggesting these names 
because @ahuang98 uses "check quorum" in the pre-vote KIP: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-996%3A+Pre-Vote



##
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##
@@ -79,6 +86,39 @@ protected LeaderState(
 this.grantingVoters = Collections.unmodifiableSet(new 
HashSet<>(grantingVoters));
 this.log = logContext.logger(LeaderState.class);
 this.accumulator = Objects.requireNonNull(accumulator, "accumulator 
must be non-null");
+// use the 1.5x fetch timeout to tolerate some network transition time 
or other IO time.
+this.fetchTimeoutMs = (int) (fetchTimeoutMs * 1.5);
+this.fetchTimer = time.timer(fetchTimeoutMs);
+}
+
+// Check if the fetchTimer is expired because we didn't receive a valid 
fetch/fetchSnapshot request from the majority of
+// the voters within fetch timeout.

Review Comment:
   Can we make this a Java doc comments. E.g.:
   ```java
   /**
* Check if the fetchTimer is expired because we didn't receive a valid 
fetch/fetchSnapshot request from
* the majority of the voters within fetch timeout.
*
* @param ...
* @return ...
*/
   ```



##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -1340,6 +1341,9 @@ private FetchSnapshotResponseData 
handleFetchSnapshotRequest(
 
 UnalignedRecords records = 
snapshot.slice(partitionSnapshot.position(), Math.min(data.maxBytes(), 
maxSnapshotSize));
 
+Optional> state = quorum.maybeLeaderState();
+state.ifPresent(s -> 
s.maybeResetMajorityFollowerFetchTimer(data.replicaId(), currentTimeMs));
+

Review Comment:
   Since the check above `leaderValidation.isPresent()` is false, it means that 
this replica is guarantee to be the leader at this point in time. I prefer if 
we use `leaderStateOrThrow` instead of `maybeLeaderState` to make this clear to 
future readers.



##
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##
@@ -79,6 +86,39 @@ protected LeaderState(
 this.grantingVoters = Collections.unmodifiableSet(new 
HashSet<>(grantingVoters));
 this.log = logContext.logger(LeaderState.class);
 this.accumulator = Objects.requireNonNull(accumulator, "accumulator 
must be non-null");
+// use the 1.5x fetch timeout to tolerate some network transition time 
or other IO time.
+this.fetchTimeoutMs = (int) (fetchTimeoutMs * 1.5);
+this.fetchTimer = time.timer(fetchTimeoutMs);
+}
+
+// Check if the fetchTimer is expired because we didn't receive a valid 
fetch/fetchSnapshot request from the majority of
+// the voters within fetch timeout.
+public boolean hasMajorityFollowerFetchExpired(long currentTimeMs) {
+fetchTimer.update(currentTimeMs);
+boolean isExpired = fetchTimer.isExpired();
+if (isExpired) {
+log.info("Did not receive fetch request from the majority of the 
voters within {}ms. Current fetched voters are {}.",
+fetchTimeoutMs, fetchedVoters);
+}
+return isExpired;
+}
+
+// Reset the fetch timer if we've received fetch/fetchSnapshot request 
from the majority of the voter
+public void maybeResetMajorityFollowerFetchTimer(int id, long 
currentTimeMs) {
+updateFetchedVoters(id);
+// The majority number of the voters excluding the leader. Ex: 3 
voters, the value will be 1
+int majority = voterStates.size() / 2;
+if (fetchedVoters.size() >= majority) {
+fetchedVoters.clear();
+fetchTimer.update(currentTimeMs);
+fetchTimer.reset(fetchTimeoutMs);
+}
+}
+
+private void updateFetchedVoters(int id) {
+if (isVoter(id)) {

Review Comment:
   We should be defensive against this getting called with the local replica. 
Let's throw an `IllegalArgumentException` if `id` is equal to the `localId`.



##
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java:
##
@@ -485,6 +485,49 @@ public void 
testHandleBeginQuorumEpo

[jira] [Updated] (KAFKA-15907) Remove previously deprecated Consumer features from 4.0

2023-11-27 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15907?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15907:
--
Component/s: clients
 consumer

> Remove previously deprecated Consumer features from 4.0
> ---
>
> Key: KAFKA-15907
> URL: https://issues.apache.org/jira/browse/KAFKA-15907
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>
> This Jira serves as the main collection of APIs, logic, etc. that were 
> previously marked as "deprecated" by other KIPs. With 4.0, we will be 
> updating the code to remove the deprecated features.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15907) Remove previously deprecated Consumer features from 4.0

2023-11-27 Thread Kirk True (Jira)
Kirk True created KAFKA-15907:
-

 Summary: Remove previously deprecated Consumer features from 4.0
 Key: KAFKA-15907
 URL: https://issues.apache.org/jira/browse/KAFKA-15907
 Project: Kafka
  Issue Type: Task
Reporter: Kirk True
Assignee: Kirk True


This Jira serves as the main collection of APIs, logic, etc. that were 
previously marked as "deprecated" by other KIPs. With 4.0, we will be updating 
the code to remove the deprecated features.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15831: KIP-1000 protocol and admin client [kafka]

2023-11-27 Thread via GitHub


AndrewJSchofield commented on PR #14811:
URL: https://github.com/apache/kafka/pull/14811#issuecomment-1828718746

   Build is almost green. A small number of test failures unrelated to this PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Fix flaky BrokerLifecycleManagerTest [kafka]

2023-11-27 Thread via GitHub


junrao commented on code in PR #14836:
URL: https://github.com/apache/kafka/pull/14836#discussion_r1406820475


##
core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala:
##
@@ -201,7 +201,7 @@ class BrokerLifecycleManagerTest {
 while (!future.isDone || context.mockClient.hasInFlightRequests) {
   context.poll()
   manager.eventQueue.wakeup()
-  context.time.sleep(100)
+  context.time.sleep(5)

Review Comment:
   What's causing the following failure before? Does this change fix the issue?
   
   ```
   rg.opentest4j.AssertionFailedError: 
   Expected :Set(h3sC4Yk-Q9-fd0ntJTocCA, ej8Q9_d2Ri6FXNiTxKFiow, 
1iF76HVNRPqC7Y4r6647eg)
   Actual   :Set(h3sC4Yk-Q9-fd0ntJTocCA, ej8Q9_d2Ri6FXNiTxKFiow)
   ```



##
core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala:
##
@@ -214,28 +214,20 @@ class BrokerLifecycleManagerTest {
 ctx.controllerNodeProvider.node.set(controllerNode)
 
 val registration = prepareResponse(ctx, new BrokerRegistrationResponse(new 
BrokerRegistrationResponseData().setBrokerEpoch(1000)))
-val heartbeats = Seq.fill(6)(prepareResponse[BrokerHeartbeatRequest](ctx, 
new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData(
-
 manager.start(() => ctx.highestMetadataOffset.get(),
   ctx.mockChannelManager, ctx.clusterId, ctx.advertisedListeners,
   Collections.emptyMap(), OptionalLong.empty())
 poll(ctx, manager, registration)
 
 
manager.propagateDirectoryFailure(Uuid.fromString("h3sC4Yk-Q9-fd0ntJTocCA"))
-poll(ctx, manager, heartbeats(0)).data()
-val dirs1 = poll(ctx, manager, heartbeats(1)).data().offlineLogDirs()
-
 
manager.propagateDirectoryFailure(Uuid.fromString("ej8Q9_d2Ri6FXNiTxKFiow"))
-poll(ctx, manager, heartbeats(2)).data()
-val dirs2 = poll(ctx, manager, heartbeats(3)).data().offlineLogDirs()
-
 
manager.propagateDirectoryFailure(Uuid.fromString("1iF76HVNRPqC7Y4r6647eg"))
-poll(ctx, manager, heartbeats(4)).data()
-val dirs3 = poll(ctx, manager, heartbeats(5)).data().offlineLogDirs()
-
-assertEquals(Set("h3sC4Yk-Q9-fd0ntJTocCA").map(Uuid.fromString), 
dirs1.asScala.toSet)
-assertEquals(Set("h3sC4Yk-Q9-fd0ntJTocCA", 
"ej8Q9_d2Ri6FXNiTxKFiow").map(Uuid.fromString), dirs2.asScala.toSet)
-assertEquals(Set("h3sC4Yk-Q9-fd0ntJTocCA", "ej8Q9_d2Ri6FXNiTxKFiow", 
"1iF76HVNRPqC7Y4r6647eg").map(Uuid.fromString), dirs3.asScala.toSet)
+val latestHeartbeat = Seq.fill(10)(
+  prepareResponse[BrokerHeartbeatRequest](ctx, new 
BrokerHeartbeatResponse(new BrokerHeartbeatResponseData()))
+).map(poll(ctx, manager, _)).last

Review Comment:
   Hmm, KIP-858 says "The UUIDs for the newly failed log directories are 
included in the BrokerHeartbeat request until the broker receives a successful 
response.".  How do we guarantee that only the 10th HeartbeatRequest picks up 
the failed log dirs?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Fix flaky BrokerLifecycleManagerTest [kafka]

2023-11-27 Thread via GitHub


cmccabe merged PR #14836:
URL: https://github.com/apache/kafka/pull/14836


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]

2023-11-27 Thread via GitHub


philipnee commented on PR #14842:
URL: https://github.com/apache/kafka/pull/14842#issuecomment-1828686697

   Hi @lucasbru - I just opened the PR for you to review.  I'm not 100% happy 
with the way tests are setup therefore I made some changes around optionally 
disabling autocommit in the network thread.  Also, I feel the tests here kind 
of become some sort of integration testing.  I thought it kind of against the 
unit test philosophy.  In summary, the changes are:
   1. We will only auto commit if the configuration is enabled (by default) or 
if we've got anything to commit at all
   2. We need to enforce finding a coordinator and send an autocommit 
regardless of the previous commit state because we need to make sure to record 
the progress before closing
   3. Quite a bit of changes to the testing, because autocommit depends on the 
current progress, so I need to "seek" for some cases to ensure the test sends 
an autocommit
   
   LMK what do you think!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Zk to KRaft migration is now production ready [kafka]

2023-11-27 Thread via GitHub


cmccabe commented on PR #14546:
URL: https://github.com/apache/kafka/pull/14546#issuecomment-1828684914

   Thanks for the PR, @ocadaruma .
   
   There is a PR we've been working on for a while, 
https://github.com/apache/kafka/pull/14160/files , which I think includes your 
fix and much more. I will see if I can get that one in since it corrects a 
bunch of things which are now out-of-date.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]

2023-11-27 Thread via GitHub


philipnee commented on code in PR #14842:
URL: https://github.com/apache/kafka/pull/14842#discussion_r1406792003


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java:
##
@@ -279,7 +287,7 @@ public 
ConsumerNetworkThreadTestBuilder(Optional groupInfo) {
 
 @Override
 public void close() {
-closeQuietly(consumerNetworkThread, 
ConsumerNetworkThread.class.getSimpleName());
+consumerNetworkThread.close();

Review Comment:
   I don't think we should suppress the failures on closing during testing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]

2023-11-27 Thread via GitHub


philipnee commented on code in PR #14842:
URL: https://github.com/apache/kafka/pull/14842#discussion_r1406789593


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -206,21 +206,21 @@ public CompletableFuture maybeAutoCommit(final 
Map maybeAutoCommitAllConsumed() {
+public CompletableFuture maybeAutoCommit() {
 return maybeAutoCommit(subscriptions.allConsumed());
 }
 
+boolean canAutoCommit() {
+return autoCommitState.isPresent() && 
!subscriptions.allConsumed().isEmpty();
+}
+
 /**
- * The consumer needs to send an auto commit during the shutdown if 
autocommit is enabled.
+ * Return an OffsetCommitRequest of all assigned topicPartitions and their 
current positions.
  */
-Optional 
maybeCreateAutoCommitRequest() {
-if (!autoCommitState.isPresent()) {
-return Optional.empty();
-}
-
+NetworkClientDelegate.UnsentRequest commitAllConsumedPositions() {
 OffsetCommitRequestState request = 
pendingRequests.createOffsetCommitRequest(subscriptions.allConsumed(), jitter);
 
request.future.whenComplete(autoCommitCallback(subscriptions.allConsumed()));
-return Optional.of(request.toUnsentRequest());
+return request.toUnsentRequest();

Review Comment:
   We should always return a request because I moved that check out of it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15468: Prevent transaction coordinator reloads on already loaded leaders [kafka]

2023-11-27 Thread via GitHub


hachikuji commented on code in PR #14489:
URL: https://github.com/apache/kafka/pull/14489#discussion_r1406789222


##
core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala:
##
@@ -514,8 +520,14 @@ class TransactionStateManager(brokerId: Int,
* When this broker becomes a leader for a transaction log partition, load 
this partition and populate the transaction
* metadata cache with the transactional ids. This operation must be 
resilient to any partial state left off from
* the previous loading / unloading operation.
+   *
+   * If the state is already loaded (leader epoch bumps, but we have the same 
leader), just update the epoch in the
+   * metadata cache and for all the pending markers.
*/
-  def loadTransactionsForTxnTopicPartition(partitionId: Int, coordinatorEpoch: 
Int, sendTxnMarkers: SendTxnMarkersCallback): Unit = {
+  def maybeLoadTransactionsAndBumpEpochForTxnTopicPartition(partitionId: Int,

Review Comment:
   Yeah, the original name seems fine to me. We are still loading the 
transactions. We just have an optimization when we already had state from a 
previous epoch.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-15860) ControllerRegistration must be written out to the metadata image

2023-11-27 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15860?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe resolved KAFKA-15860.
--
Fix Version/s: 3.7.0
   Resolution: Fixed

> ControllerRegistration must be written out to the metadata image
> 
>
> Key: KAFKA-15860
> URL: https://issues.apache.org/jira/browse/KAFKA-15860
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Major
> Fix For: 3.7.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]

2023-11-27 Thread via GitHub


philipnee commented on code in PR #14842:
URL: https://github.com/apache/kafka/pull/14842#discussion_r1406788484


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -184,7 +184,7 @@ private static long findMinTime(final Collection request
  * completed future if no request is generated.
  */
 public CompletableFuture maybeAutoCommit(final Map offsets) {
-if (!autoCommitState.isPresent()) {
+if (!canAutoCommit()) {

Review Comment:
   pretty terrible naming because it kind of overlaps with the one below, not 
sure there's a better description for it.  It needs to check 1. if autocommit 
is enabled and 2. if there's anything to commit.  If neither, then we don't try 
to send a commit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]

2023-11-27 Thread via GitHub


philipnee commented on code in PR #14842:
URL: https://github.com/apache/kafka/pull/14842#discussion_r1406787063


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -154,7 +154,7 @@ public NetworkClientDelegate.PollResult poll(final long 
currentTimeMs) {
 if (!coordinatorRequestManager.coordinator().isPresent())
 return EMPTY;
 
-maybeAutoCommitAllConsumed();
+maybeAutoCommit();

Review Comment:
   we always commit allConsumed(), so there's no point to reinstate that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]

2023-11-27 Thread via GitHub


philipnee commented on code in PR #14842:
URL: https://github.com/apache/kafka/pull/14842#discussion_r1406788484


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -184,7 +184,7 @@ private static long findMinTime(final Collection request
  * completed future if no request is generated.
  */
 public CompletableFuture maybeAutoCommit(final Map offsets) {
-if (!autoCommitState.isPresent()) {
+if (!canAutoCommit()) {

Review Comment:
   pretty terrible words, not sure there's a better description for it.  It 
needs to check 1. if autocommit is enabled and 2. if there's anything to 
commit.  If neither, then we don't try to send a commit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15311: Fix docs about reverting to ZooKeeper mode during KRaft migration [kafka]

2023-11-27 Thread via GitHub


cmccabe commented on code in PR #14160:
URL: https://github.com/apache/kafka/pull/14160#discussion_r1406786870


##
docs/ops.html:
##
@@ -3778,6 +3784,14 @@ Migrating brokers to KRaft
 Each broker is restarted with a KRaft configuration until the entire 
cluster is running in KRaft mode.
   
 
+  Reverting to ZooKeeper mode During the Migration
+While the cluster is still in migration mode, it is possible to revert to 
ZK mode. In order to do this:
+
+  One by one, take each KRaft broker down. Remove the 
__cluster_metadata directory on the broker. Then, restart the broker as 
ZooKeeper.

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15311: Fix docs about reverting to ZooKeeper mode during KRaft migration [kafka]

2023-11-27 Thread via GitHub


cmccabe commented on code in PR #14160:
URL: https://github.com/apache/kafka/pull/14160#discussion_r1406786425


##
docs/ops.html:
##
@@ -3603,39 +3603,45 @@ 
 Supporting JBOD configurations with multiple storage directories
 Modifying certain dynamic configurations on the standalone KRaft 
controller
-Delegation tokens
   
 
   ZooKeeper to KRaft 
Migration
 
   
 ZooKeeper to KRaft migration is considered an Early Access feature and 
is not recommended for production clusters.
+Please report issues with ZooKeeper to KRaft migration using the
+https://issues.apache.org/jira/projects/KAFKA"; 
target="_blank">project JIRA and the "kraft" component.
   
 
-  The following features are not yet supported for ZK to KRaft 
migrations:
-
+  Terminology
   
-Downgrading to ZooKeeper mode during or after the migration
-Other features not yet supported in 
KRaft
+Brokers that are in ZK mode store their metadata in Apache 
ZooKepeer. This is the old mode of handling metadata.
+Brokers that are in KRaft mode store their metadata in a KRaft 
quorum. This is the new and improve mode of handling metadata.
+Migration is the process of moving cluster metadata from 
ZooKeeper into a KRaft quorum.
   
 
-  
-Please report issues with ZooKeeper to KRaft migration using the
-https://issues.apache.org/jira/projects/KAFKA"; 
target="_blank">project JIRA and the "kraft" component.
-  
+  Migration Phases
+  In general, the migration process passes through several phases.
 
-  Terminology
-  
-We use the term "migration" here to refer to the process of changing a 
Kafka cluster's metadata
-system from ZooKeeper to KRaft and migrating existing metadata. An 
"upgrade" refers to installing a newer version of Kafka. It is not recommended 
to
-upgrade the software at the same time as performing a metadata migration.
-  
+  
+In the initial phase, all the brokers are in ZK mode, and there 
is a ZK-based controller.
+During the initial metadata load, a KRaft quorum loads the 
metadata from ZooKeeper,
+In hybrid phase, some brokers are in ZK mode, but there is a 
KRaft controller.
+In dual-write phase, all brokers are KRaft, but the KRaft 
controller is continuing to write to ZK.
+When the migration has been finalized, we no longer write 
metadata to ZooKeeper.
+  
 
-  
-We also use the term "ZK mode" to refer to Kafka brokers which are using 
ZooKeeper as their metadata
-system. "KRaft mode" refers Kafka brokers which are using a KRaft 
controller quorum as their metadata system.
-  
+  Limitations
+  
+While a cluster is being migrated from ZK mode to KRaft mode, we do 
not support changing the metadata
+  version (also known as the inter.broker.protocol version.) 
Please do not attempt to do this during
+  a migration, or you may break the cluster.
+After the migration has been finalized, it is not possible to revert 
back to ZooKeeper mode.
+As noted above, some features are not 
fully implemented in KRaft mode. If you are
+  using one of those features, you will not be able to migrate to KRaft 
yet.
+  
 
+  

Review Comment:
   removed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15311: Fix docs about reverting to ZooKeeper mode during KRaft migration [kafka]

2023-11-27 Thread via GitHub


cmccabe commented on code in PR #14160:
URL: https://github.com/apache/kafka/pull/14160#discussion_r1406786099


##
docs/ops.html:
##
@@ -3603,39 +3603,45 @@ 
 Supporting JBOD configurations with multiple storage directories
 Modifying certain dynamic configurations on the standalone KRaft 
controller
-Delegation tokens
   
 
   ZooKeeper to KRaft 
Migration
 
   
 ZooKeeper to KRaft migration is considered an Early Access feature and 
is not recommended for production clusters.
+Please report issues with ZooKeeper to KRaft migration using the
+https://issues.apache.org/jira/projects/KAFKA"; 
target="_blank">project JIRA and the "kraft" component.
   
 
-  The following features are not yet supported for ZK to KRaft 
migrations:
-
+  Terminology
   
-Downgrading to ZooKeeper mode during or after the migration
-Other features not yet supported in 
KRaft
+Brokers that are in ZK mode store their metadata in Apache 
ZooKepeer. This is the old mode of handling metadata.
+Brokers that are in KRaft mode store their metadata in a KRaft 
quorum. This is the new and improve mode of handling metadata.
+Migration is the process of moving cluster metadata from 
ZooKeeper into a KRaft quorum.
   
 
-  
-Please report issues with ZooKeeper to KRaft migration using the
-https://issues.apache.org/jira/projects/KAFKA"; 
target="_blank">project JIRA and the "kraft" component.
-  
+  Migration Phases
+  In general, the migration process passes through several phases.
 
-  Terminology
-  
-We use the term "migration" here to refer to the process of changing a 
Kafka cluster's metadata
-system from ZooKeeper to KRaft and migrating existing metadata. An 
"upgrade" refers to installing a newer version of Kafka. It is not recommended 
to
-upgrade the software at the same time as performing a metadata migration.
-  
+  
+In the initial phase, all the brokers are in ZK mode, and there 
is a ZK-based controller.
+During the initial metadata load, a KRaft quorum loads the 
metadata from ZooKeeper,
+In hybrid phase, some brokers are in ZK mode, but there is a 
KRaft controller.
+In dual-write phase, all brokers are KRaft, but the KRaft 
controller is continuing to write to ZK.
+When the migration has been finalized, we no longer write 
metadata to ZooKeeper.
+  

Review Comment:
   I think distinguishing "hybrid phase" from "dual-write phase" is useful, 
even though they both map to the same `MigrationState` enum.
   
   "migration phases" aren't "migration states" (and actually, we don't discuss 
migration states in this doc)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15819: Fix leaked KafkaRaftManager in ZK mode during migration [kafka]

2023-11-27 Thread via GitHub


cmccabe merged PR #14751:
URL: https://github.com/apache/kafka/pull/14751


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Rename method sendBrokerHeartbeat [kafka]

2023-11-27 Thread via GitHub


cmccabe merged PR #14658:
URL: https://github.com/apache/kafka/pull/14658


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15768: StateQueryResult#getOnlyPartitionResult should not throw for FailedQueryResult [kafka]

2023-11-27 Thread via GitHub


hanyuzheng7 closed pull request #14821: KAFKA-15768: 
StateQueryResult#getOnlyPartitionResult should not throw for FailedQueryResult
URL: https://github.com/apache/kafka/pull/14821


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15629: Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery [kafka]

2023-11-27 Thread via GitHub


hanyuzheng7 commented on PR #14570:
URL: https://github.com/apache/kafka/pull/14570#issuecomment-1828621306

   @mjsax ready for final code review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15629: Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery [kafka]

2023-11-27 Thread via GitHub


hanyuzheng7 commented on code in PR #14570:
URL: https://github.com/apache/kafka/pull/14570#discussion_r1406752014


##
streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java:
##
@@ -1584,7 +1646,43 @@ public  void shouldHandleKeyQuery(
 );
 
 final V result1 = queryResult.getResult();
-final Integer integer = valueExtactor.apply(result1);
+final Integer integer = (Integer) result1;
+assertThat(integer, is(expectedValue));
+assertThat(queryResult.getExecutionInfo(), is(empty()));
+assertThat(queryResult.getPosition(), is(POSITION_0));
+}
+
+public  void shouldHandleTimestampedKeyQuery(
+final Integer key,
+final Integer expectedValue) {

Review Comment:
   ok



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15629: Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery [kafka]

2023-11-27 Thread via GitHub


hanyuzheng7 commented on code in PR #14570:
URL: https://github.com/apache/kafka/pull/14570#discussion_r1406751704


##
streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java:
##
@@ -1632,10 +1729,65 @@ public  void shouldHandleRangeQuery(
 IllegalArgumentException.class,
 queryResult.get(partition)::getFailureMessage
 );
-
 try (final KeyValueIterator iterator = 
queryResult.get(partition).getResult()) {
 while (iterator.hasNext()) {
-
actualValues.add(valueExtactor.apply(iterator.next().value));
+actualValues.add((Integer) iterator.next().value);
+}
+}
+assertThat(queryResult.get(partition).getExecutionInfo(), 
is(empty()));
+}
+assertThat("Result:" + result, actualValues, is(expectedValues));
+assertThat("Result:" + result, result.getPosition(), 
is(INPUT_POSITION));
+}
+}
+
+public  void shouldHandleTimestampedRangeQuery(
+final Optional lower,
+final Optional upper,
+final boolean isKeyAscending,
+final List expectedValues) {

Review Comment:
   ok



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15456: Client support for OffsetFetch/OffsetCommit v9 [kafka]

2023-11-27 Thread via GitHub


kirktrue commented on PR #14557:
URL: https://github.com/apache/kafka/pull/14557#issuecomment-1828563894

   Thanks for the PR, @lianetm. This is tricky stuff, to be sure! My feedback 
is mostly minor. Hopefully I can do another pass in a day or so with more time.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15456: Client support for OffsetFetch/OffsetCommit v9 [kafka]

2023-11-27 Thread via GitHub


kirktrue commented on code in PR #14557:
URL: https://github.com/apache/kafka/pull/14557#discussion_r1406678397


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -161,16 +172,16 @@ public NetworkClientDelegate.PollResult poll(final long 
currentTimeMs) {
 List requests = 
pendingRequests.drain(currentTimeMs);
 // min of the remainingBackoffMs of all the request that are still 
backing off
 final long timeUntilNextPoll = Math.min(
-findMinTime(unsentOffsetCommitRequests(), currentTimeMs),
-findMinTime(unsentOffsetFetchRequests(), currentTimeMs));
+findMinTime(unsentOffsetCommitRequests(), currentTimeMs),
+findMinTime(unsentOffsetFetchRequests(), currentTimeMs));

Review Comment:
   Nit: we can leave the indentation as is.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberStateListener.java:
##
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+/**
+ * Listener to register for getting notified when the member state changes, or 
new member ID or
+ * epoch are received.
+ */
+public interface MemberStateListener {
+
+/**
+ * Called when the member transitions to a new state.
+ *
+ * @param state New state.
+ */
+void onStateChange(MemberState state);
+
+/**
+ * Called when the member receives a new member ID.
+ *
+ * @param memberId New member ID.
+ * @param epochLatest member epoch received.
+ */
+void onMemberIdUpdated(String memberId, int epoch);
+
+/**
+ * Called when a member receives a new member epoch.
+ *
+ * @param epochNew member epoch.
+ * @param memberId Current member ID.
+ */
+void onMemberEpochUpdated(int epoch, String memberId);

Review Comment:
   I'm wondering why this interface can't be a single `onUpdate(MemberState 
state)` and leave it up to the callbacks to determine what's changed? 🤔



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -467,66 +594,153 @@ private void handleFatalError(final Errors error) {
 future.completeExceptionally(new CommitFailedException());
 break;
 case UNKNOWN_MEMBER_ID:
-log.info("OffsetCommit failed due to unknown member id 
memberId {}: {}", null, error.message());
-future.completeExceptionally(error.exception());
+log.info("OffsetCommit failed due to unknown member id: 
{}", error.message());
+handleUnknownMemberIdError(this);
+break;
+case STALE_MEMBER_EPOCH:
+log.info("OffsetCommit failed due to stale member epoch: 
{}", error.message());
+handleStaleMemberEpochError(this);
 break;
 default:
-future.completeExceptionally(new 
KafkaException("Unexpected error in commit: " + error.message()));
+future.completeExceptionally(new 
KafkaException("Unexpected error in commit:" +
+" " + error.message()));
 break;
 }
 }
+
+@Override
+void abortRetry(String cause) {
+future.completeExceptionally(new KafkaException("Offset commit 
waiting for new member" +
+" ID or epoch cannot be retried. " + cause));
+}
+
+/**
+ * Reset timers and add request to the list of pending requests, to 
make sure it is sent
+ * out on the next poll iteration, without applying any backoff.
+ */
+@Override
+public void retryOnMemberIdOrEpochUpdate(Optional memberId,
+ Optional 
memberEpoch) {
+this.memberId = memberId;
+this.memberEpoch = memberEpoch;
+reset();
+pendingRequests.addOffsetCommitRequest(this);
+}
+
+@Override
+public String requestName() 

Re: [PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]

2023-11-27 Thread via GitHub


philipnee commented on code in PR #14842:
URL: https://github.com/apache/kafka/pull/14842#discussion_r1406699542


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##
@@ -269,22 +270,40 @@ void cleanup() {
  * completed in time.
  */
 // Visible for testing
-void maybeAutoCommitAndLeaveGroup(final Timer timer) {
+void maybeAutocommitOnClose(final Timer timer) {
 if (!requestManagers.coordinatorRequestManager.isPresent())
 return;
 
+if (!requestManagers.commitRequestManager.isPresent()) {
+log.error("Expecting a CommitRequestManager but the object was 
never initialized. Shutting down.");
+return;
+}
+
+if (!requestManagers.commitRequestManager.get().autoCommitEnabled()) {
+return;
+}
+
 ensureCoordinatorReady(timer);
-List tasks = closingRequests();
-networkClientDelegate.addAll(tasks);
+Optional autocommit = 
requestManagers.commitRequestManager.get().maybeCreateAutoCommitRequest();
+if (!autocommit.isPresent()) {
+return;
+}
+
+List autocommitRequest = 
Collections.singletonList(autocommit.get());
+networkClientDelegate.addAll(autocommitRequest);
 do {
 long currentTimeMs = timer.currentTimeMs();
 ensureCoordinatorReady(timer);
 networkClientDelegate.poll(timer.remainingMs(), currentTimeMs);
-} while (timer.notExpired() && !tasks.stream().allMatch(v -> 
v.future().isDone()));
+} while (timer.notExpired() && 
!autocommitRequest.get(0).future().isDone());
+}
+
+void maybeLeaveGroup(final Timer timer) {
+// TODO: Leave group upon closing the consumer

Review Comment:
   Can I follow up with a subsequent ticket to address this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15890: Consumer.poll with long timeout unaware of assigned partitions [kafka]

2023-11-27 Thread via GitHub


AndrewJSchofield commented on code in PR #14835:
URL: https://github.com/apache/kafka/pull/14835#discussion_r1406699697


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -417,4 +394,127 @@ private void updateHeartbeatIntervalMs(final long 
heartbeatIntervalMs) {
 this.heartbeatTimer.updateAndReset(heartbeatIntervalMs);
 }
 }
+
+/**
+ * Builds the heartbeat requests correctly, ensuring that all information 
is sent according to
+ * the protocol, but subsequent requests do not send information which has 
not changed. This
+ * is important to ensure that reconciliation completes successfully.
+ */
+static class HeartbeatState {

Review Comment:
   Also, spotbugs will object if an inner class is non-static and does not use 
its `this` reference to the outer class.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15489: resign leadership when no fetch from majority voters [kafka]

2023-11-27 Thread via GitHub


jsancio commented on PR #14428:
URL: https://github.com/apache/kafka/pull/14428#issuecomment-1828499581

   Excuse the delays @showuon . I'll review this today and this week!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15373: fix exception thrown in Admin#describeTopics for unknown ID [kafka]

2023-11-27 Thread via GitHub


MikeEdgar commented on PR #14599:
URL: https://github.com/apache/kafka/pull/14599#issuecomment-1828509146

   Hi @jolshan , please take a look at this PR to modify the exception thrown 
when describing a topic by an unknown topic ID. The CI failures don't appear 
related to the change.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15372: Reconfigure dedicated MM2 connectors after leadership change [kafka]

2023-11-27 Thread via GitHub


gharris1727 commented on PR #14293:
URL: https://github.com/apache/kafka/pull/14293#issuecomment-1828464698

   Hi @C0urante @yashmayya I'd like to get this into the 3.7.0 release, and 
code freeze is currently planned for Dec 20th. PTAL, thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15890: Consumer.poll with long timeout unaware of assigned partitions [kafka]

2023-11-27 Thread via GitHub


AndrewJSchofield commented on code in PR #14835:
URL: https://github.com/apache/kafka/pull/14835#discussion_r1406686948


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -181,36 +191,19 @@ public NetworkClientDelegate.PollResult poll(long 
currentTimeMs) {
 return new 
NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs, 
Collections.singletonList(request));
 }
 
-private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest() {
-// TODO: extract this logic for building the 
ConsumerGroupHeartbeatRequestData to a
-//  stateful builder (HeartbeatState), that will keep the last data 
sent, and determine
-//  the fields that changed and need to be included in the next HB 
(ex. check
-//  subscriptionState changed from last sent to include assignment). 
It should also
-//  ensure that all fields are sent on failure.
-ConsumerGroupHeartbeatRequestData data = new 
ConsumerGroupHeartbeatRequestData()
-.setGroupId(membershipManager.groupId())
-.setMemberEpoch(membershipManager.memberEpoch())
-.setRebalanceTimeoutMs(rebalanceTimeoutMs);
-
-if (membershipManager.memberId() != null) {
-data.setMemberId(membershipManager.memberId());
-}
-
-membershipManager.groupInstanceId().ifPresent(data::setInstanceId);
-
-if (this.subscriptions.hasPatternSubscription()) {
-// TODO: Pass the string to the GC if server side regex is used.
-} else {
-data.setSubscribedTopicNames(new 
ArrayList<>(this.subscriptions.subscription()));
-List 
topicPartitions =
-
buildTopicPartitionsList(membershipManager.currentAssignment());
-data.setTopicPartitions(topicPartitions);
-}
-
-
this.membershipManager.serverAssignor().ifPresent(data::setServerAssignor);
+/**
+ * Returns the delay before the next network request for this request 
manager. Used to ensure that
+ * waiting in the application thread does not delay beyond the point that 
a result can be returned.
+ */
+@Override
+public long timeUntilNextPoll(long currentTimeMs) {
+boolean heartbeatNow = membershipManager.shouldHeartbeatNow() && 
!heartbeatRequestState.requestInFlight();
+return heartbeatNow ? 0L : 
heartbeatRequestState.nextHeartbeatMs(currentTimeMs);
+}
 
+private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest() {
 NetworkClientDelegate.UnsentRequest request = new 
NetworkClientDelegate.UnsentRequest(
-new ConsumerGroupHeartbeatRequest.Builder(data),
+new 
ConsumerGroupHeartbeatRequest.Builder(this.heartbeatState.buildRequestData()),

Review Comment:
   That style is used throughout this file.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15468: Prevent transaction coordinator reloads on already loaded leaders [kafka]

2023-11-27 Thread via GitHub


jolshan commented on code in PR #14489:
URL: https://github.com/apache/kafka/pull/14489#discussion_r1406686608


##
core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala:
##
@@ -514,8 +520,14 @@ class TransactionStateManager(brokerId: Int,
* When this broker becomes a leader for a transaction log partition, load 
this partition and populate the transaction
* metadata cache with the transactional ids. This operation must be 
resilient to any partial state left off from
* the previous loading / unloading operation.
+   *
+   * If the state is already loaded (leader epoch bumps, but we have the same 
leader), just update the epoch in the
+   * metadata cache and for all the pending markers.
*/
-  def loadTransactionsForTxnTopicPartition(partitionId: Int, coordinatorEpoch: 
Int, sendTxnMarkers: SendTxnMarkersCallback): Unit = {
+  def maybeLoadTransactionsAndBumpEpochForTxnTopicPartition(partitionId: Int,

Review Comment:
   Ok. 😅 I think I was trying to distinguish the difference between logical and 
physical loading. But maybe that is too specific. Do you think it should just 
keep the original name?



##
core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala:
##
@@ -514,8 +520,14 @@ class TransactionStateManager(brokerId: Int,
* When this broker becomes a leader for a transaction log partition, load 
this partition and populate the transaction
* metadata cache with the transactional ids. This operation must be 
resilient to any partial state left off from
* the previous loading / unloading operation.
+   *
+   * If the state is already loaded (leader epoch bumps, but we have the same 
leader), just update the epoch in the
+   * metadata cache and for all the pending markers.
*/
-  def loadTransactionsForTxnTopicPartition(partitionId: Int, coordinatorEpoch: 
Int, sendTxnMarkers: SendTxnMarkersCallback): Unit = {
+  def maybeLoadTransactionsAndBumpEpochForTxnTopicPartition(partitionId: Int,

Review Comment:
   Ok. 😅 I think I was trying to distinguish the difference between logical and 
physical loading. But maybe that is too specific. Do you think it should just 
keep the original name?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15890: Consumer.poll with long timeout unaware of assigned partitions [kafka]

2023-11-27 Thread via GitHub


AndrewJSchofield commented on code in PR #14835:
URL: https://github.com/apache/kafka/pull/14835#discussion_r1406683119


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -417,4 +394,127 @@ private void updateHeartbeatIntervalMs(final long 
heartbeatIntervalMs) {
 this.heartbeatTimer.updateAndReset(heartbeatIntervalMs);
 }
 }
+
+/**
+ * Builds the heartbeat requests correctly, ensuring that all information 
is sent according to
+ * the protocol, but subsequent requests do not send information which has 
not changed. This
+ * is important to ensure that reconciliation completes successfully.
+ */
+static class HeartbeatState {

Review Comment:
   So that you can construct an independent instance of the class. It doesn't 
need a `this` and it's handy for testing to be able to make instances on demand.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15890: Consumer.poll with long timeout unaware of assigned partitions [kafka]

2023-11-27 Thread via GitHub


kirktrue commented on code in PR #14835:
URL: https://github.com/apache/kafka/pull/14835#discussion_r1406656043


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -159,11 +159,19 @@ public NetworkClientDelegate.PollResult poll(final long 
currentTimeMs) {
 return EMPTY;
 
 List requests = 
pendingRequests.drain(currentTimeMs);
+return new 
NetworkClientDelegate.PollResult(timeUntilNextPoll(currentTimeMs), requests);
+}
+
+/**
+ * Returns the delay before the next network request for this request 
manager. Used to ensure that
+ * waiting in the application thread does not delay beyond the point that 
a result can be returned.
+ */
+@Override
+public long timeUntilNextPoll(long currentTimeMs) {
 // min of the remainingBackoffMs of all the request that are still 
backing off
-final long timeUntilNextPoll = Math.min(
-findMinTime(unsentOffsetCommitRequests(), currentTimeMs),
-findMinTime(unsentOffsetFetchRequests(), currentTimeMs));
-return new NetworkClientDelegate.PollResult(timeUntilNextPoll, 
requests);
+return Math.min(
+findMinTime(unsentOffsetCommitRequests(), currentTimeMs),
+findMinTime(unsentOffsetFetchRequests(), currentTimeMs));

Review Comment:
   Nit: leaving the indentation as is conforms to the existing 'four-spaces per 
tab' style.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -181,36 +191,19 @@ public NetworkClientDelegate.PollResult poll(long 
currentTimeMs) {
 return new 
NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs, 
Collections.singletonList(request));
 }
 
-private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest() {
-// TODO: extract this logic for building the 
ConsumerGroupHeartbeatRequestData to a
-//  stateful builder (HeartbeatState), that will keep the last data 
sent, and determine
-//  the fields that changed and need to be included in the next HB 
(ex. check
-//  subscriptionState changed from last sent to include assignment). 
It should also
-//  ensure that all fields are sent on failure.
-ConsumerGroupHeartbeatRequestData data = new 
ConsumerGroupHeartbeatRequestData()
-.setGroupId(membershipManager.groupId())
-.setMemberEpoch(membershipManager.memberEpoch())
-.setRebalanceTimeoutMs(rebalanceTimeoutMs);
-
-if (membershipManager.memberId() != null) {
-data.setMemberId(membershipManager.memberId());
-}
-
-membershipManager.groupInstanceId().ifPresent(data::setInstanceId);
-
-if (this.subscriptions.hasPatternSubscription()) {
-// TODO: Pass the string to the GC if server side regex is used.
-} else {
-data.setSubscribedTopicNames(new 
ArrayList<>(this.subscriptions.subscription()));
-List 
topicPartitions =
-
buildTopicPartitionsList(membershipManager.currentAssignment());
-data.setTopicPartitions(topicPartitions);
-}
-
-
this.membershipManager.serverAssignor().ifPresent(data::setServerAssignor);
+/**
+ * Returns the delay before the next network request for this request 
manager. Used to ensure that
+ * waiting in the application thread does not delay beyond the point that 
a result can be returned.
+ */
+@Override
+public long timeUntilNextPoll(long currentTimeMs) {
+boolean heartbeatNow = membershipManager.shouldHeartbeatNow() && 
!heartbeatRequestState.requestInFlight();
+return heartbeatNow ? 0L : 
heartbeatRequestState.nextHeartbeatMs(currentTimeMs);
+}
 
+private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest() {
 NetworkClientDelegate.UnsentRequest request = new 
NetworkClientDelegate.UnsentRequest(
-new ConsumerGroupHeartbeatRequest.Builder(data),
+new 
ConsumerGroupHeartbeatRequest.Builder(this.heartbeatState.buildRequestData()),

Review Comment:
   Nit: we got knocked in other reviews for unnecessarily qualifying with 
`this`.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -417,4 +394,127 @@ private void updateHeartbeatIntervalMs(final long 
heartbeatIntervalMs) {
 this.heartbeatTimer.updateAndReset(heartbeatIntervalMs);
 }
 }
+
+/**
+ * Builds the heartbeat requests correctly, ensuring that all information 
is sent according to
+ * the protocol, but subsequent requests do not send information which has 
not changed. This
+ * is important to ensure that reconciliation completes successfully.
+ */
+static class HeartbeatState {
+private final SubscriptionState subscriptions;
+private f

Re: [PR] KAFKA-15468: Prevent transaction coordinator reloads on already loaded leaders [kafka]

2023-11-27 Thread via GitHub


hachikuji commented on code in PR #14489:
URL: https://github.com/apache/kafka/pull/14489#discussion_r1406616682


##
core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala:
##
@@ -514,8 +520,14 @@ class TransactionStateManager(brokerId: Int,
* When this broker becomes a leader for a transaction log partition, load 
this partition and populate the transaction
* metadata cache with the transactional ids. This operation must be 
resilient to any partial state left off from
* the previous loading / unloading operation.
+   *
+   * If the state is already loaded (leader epoch bumps, but we have the same 
leader), just update the epoch in the
+   * metadata cache and for all the pending markers.
*/
-  def loadTransactionsForTxnTopicPartition(partitionId: Int, coordinatorEpoch: 
Int, sendTxnMarkers: SendTxnMarkersCallback): Unit = {
+  def maybeLoadTransactionsAndBumpEpochForTxnTopicPartition(partitionId: Int,

Review Comment:
   nit: The rename seems borderline overkill. I would consider the epoch bump 
part of transaction loading.



##
core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala:
##
@@ -514,8 +520,14 @@ class TransactionStateManager(brokerId: Int,
* When this broker becomes a leader for a transaction log partition, load 
this partition and populate the transaction
* metadata cache with the transactional ids. This operation must be 
resilient to any partial state left off from
* the previous loading / unloading operation.
+   *
+   * If the state is already loaded (leader epoch bumps, but we have the same 
leader), just update the epoch in the
+   * metadata cache and for all the pending markers.
*/
-  def loadTransactionsForTxnTopicPartition(partitionId: Int, coordinatorEpoch: 
Int, sendTxnMarkers: SendTxnMarkersCallback): Unit = {
+  def maybeLoadTransactionsAndBumpEpochForTxnTopicPartition(partitionId: Int,
+coordinatorEpoch: 
Int,
+sendTxnMarkers: 
SendTxnMarkersCallback,
+
transactionStateLoaded: Boolean): Unit = {

Review Comment:
   As mentioned above, I don't think we should pass this as an argument.
   
   On a higher level, I'm trying to figure out the safety of this loading 
process. Suppose we have two epoch bumps in quick succession. Do we get a 
strong ordering guarantee given that it is done asynchronously?  I think I 
would expect that we would check for the existence of the partition in 
`loadingPartitions` when we first acquire the write lock below. If it exists, 
then we need to ensure the monotonicity of the epoch. If the entry has a higher 
epoch, then we ignore the call.



##
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##
@@ -447,12 +447,13 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
 info(s"Elected as the txn coordinator for partition $txnTopicPartitionId 
at epoch $coordinatorEpoch")
 // The operations performed during immigration must be resilient to any 
previous errors we saw or partial state we
 // left off during the unloading phase. Ensure we remove all associated 
state for this partition before we continue
-// loading it.
+// loading it. In the case where the state partition is already loaded, we 
want to remove inflight markers with the
+// old epoch.
 
txnMarkerChannelManager.removeMarkersForTxnTopicPartition(txnTopicPartitionId)
 
 // Now load the partition.
-txnManager.loadTransactionsForTxnTopicPartition(txnTopicPartitionId, 
coordinatorEpoch,
-  txnMarkerChannelManager.addTxnMarkersToSend)
+
txnManager.maybeLoadTransactionsAndBumpEpochForTxnTopicPartition(txnTopicPartitionId,
 coordinatorEpoch,
+  txnMarkerChannelManager.addTxnMarkersToSend, 
txnManager.txnStateLoaded(txnTopicPartitionId))

Review Comment:
   It's curious that we need to pass the result of `txnStateLoaded`. Couldn't 
`txnManager` figure it out on its own?



##
core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala:
##
@@ -524,23 +536,35 @@ class TransactionStateManager(brokerId: Int,
 }
 
 def loadTransactions(startTimeMs: java.lang.Long): Unit = {
-  val schedulerTimeMs = time.milliseconds() - startTimeMs
-  info(s"Loading transaction metadata from $topicPartition at epoch 
$coordinatorEpoch")
-  validateTransactionTopicPartitionCountIsStable()
-
-  val loadedTransactions = loadTransactionMetadata(topicPartition, 
coordinatorEpoch)
-  val endTimeMs = time.milliseconds()
-  val totalLoadingTimeMs = endTimeMs - startTimeMs
-  partitionLoadSensor.record(totalLoadingTimeMs.toDouble, endTimeMs, false)
-  info(s"Finished loading ${loadedTransactions.size} transaction metadata 
from $topicPartition in " +
-   

[jira] [Updated] (KAFKA-15372) MM2 rolling restart can drop configuration changes silently

2023-11-27 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-15372:

Fix Version/s: 3.7.0

> MM2 rolling restart can drop configuration changes silently
> ---
>
> Key: KAFKA-15372
> URL: https://issues.apache.org/jira/browse/KAFKA-15372
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Reporter: Daniel Urban
>Assignee: Greg Harris
>Priority: Major
> Fix For: 3.7.0
>
>
> When MM2 is restarted, it tries to update the Connector configuration in all 
> flows. This is a one-time trial, and fails if the Connect worker is not the 
> leader of the group.
> In a distributed setup and with a rolling restart, it is possible that for a 
> specific flow, the Connect worker of the just restarted MM2 instance is not 
> the leader, meaning that Connector configurations can get dropped.
> For example, assuming 2 MM2 instances, and one flow A->B:
>  # MM2 instance 1 is restarted, the worker inside MM2 instance 2 becomes the 
> leader of A->B Connect group.
>  # MM2 instance 1 tries to update the Connector configurations, but fails 
> (instance 2 has the leader, not instance 1)
>  # MM2 instance 2 is restarted, leadership moves to worker in MM2 instance 1
>  # MM2 instance 2 tries to update the Connector configurations, but fails
> At this point, the configuration changes before the restart are never 
> applied. Many times, this can also happen silently, without any indication.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15906) Emit offset syncs more often than offset.lag.max for low-throughput/finite partitions

2023-11-27 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15906?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-15906:

Description: 
Right now, the offset.lag.max configuration limits the number of offset syncs 
are emitted by the MirrorSourceTask, along with a fair rate-limiting semaphore. 
After 100 records have been emitted for a partition, _and_ the semaphore is 
available, an offset sync can be emitted.

For low-volume topics, the `offset.lag.max` default of 100 is much more 
restrictive than the rate-limiting semaphore. For example, a topic which 
mirrors at the rate of 1 record/sec may take 100 seconds to receive an offset 
sync. If the topic is actually finite, the last offset sync will never arrive, 
and the translation will have a persistent lag.

Instead, we can periodically flush the offset syncs for partitions that are 
under the offset.lag.max limit, but have not received an offset sync recently. 
This could be a new configuration, be a hard-coded time, or be based on the 
existing emit.checkpoints.interval.seconds and 
sync.group.offsets.interval.seconds configurations.

 

Alternatively, we could decrease the default `offset.lag.max` value to 0, and 
rely on the fair semaphore to limit the number of syncs emitted for 
high-throughput partitions. The semaphore is not currently configurable, so 
users wanting lower throughput on the offset-syncs topic will still need an 
offset.lag.max > 0.

  was:
Right now, the offset.lag.max configuration limits the number of offset syncs 
are emitted by the MirrorSourceTask, along with a fair rate-limiting semaphore. 
After 100 records have been emitted for a partition, _and_ the semaphore is 
available, an offset sync can be emitted.

For low-volume topics, the `offset.lag.max` default of 100 is much more 
restrictive than the rate-limiting semaphore. For example, a topic which 
mirrors at the rate of 1 record/sec may take 100 seconds to receive an offset 
sync. If the topic is actually finite, the last offset sync will never arrive, 
and the translation will have a persistent lag.

Instead, we can periodically flush the offset syncs for partitions that are 
under the offset.lag.max limit, but have not received an offset sync recently. 
This could be a new configuration, be a hard-coded time, or be based on the 
existing emit.checkpoints.interval.seconds and 
sync.group.offsets.interval.seconds configurations.


> Emit offset syncs more often than offset.lag.max for low-throughput/finite 
> partitions
> -
>
> Key: KAFKA-15906
> URL: https://issues.apache.org/jira/browse/KAFKA-15906
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Reporter: Greg Harris
>Priority: Minor
>
> Right now, the offset.lag.max configuration limits the number of offset syncs 
> are emitted by the MirrorSourceTask, along with a fair rate-limiting 
> semaphore. After 100 records have been emitted for a partition, _and_ the 
> semaphore is available, an offset sync can be emitted.
> For low-volume topics, the `offset.lag.max` default of 100 is much more 
> restrictive than the rate-limiting semaphore. For example, a topic which 
> mirrors at the rate of 1 record/sec may take 100 seconds to receive an offset 
> sync. If the topic is actually finite, the last offset sync will never 
> arrive, and the translation will have a persistent lag.
> Instead, we can periodically flush the offset syncs for partitions that are 
> under the offset.lag.max limit, but have not received an offset sync 
> recently. This could be a new configuration, be a hard-coded time, or be 
> based on the existing emit.checkpoints.interval.seconds and 
> sync.group.offsets.interval.seconds configurations.
>  
> Alternatively, we could decrease the default `offset.lag.max` value to 0, and 
> rely on the fair semaphore to limit the number of syncs emitted for 
> high-throughput partitions. The semaphore is not currently configurable, so 
> users wanting lower throughput on the offset-syncs topic will still need an 
> offset.lag.max > 0.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Fix the flaky TBRLMM `testInternalTopicExists` test [kafka]

2023-11-27 Thread via GitHub


junrao commented on code in PR #14840:
URL: https://github.com/apache/kafka/pull/14840#discussion_r1406631952


##
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java:
##
@@ -70,18 +70,20 @@ public TopicBasedRemoteLogMetadataManager topicBasedRlmm() {
 }
 
 @Test
-public void testInternalTopicExists() {
+public void testDoesTopicExist() {
 Properties adminConfig = 
remoteLogMetadataManagerHarness.adminClientConfig();
 ListenerName listenerName = 
remoteLogMetadataManagerHarness.listenerName();
 try (Admin admin = 
remoteLogMetadataManagerHarness.createAdminClient(listenerName, adminConfig)) {
-String topic = 
topicBasedRlmm().config().remoteLogMetadataTopicName();
+String topic = "test-topic-exist";
+remoteLogMetadataManagerHarness.createTopic(topic, 1, 1, new 
Properties(),

Review Comment:
   `createTopic` only waits for the topic metadata to be committed in the 
metadata log, but not waiting for the metadata to be propagated to every 
broker. `topicBasedRlmm().doesTopicExist` calls `describeTopic` and only checks 
the metadata in a least loaded broker. So, it seems that there is no strong 
guarantee that `doesTopicExist` is always true?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



  1   2   >