Re: [PR] KAFKA-16787: Remove TRACE level logging from AsyncKafkaConsumer hot path [kafka]

2024-05-16 Thread via GitHub


lucasbru merged PR #15981:
URL: https://github.com/apache/kafka/pull/15981


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16770; [1/N] Coalesce records into bigger batches [kafka]

2024-05-16 Thread via GitHub


dajac commented on code in PR #15964:
URL: https://github.com/apache/kafka/pull/15964#discussion_r1604418654


##
core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala:
##
@@ -84,98 +67,28 @@ class CoordinatorPartitionWriterTest {
   }
 
   @Test
-  def testWriteRecords(): Unit = {

Review Comment:
   Right. We have many tests in CoordinatorRuntimeTest doing writes. As we 
fully validate the records now, they cover this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16770; [1/N] Coalesce records into bigger batches [kafka]

2024-05-16 Thread via GitHub


dajac commented on code in PR #15964:
URL: https://github.com/apache/kafka/pull/15964#discussion_r1604417628


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/PartitionWriter.java:
##
@@ -18,40 +18,21 @@
 
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.requests.TransactionResult;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.storage.internals.log.LogConfig;
 import org.apache.kafka.storage.internals.log.VerificationGuard;
 
-import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
 /**
  * A simple interface to write records to Partitions/Logs. It contains the 
minimum
  * required for coordinators.
- *
- * @param <T> The record type.
  */
-public interface PartitionWriter<T> {
-
-/**
- * Serializer to translate T to bytes.
- *
- * @param <T> The record type.
- */
-interface Serializer<T> {
-/**
- * Serializes the key of the record.
- */
-byte[] serializeKey(T record);
-
-/**
- * Serializes the value of the record.
- */
-byte[] serializeValue(T record);
-}
+public interface PartitionWriter {
 
 /**
  * Listener allowing to listen to high watermark changes. This is meant
- * to be used in conjunction with {{@link 
PartitionWriter#append(TopicPartition, List)}}.
+ * to be used in conjunction with {{@link 
PartitionWriter#append(TopicPartition, VerificationGuard, MemoryRecords)}}.

Review Comment:
   Intellij reports them as warning. I suppose that we would get warning when 
we generate the javadoc too.
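
For readers following along, the warning comes from the doubled braces: a javadoc
inline tag uses a single pair of braces, so the outer pair in `{{@link ...}}` is
treated as literal text. A minimal sketch of the two forms:

```java
/**
 * Flagged by IntelliJ (and the javadoc tool): the outer braces are rendered as literal text.
 * {{@link PartitionWriter#append(TopicPartition, VerificationGuard, MemoryRecords)}}
 *
 * Clean inline tag:
 * {@link PartitionWriter#append(TopicPartition, VerificationGuard, MemoryRecords)}
 */
```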



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16774: Delete flaky test since it is redundant [kafka]

2024-05-16 Thread via GitHub


cadonna merged PR #15978:
URL: https://github.com/apache/kafka/pull/15978


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16770; [1/N] Coalesce records into bigger batches [kafka]

2024-05-16 Thread via GitHub


dajac commented on code in PR #15964:
URL: https://github.com/apache/kafka/pull/15964#discussion_r1604415305


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -723,30 +757,66 @@ public void run() {
 // If the records are not empty, first, they are 
applied to the state machine,
 // second, then are written to the partition/log, and 
finally, the response
 // is put into the deferred event queue.
+long prevLastWrittenOffset = 
context.coordinator.lastWrittenOffset();
+LogConfig logConfig = partitionWriter.config(tp);
+byte magic = logConfig.recordVersion().value;
+int maxBatchSize = logConfig.maxMessageSize();
+long currentTimeMs = time.milliseconds();
+ByteBuffer buffer = 
context.bufferSupplier.get(Math.min(16384, maxBatchSize));
+
 try {
-// Apply the records to the state machine.
-if (result.replayRecords()) {
-// We compute the offset of the record based 
on the last written offset. The
-// coordinator is the single writer to the 
underlying partition so we can
-// deduce it like this.
-for (int i = 0; i < result.records().size(); 
i++) {
+MemoryRecordsBuilder builder = 
MemoryRecords.builder(

Review Comment:
   The `builder` is used in the above loop (L801) so we need it here.
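
For anyone new to this code path, here is a minimal standalone sketch of the
builder-then-build pattern: one `MemoryRecordsBuilder` collects every record for the
write, and the coalesced batch is materialized once with `build()`. It assumes a
clients version whose `MemoryRecords.builder(ByteBuffer, CompressionType, TimestampType, long)`
overload is available (newer versions take a `Compression` instead), and the values
are illustrative only.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.TimestampType;

public class CoalesceSketch {
    public static void main(String[] args) {
        long baseOffset = 0L;
        long nowMs = System.currentTimeMillis();
        // Cap the initial buffer, mirroring the Math.min(16384, maxBatchSize) idea above.
        ByteBuffer buffer = ByteBuffer.allocate(16384);

        // One builder for the whole write: every record appended below lands in a
        // single batch instead of one batch per record.
        MemoryRecordsBuilder builder = MemoryRecords.builder(
            buffer, CompressionType.NONE, TimestampType.CREATE_TIME, baseOffset);

        for (int i = 0; i < 3; i++) {
            byte[] key = ("key-" + i).getBytes(StandardCharsets.UTF_8);
            byte[] value = ("value-" + i).getBytes(StandardCharsets.UTF_8);
            builder.append(nowMs, key, value);
        }

        // The coalesced batch is materialized once and can be handed to the partition writer.
        MemoryRecords records = builder.build();
        System.out.println("coalesced batch bytes: " + records.sizeInBytes());
    }
}
```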



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16770; [1/N] Coalesce records into bigger batches [kafka]

2024-05-16 Thread via GitHub


dajac commented on code in PR #15964:
URL: https://github.com/apache/kafka/pull/15964#discussion_r1604414478


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -1046,12 +1118,18 @@ public void run() {
 result
 );
 
-long offset = 
partitionWriter.appendEndTransactionMarker(
+long offset = partitionWriter.append(
 tp,
-producerId,
-producerEpoch,
-coordinatorEpoch,
-result
+VerificationGuard.SENTINEL,
+MemoryRecords.withEndTransactionMarker(
+time.milliseconds(),

Review Comment:
   `withEndTransactionMarker` takes the current time if we don't specify it. 
The reason why I set it explicitly here is to ensure that the mock time is used 
in tests.
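
A small aside for readers: passing the timestamp explicitly matters because the
runtime's `time` can be a mock in tests. A minimal sketch, assuming the test-scope
`org.apache.kafka.common.utils.MockTime` from the clients test utilities:

```java
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;

public class MockTimeSketch {

    // Production code asks the injected Time for timestamps instead of calling
    // System.currentTimeMillis() directly, so tests can control the clock.
    static long markerTimestamp(Time time) {
        return time.milliseconds();
    }

    public static void main(String[] args) {
        MockTime time = new MockTime();
        long t1 = markerTimestamp(time);
        time.sleep(1000);            // advances the mock clock deterministically
        long t2 = markerTimestamp(time);
        System.out.println(t2 - t1); // prints 1000
    }
}
```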



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16709: abortAndPauseCleaning only when future log is not existed [kafka]

2024-05-16 Thread via GitHub


showuon commented on code in PR #15951:
URL: https://github.com/apache/kafka/pull/15951#discussion_r1604412458


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -2116,14 +2116,13 @@ class ReplicaManager(val config: KafkaConfig,
 
   // Add future replica log to partition's map if it's not existed
   if (partition.maybeCreateFutureReplica(futureLog.parentDir, 
offsetCheckpoints, topicIds(partition.topic))) {
-val futureLogInPartition = 
futureLocalLogOrException(topicPartition)
 // pause cleaning for partitions that are being moved and start 
ReplicaAlterDirThread to move
 // replica from source dir to destination dir
 logManager.abortAndPauseCleaning(topicPartition)
-
-futureReplicasAndInitialOffset.put(topicPartition, 
InitialFetchState(topicIds(topicPartition.topic), leader,
-  partition.getLeaderEpoch, futureLogInPartition.highWatermark))
   }
+
+  futureReplicasAndInitialOffset.put(topicPartition, 
InitialFetchState(topicIds(topicPartition.topic), leader,
+partition.getLeaderEpoch, futureLog.highWatermark))

Review Comment:
   Thanks to the failing test, I found I was wrong. We should always add the 
partition to the fetch thread, no matter whether we created the future log or not, 
since the fetchers are all removed before `maybeAddLogDirFetchers` is called.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16709: abortAndPauseCleaning only when future log is not existed [kafka]

2024-05-16 Thread via GitHub


showuon commented on code in PR #15951:
URL: https://github.com/apache/kafka/pull/15951#discussion_r1604409235


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -314,6 +314,77 @@ class ReplicaManagerTest {
 }
   }
 
+  @ParameterizedTest(name = "testMaybeAddLogDirFetchersPausingCleaning with 
futureLogCreated: {0}")
+  @ValueSource(booleans = Array(true, false))
+  def testMaybeAddLogDirFetchersPausingCleaning(futureLogCreated: Boolean): 
Unit = {
+val dir1 = TestUtils.tempDir()
+val dir2 = TestUtils.tempDir()
+val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
+props.put("log.dirs", dir1.getAbsolutePath + "," + dir2.getAbsolutePath)
+val config = KafkaConfig.fromProps(props)
+val logManager = TestUtils.createLogManager(config.logDirs.map(new 
File(_)), new LogConfig(new Properties()))
+val spyLogManager = spy(logManager)
+val metadataCache: MetadataCache = mock(classOf[MetadataCache])
+mockGetAliveBrokerFunctions(metadataCache, Seq(new Node(0, "host0", 0)))
+
when(metadataCache.metadataVersion()).thenReturn(config.interBrokerProtocolVersion)
+val tp0 = new TopicPartition(topic, 0)
+val uuid = Uuid.randomUuid()
+val rm = new ReplicaManager(
+  metrics = metrics,
+  config = config,
+  time = time,
+  scheduler = new MockScheduler(time),
+  logManager = spyLogManager,
+  quotaManagers = quotaManager,
+  metadataCache = metadataCache,
+  logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
+  alterPartitionManager = alterPartitionManager)
+
+try {
+  val partition = rm.createPartition(tp0)
+  partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
+new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints), None)
+
+  rm.becomeLeaderOrFollower(0, new 
LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, 
brokerEpoch,
+Seq(new LeaderAndIsrPartitionState()
+  .setTopicName(topic)
+  .setPartitionIndex(0)
+  .setControllerEpoch(0)
+  .setLeader(0)
+  .setLeaderEpoch(0)
+  .setIsr(Seq[Integer](0).asJava)
+  .setPartitionEpoch(0)
+  .setReplicas(Seq[Integer](0).asJava)
+  .setIsNew(false)).asJava,
+Collections.singletonMap(topic, Uuid.randomUuid()),

Review Comment:
   Ah, I know what you're talking about now. The reason it won't fail is 
because the topicId we feed into the partition object is None, so the topicId 
consistency check always passes because the original topicId is not set. 
I've updated the test and also verified that the response has no errors in this 
commit: 
https://github.com/apache/kafka/pull/15951/commits/4a1b76d38effa2233fb7bc062920a2457243fded
 . Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14885: fix kafka client connect to the broker that offline from… [kafka]

2024-05-16 Thread via GitHub


Stephan14 closed pull request #13531: KAFKA-14885: fix kafka client connect to 
the broker that offline from…
URL: https://github.com/apache/kafka/pull/13531


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16179; Stop processing requests before stopping publishing [kafka]

2024-05-16 Thread via GitHub


github-actions[bot] commented on PR #15242:
URL: https://github.com/apache/kafka/pull/15242#issuecomment-2116563410

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15104: Fix flaky tests in MetadataQuorumCommandTest [kafka]

2024-05-16 Thread via GitHub


github-actions[bot] commented on PR #15300:
URL: https://github.com/apache/kafka/pull/15300#issuecomment-2116563386

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [Draft] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-05-16 Thread via GitHub


appchemist commented on PR #15961:
URL: https://github.com/apache/kafka/pull/15961#issuecomment-2116554582

   @philipnee I got it. Thank you!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16574: The metrics of LogCleaner disappear after reconfiguration [kafka]

2024-05-16 Thread via GitHub


chiacyu commented on code in PR #15863:
URL: https://github.com/apache/kafka/pull/15863#discussion_r1604267579


##
core/src/main/scala/kafka/log/LogCleaner.scala:
##
@@ -182,6 +183,27 @@ class LogCleaner(initialConfig: CleanerConfig,
 cleanerManager.removeMetrics()

Review Comment:
   I thought `testMetricsActiveAfterReconfiguration()` already tests that the 
metrics are not removed after reconfiguration.
   If we remove `cleanerManager.removeMetrics()`, then we should also remove 
[this](https://github.com/apache/kafka/blob/fafa3c76dc93f3258b2cea49dfd1dc7a724a213c/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala#L96)
 in `testRemoveMetricsOnClose()`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-05-16 Thread via GitHub


kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1604239000


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java:
##
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.function.Consumer;
+
+/**
+ * The {@code CompletableEventReaper} is responsible for tracking any {@link 
CompletableEvent}s that were processed,
+ * making sure to reap them if they complete normally or pass their deadline. 
This is done so that we enforce an upper
+ * bound on the amount of time the event logic will execute.
+ */
+public class CompletableEventReaper<T extends CompletableEvent<?>> {
+
+private final Logger log;
+
+/**
+ * List of tracked events that are candidates to expire or cancel when reviewed.
+ */
+private final List<T> tracked;
+
+public CompletableEventReaper(LogContext logContext) {
+this.log = logContext.logger(CompletableEventReaper.class);
+this.tracked = new ArrayList<>();
+}
+
+/**
+ * Adds a new {@link CompletableEvent event} to track for later 
completion/expiration.
+ *
+ * @param event Event to track
+ */
+public void add(T event) {
+tracked.add(Objects.requireNonNull(event, "Event to track must be 
non-null"));
+}
+
+/**
+ * This method "completes" any {@link CompletableEvent}s that have either 
expired or completed normally. So this
+ * is a two-step process:
+ *
+ * <ul>
+ * <li>
+ * For each tracked event which has exceeded its {@link CompletableEvent#deadlineMs() deadline}, an
+ * instance of {@link TimeoutException} is created and passed to
+ * {@link CompletableFuture#completeExceptionally(Throwable)}.
+ * </li>
+ * <li>
+ * For each tracked event of which its {@link CompletableEvent#future() future} is already in the
+ * {@link CompletableFuture#isDone() done} state, it will be removed from the list of tracked events.
+ * </li>
+ * </ul>
+ *
+ * <p>
+ *
+ * This method should be called at regular intervals, based upon the needs 
of the resource that owns the reaper.
+ *
+ * @param currentTimeMs Current time with which to compare 
against the
+ *  {@link CompletableEvent#deadlineMs() 
expiration time}
+ */
+public void reapExpiredAndCompleted(long currentTimeMs) {
+log.trace("Reaping expired events");
+
+Consumer<CompletableEvent<?>> timeoutEvent = e -> {
+TimeoutException error = new TimeoutException(String.format("%s 
could not be completed within its timeout", e.getClass().getSimpleName()));
+long pastDueMs = currentTimeMs - e.deadlineMs();
+log.debug("Completing event {} exceptionally since it expired {} 
ms ago", e, pastDueMs);
+CompletableFuture<?> f = e.future();
+f.completeExceptionally(error);
+};
+
+// First, complete (exceptionally) any events that have passed their 
deadline AND aren't already complete.
+tracked.stream()
+.filter(e -> !e.future().isDone())
+.filter(e -> currentTimeMs > e.deadlineMs())
+.forEach(timeoutEvent);
+// Second, remove any events that are already complete, just to make 
sure we don't hold references. This will
+// include any events that finished successfully as well as any events 
we just completed exceptionally above.
+tracked.removeIf(e -> e.future().isDone());

Review Comment:
   @lianetm & @cadonna—please let me know if the following makes sense. I am 
trying to convince myself of this design as much as anyone else 😄...
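
For readers skimming the thread, here is a minimal, self-contained sketch of the
deadline-reaping idea discussed above, using plain `CompletableFuture`s and
hypothetical names rather than the PR's actual types:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

class DeadlineReaperSketch {

    // A tracked unit of work: a future plus the deadline after which it must fail.
    static final class Tracked {
        final CompletableFuture<String> future;
        final long deadlineMs;

        Tracked(CompletableFuture<String> future, long deadlineMs) {
            this.future = future;
            this.deadlineMs = deadlineMs;
        }
    }

    private final List<Tracked> tracked = new ArrayList<>();

    void add(CompletableFuture<String> future, long deadlineMs) {
        tracked.add(new Tracked(future, deadlineMs));
    }

    // Step 1: complete exceptionally anything past its deadline that is not done yet.
    // Step 2: drop everything that is now done so no stale references are held.
    void reap(long nowMs) {
        for (Tracked t : tracked) {
            if (!t.future.isDone() && nowMs > t.deadlineMs) {
                long pastDueMs = nowMs - t.deadlineMs;
                t.future.completeExceptionally(
                    new TimeoutException("expired " + pastDueMs + " ms ago"));
            }
        }
        tracked.removeIf(t -> t.future.isDone());
    }
}
```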

[jira] [Comment Edited] (KAFKA-12679) Rebalancing a restoring or running task may cause directory livelocking with newly created task

2024-05-16 Thread Colt McNealy (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17847115#comment-17847115
 ] 

Colt McNealy edited comment on KAFKA-12679 at 5/17/24 12:41 AM:


We have pretty much the same issue running `3.7.0` with 5 stream threads and 
recovering from a mildly unclean shutdown. We get it for both `ACTIVE` and 
`STANDBY` tasks. This is the same whether or not the State Updater is enabled 
via the internal config.

 

Our cluster was completely orzdashed and we couldn't figure out how to "heal" 
it, but we were able to rectify the issue by setting `num.stream.threads=1` and 
restorations started making progress again.

 

We also notice that the application makes zero forward progress at all; 
restorations are stuck.

 

Also, [~lucasbru] 's comment about this being solved in `trunk` might be 
outdated? That is because, if I recall correctly, the State Updater was planned 
to be GA in 3.7.0 at one point and was then backed out. Is that correct?


was (Author: JIRAUSER301663):
We have pretty much the same issue when running with 5 stream threads and 
recovering from a mildly unclean shutdown. We get it for both `ACTIVE` and 
`STANDBY` tasks. This is the same whether or not the State Updater is enabled 
via the internal config.

 

Our cluster was completely orzdashed and we couldn't figure out how to "heal" 
it, but we were able to rectify the issue by setting `num.stream.threads=1` and 
restorations started making progress again.

 

We also notice that the application makes zero forward progress at all; 
restorations are stuck.

 

Also, [~lucasbru] 's comment about this being solved in `trunk` might be 
outdated? That is because, if I recall correctly, the State Updater was planned 
to be GA in 3.7.0 at one point and was then backed out. Is that correct?

> Rebalancing a restoring or running task may cause directory livelocking with 
> newly created task
> ---
>
> Key: KAFKA-12679
> URL: https://issues.apache.org/jira/browse/KAFKA-12679
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.1
> Environment: Broker and client version 2.6.1
> Multi-node broker cluster
> Multi-node, auto scaling streams app instances
>Reporter: Peter Nahas
>Assignee: Lucas Brutschy
>Priority: Major
> Fix For: 3.7.0
>
> Attachments: Backoff-between-directory-lock-attempts.patch
>
>
> If a task that uses a state store is in the restoring state or in a running 
> state and the task gets rebalanced to a separate thread on the same instance, 
> the newly created task will attempt to lock the state store directory while 
> the first thread is continuing to use it. This is totally normal and expected 
> behavior when the first thread is not yet aware of the rebalance. However, 
> that newly created task is effectively running a while loop with no backoff 
> waiting to lock the directory:
>  # TaskManager tells the task to restore in `tryToCompleteRestoration`
>  # The task attempts to lock the directory
>  # The lock attempt fails and throws a 
> `org.apache.kafka.streams.errors.LockException`
>  # TaskManager catches the exception, stops further processing on the task 
> and reports that not all tasks have restored
>  # The StreamThread `runLoop` continues to run.
> I've seen some documentation indicate that there is supposed to be a backoff 
> when this condition occurs, but there does not appear to be any in the code. 
> The result is that if this goes on for long enough, the lock-loop may 
> dominate CPU usage in the process and starve out the old stream thread task 
> processing.
>  
> When in this state, the DEBUG level logging for TaskManager will produce a 
> steady stream of messages like the following:
> {noformat}
> 2021-03-30 20:59:51,098 DEBUG --- [StreamThread-10] o.a.k.s.p.i.TaskManager   
>   : stream-thread [StreamThread-10] Could not initialize 0_34 due 
> to the following exception; will retry
> org.apache.kafka.streams.errors.LockException: stream-thread 
> [StreamThread-10] standby-task [0_34] Failed to lock the state directory for 
> task 0_34
> {noformat}
>  
>  
> I've attached a git formatted patch to resolve the issue. Simply detect the 
> scenario and sleep for the backoff time in the appropriate StreamThread.
>  
>  
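
For anyone landing here from search: the backoff the description and the attached
patch refer to is the standard try-lock-then-sleep loop, rather than retrying the
lock in a tight loop. A purely illustrative sketch (not the actual Streams
internals; the lock here just stands in for the state directory lock):

{code:java}
import java.util.concurrent.locks.ReentrantLock;

public class LockBackoffSketch {
    public static void main(String[] args) throws InterruptedException {
        ReentrantLock taskDirLock = new ReentrantLock(); // stand-in for the state directory lock
        long backoffMs = 100L;
        long deadlineMs = System.currentTimeMillis() + 5_000L;

        boolean locked = false;
        while (!locked && System.currentTimeMillis() < deadlineMs) {
            locked = taskDirLock.tryLock();
            if (!locked) {
                Thread.sleep(backoffMs); // the missing backoff described in the report
            }
        }
        System.out.println(locked ? "acquired" : "gave up");
        if (locked) {
            taskDirLock.unlock();
        }
    }
}
{code}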



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-12679) Rebalancing a restoring or running task may cause directory livelocking with newly created task

2024-05-16 Thread Colt McNealy (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17847115#comment-17847115
 ] 

Colt McNealy edited comment on KAFKA-12679 at 5/17/24 12:13 AM:


We have pretty much the same issue when running with 5 stream threads and 
recovering from a mildly unclean shutdown. We get it for both `ACTIVE` and 
`STANDBY` tasks. This is the same whether or not the State Updater is enabled 
via the internal config.

 

Our cluster was completely orzdashed and we couldn't figure out how to "heal" 
it, but we were able to rectify the issue by setting `num.stream.threads=1` and 
restorations started making progress again.

 

We also notice that the application makes zero forward progress at all; 
restorations are stuck.

 

Also, [~lucasbru] 's comment about this being solved in `trunk` might be 
outdated? That is because, if I recall correctly, the State Updater was planned 
to be GA in 3.7.0 at one point and was then backed out. Is that correct?


was (Author: JIRAUSER301663):
We have pretty much the same issue when running with 5 stream threads and 
recovering from a mildly unclean shutdown. We get it for both `ACTIVE` and 
`STANDBY` tasks. This is the same whether or not the State Updater is enabled 
via the internal config.

 

We also notice that the application makes zero forward progress at all; 
restorations are stuck.

 

Also, [~lucasbru] 's comment about this being solved in `trunk` might be 
outdated? That is because, if I recall correctly, the State Updater was planned 
to be GA in 3.7.0 at one point and was then backed out. Is that correct?

> Rebalancing a restoring or running task may cause directory livelocking with 
> newly created task
> ---
>
> Key: KAFKA-12679
> URL: https://issues.apache.org/jira/browse/KAFKA-12679
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.1
> Environment: Broker and client version 2.6.1
> Multi-node broker cluster
> Multi-node, auto scaling streams app instances
>Reporter: Peter Nahas
>Assignee: Lucas Brutschy
>Priority: Major
> Fix For: 3.7.0
>
> Attachments: Backoff-between-directory-lock-attempts.patch
>
>
> If a task that uses a state store is in the restoring state or in a running 
> state and the task gets rebalanced to a separate thread on the same instance, 
> the newly created task will attempt to lock the state store directory while 
> the first thread is continuing to use it. This is totally normal and expected 
> behavior when the first thread is not yet aware of the rebalance. However, 
> that newly created task is effectively running a while loop with no backoff 
> waiting to lock the directory:
>  # TaskManager tells the task to restore in `tryToCompleteRestoration`
>  # The task attempts to lock the directory
>  # The lock attempt fails and throws a 
> `org.apache.kafka.streams.errors.LockException`
>  # TaskManager catches the exception, stops further processing on the task 
> and reports that not all tasks have restored
>  # The StreamThread `runLoop` continues to run.
> I've seen some documentation indicate that there is supposed to be a backoff 
> when this condition occurs, but there does not appear to be any in the code. 
> The result is that if this goes on for long enough, the lock-loop may 
> dominate CPU usage in the process and starve out the old stream thread task 
> processing.
>  
> When in this state, the DEBUG level logging for TaskManager will produce a 
> steady stream of messages like the following:
> {noformat}
> 2021-03-30 20:59:51,098 DEBUG --- [StreamThread-10] o.a.k.s.p.i.TaskManager   
>   : stream-thread [StreamThread-10] Could not initialize 0_34 due 
> to the following exception; will retry
> org.apache.kafka.streams.errors.LockException: stream-thread 
> [StreamThread-10] standby-task [0_34] Failed to lock the state directory for 
> task 0_34
> {noformat}
>  
>  
> I've attached a git formatted patch to resolve the issue. Simply detect the 
> scenario and sleep for the backoff time in the appropriate StreamThread.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-12679) Rebalancing a restoring or running task may cause directory livelocking with newly created task

2024-05-16 Thread Colt McNealy (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17847115#comment-17847115
 ] 

Colt McNealy edited comment on KAFKA-12679 at 5/17/24 12:08 AM:


We have pretty much the same issue when running with 5 stream threads and 
recovering from a mildly unclean shutdown. We get it for both `ACTIVE` and 
`STANDBY` tasks. This is the same whether or not the State Updater is enabled 
via the internal config.

 

We also notice that the application makes zero forward progress at all; 
restorations are stuck.

 

Also, [~lucasbru] 's comment about this being solved in `trunk` might be 
outdated? That is because, if I recall correctly, the State Updater was planned 
to be GA in 3.7.0 at one point and was then backed out. Is that correct?


was (Author: JIRAUSER301663):
We have pretty much the same issue when running with 5 stream threads and 
recovering from a mildly unclean shutdown. We get it for both `ACTIVE` and 
`STANDBY` tasks.

 

We also notice that the application makes zero forward progress at all; 
restorations are stuck.

> Rebalancing a restoring or running task may cause directory livelocking with 
> newly created task
> ---
>
> Key: KAFKA-12679
> URL: https://issues.apache.org/jira/browse/KAFKA-12679
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.1
> Environment: Broker and client version 2.6.1
> Multi-node broker cluster
> Multi-node, auto scaling streams app instances
>Reporter: Peter Nahas
>Assignee: Lucas Brutschy
>Priority: Major
> Fix For: 3.7.0
>
> Attachments: Backoff-between-directory-lock-attempts.patch
>
>
> If a task that uses a state store is in the restoring state or in a running 
> state and the task gets rebalanced to a separate thread on the same instance, 
> the newly created task will attempt to lock the state store directory while 
> the first thread is continuing to use it. This is totally normal and expected 
> behavior when the first thread is not yet aware of the rebalance. However, 
> that newly created task is effectively running a while loop with no backoff 
> waiting to lock the directory:
>  # TaskManager tells the task to restore in `tryToCompleteRestoration`
>  # The task attempts to lock the directory
>  # The lock attempt fails and throws a 
> `org.apache.kafka.streams.errors.LockException`
>  # TaskManager catches the exception, stops further processing on the task 
> and reports that not all tasks have restored
>  # The StreamThread `runLoop` continues to run.
> I've seen some documentation indicate that there is supposed to be a backoff 
> when this condition occurs, but there does not appear to be any in the code. 
> The result is that if this goes on for long enough, the lock-loop may 
> dominate CPU usage in the process and starve out the old stream thread task 
> processing.
>  
> When in this state, the DEBUG level logging for TaskManager will produce a 
> steady stream of messages like the following:
> {noformat}
> 2021-03-30 20:59:51,098 DEBUG --- [StreamThread-10] o.a.k.s.p.i.TaskManager   
>   : stream-thread [StreamThread-10] Could not initialize 0_34 due 
> to the following exception; will retry
> org.apache.kafka.streams.errors.LockException: stream-thread 
> [StreamThread-10] standby-task [0_34] Failed to lock the state directory for 
> task 0_34
> {noformat}
>  
>  
> I've attached a git formatted patch to resolve the issue. Simply detect the 
> scenario and sleep for the backoff time in the appropriate StreamThread.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-12679) Rebalancing a restoring or running task may cause directory livelocking with newly created task

2024-05-16 Thread Colt McNealy (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17847115#comment-17847115
 ] 

Colt McNealy commented on KAFKA-12679:
--

We have pretty much the same issue when running with 5 stream threads and 
recovering from a mildly unclean shutdown. We get it for both `ACTIVE` and 
`STANDBY` tasks.

 

We also notice that the application makes zero forward progress at all; 
restorations are stuck.

> Rebalancing a restoring or running task may cause directory livelocking with 
> newly created task
> ---
>
> Key: KAFKA-12679
> URL: https://issues.apache.org/jira/browse/KAFKA-12679
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.1
> Environment: Broker and client version 2.6.1
> Multi-node broker cluster
> Multi-node, auto scaling streams app instances
>Reporter: Peter Nahas
>Assignee: Lucas Brutschy
>Priority: Major
> Fix For: 3.7.0
>
> Attachments: Backoff-between-directory-lock-attempts.patch
>
>
> If a task that uses a state store is in the restoring state or in a running 
> state and the task gets rebalanced to a separate thread on the same instance, 
> the newly created task will attempt to lock the state store directory while 
> the first thread is continuing to use it. This is totally normal and expected 
> behavior when the first thread is not yet aware of the rebalance. However, 
> that newly created task is effectively running a while loop with no backoff 
> waiting to lock the directory:
>  # TaskManager tells the task to restore in `tryToCompleteRestoration`
>  # The task attempts to lock the directory
>  # The lock attempt fails and throws a 
> `org.apache.kafka.streams.errors.LockException`
>  # TaskManager catches the exception, stops further processing on the task 
> and reports that not all tasks have restored
>  # The StreamThread `runLoop` continues to run.
> I've seen some documentation indicate that there is supposed to be a backoff 
> when this condition occurs, but there does not appear to be any in the code. 
> The result is that if this goes on for long enough, the lock-loop may 
> dominate CPU usage in the process and starve out the old stream thread task 
> processing.
>  
> When in this state, the DEBUG level logging for TaskManager will produce a 
> steady stream of messages like the following:
> {noformat}
> 2021-03-30 20:59:51,098 DEBUG --- [StreamThread-10] o.a.k.s.p.i.TaskManager   
>   : stream-thread [StreamThread-10] Could not initialize 0_34 due 
> to the following exception; will retry
> org.apache.kafka.streams.errors.LockException: stream-thread 
> [StreamThread-10] standby-task [0_34] Failed to lock the state directory for 
> task 0_34
> {noformat}
>  
>  
> I've attached a git formatted patch to resolve the issue. Simply detect the 
> scenario and sleep for the backoff time in the appropriate StreamThread.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16787: Remove TRACE level logging from AsyncKafkaConsumer hot path [kafka]

2024-05-16 Thread via GitHub


kirktrue commented on PR #15981:
URL: https://github.com/apache/kafka/pull/15981#issuecomment-2116378444

   @lucasbru—can you review this PR? Thanks!
   
   cc @cadonna @lianetm @philipnee


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16787: Remove TRACE level logging from AsyncKafkaConsumer hot path [kafka]

2024-05-16 Thread via GitHub


kirktrue opened a new pull request, #15981:
URL: https://github.com/apache/kafka/pull/15981

   Removed unnecessarily verbose logging.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-16 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1604173884


##
server-common/src/main/java/org/apache/kafka/server/common/FeatureVersionUtils.java:
##
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.common;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public interface FeatureVersionUtils {

Review Comment:
   Hmmm. This was partially due to the evolution of the API. Originally, all the 
interfaces here were methods that were used as parameters for the 
FeatureVersion enums. I can try to reorganize this, but I may need some time to 
think about it.
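
For context on the pattern under discussion, here is a generic sketch of an enum
whose constants take small functional interfaces as constructor parameters; the
names are purely illustrative, not the actual FeatureVersion code:

```java
public class FeatureEnumSketch {

    @FunctionalInterface
    interface DefaultLevelSupplier {
        short defaultLevel();
    }

    enum ExampleFeature {
        // Each constant supplies its own behavior through a lambda passed to the constructor.
        FEATURE_A(() -> (short) 1),
        FEATURE_B(() -> (short) 0);

        private final DefaultLevelSupplier supplier;

        ExampleFeature(DefaultLevelSupplier supplier) {
            this.supplier = supplier;
        }

        short defaultLevel() {
            return supplier.defaultLevel();
        }
    }

    public static void main(String[] args) {
        System.out.println(ExampleFeature.FEATURE_A.defaultLevel()); // prints 1
    }
}
```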



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-16 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1604168230


##
core/src/main/scala/kafka/server/BrokerFeatures.scala:
##
@@ -75,16 +76,19 @@ object BrokerFeatures extends Logging {
   }
 
   def defaultSupportedFeatures(unstableMetadataVersionsEnabled: Boolean): 
Features[SupportedVersionRange] = {

Review Comment:
   I can look into adding something. The problem is I don't add any new 
production features in this change. I would add one in the next PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-16 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1604168230


##
core/src/main/scala/kafka/server/BrokerFeatures.scala:
##
@@ -75,16 +76,19 @@ object BrokerFeatures extends Logging {
   }
 
   def defaultSupportedFeatures(unstableMetadataVersionsEnabled: Boolean): 
Features[SupportedVersionRange] = {

Review Comment:
   I can look into adding something.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-16787) Remove TRACE level logging from AsyncKafkaConsumer hot path

2024-05-16 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16787?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-16787:
-

Assignee: Kirk True

> Remove TRACE level logging from AsyncKafkaConsumer hot path
> ---
>
> Key: KAFKA-16787
> URL: https://issues.apache.org/jira/browse/KAFKA-16787
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer, logging
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Critical
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> There are a few places in the new {{AsyncKafkaConsumer}} in which we added 
> TRACE-level logging to common paths (like the 
> {{{}ApplicationEventHandler{}}}). The logging is so overboard that when 
> running system tests, we occasionally run out of disk space on the test 
> instances, causing spurious test failures.
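
The linked PR removes these TRACE calls from the hot path outright. For background
only, the usual SLF4J idiom for keeping unavoidable trace logging cheap looks
roughly like the following (illustrative names, not the consumer's actual code):

{code:java}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HotPathLoggingSketch {
    private static final Logger log = LoggerFactory.getLogger(HotPathLoggingSketch.class);

    void onEvent(Object event) {
        // Parameterized logging avoids building the message string when TRACE is off...
        log.trace("Enqueued application event {}", event);

        // ...and an explicit guard avoids computing expensive arguments at all.
        if (log.isTraceEnabled()) {
            log.trace("Background thread state: {}", expensiveSnapshot());
        }
    }

    private String expensiveSnapshot() {
        return "snapshot"; // stands in for a costly computation
    }
}
{code}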



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16787) Remove TRACE level logging from AsyncKafkaConsumer hot path

2024-05-16 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16787?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16787:
--
Description: There are a few places in the new {{AsyncKafkaConsumer}} in 
which we added TRACE-level logging to common paths (like the 
{{{}ApplicationEventHandler{}}}). The logging is so overboard that when running 
system tests, we occasionally run out of disk space on the test instances, 
causing spurious test failures.

> Remove TRACE level logging from AsyncKafkaConsumer hot path
> ---
>
> Key: KAFKA-16787
> URL: https://issues.apache.org/jira/browse/KAFKA-16787
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer, logging
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Priority: Critical
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> There are a few places in the new {{AsyncKafkaConsumer}} in which we added 
> TRACE-level logging to common paths (like the 
> {{{}ApplicationEventHandler{}}}). The logging is so overboard that when 
> running system tests, we occasionally run out of disk space on the test 
> instances, causing spurious test failures.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16787) Remove TRACE level logging from AsyncKafkaConsumer hot path

2024-05-16 Thread Kirk True (Jira)
Kirk True created KAFKA-16787:
-

 Summary: Remove TRACE level logging from AsyncKafkaConsumer hot 
path
 Key: KAFKA-16787
 URL: https://issues.apache.org/jira/browse/KAFKA-16787
 Project: Kafka
  Issue Type: Improvement
  Components: clients, consumer, logging
Affects Versions: 3.7.0
Reporter: Kirk True
 Fix For: 3.8.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16787) Remove TRACE level logging from AsyncKafkaConsumer hot path

2024-05-16 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16787?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16787:
--
Labels: consumer-threading-refactor  (was: )

> Remove TRACE level logging from AsyncKafkaConsumer hot path
> ---
>
> Key: KAFKA-16787
> URL: https://issues.apache.org/jira/browse/KAFKA-16787
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer, logging
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Priority: Critical
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15045: (KIP-924 pt. 5) Add rack information to ApplicationState [kafka]

2024-05-16 Thread via GitHub


ableegoldman commented on code in PR #15972:
URL: https://github.com/apache/kafka/pull/15972#discussion_r1604122125


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/DefaultTaskInfo.java:
##
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import static java.util.Collections.unmodifiableMap;
+import static java.util.Collections.unmodifiableSet;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.assignment.TaskInfo;
+
+public class DefaultTaskInfo implements TaskInfo {
+
+private final TaskId id;
+private final boolean isStateful;
+private final Map<TopicPartition, Set<String>> partitionToRackIds;
+private final Set<String> stateStoreNames;
+private final Set<TopicPartition> inputTopicPartitions;
+private final Set<TopicPartition> changelogTopicPartitions;
+
+public DefaultTaskInfo(final TaskId id,
+   final boolean isStateful,
+   final Map<TopicPartition, Set<String>> partitionToRackIds,
+   final Set<String> stateStoreNames,
+   final Set<TopicPartition> inputTopicPartitions,
+   final Set<TopicPartition> changelogTopicPartitions) {
+this.id = id;
+this.partitionToRackIds = unmodifiableMap(partitionToRackIds);
+this.isStateful = isStateful;
+this.stateStoreNames = unmodifiableSet(stateStoreNames);
+this.inputTopicPartitions = unmodifiableSet(inputTopicPartitions);
+this.changelogTopicPartitions = 
unmodifiableSet(changelogTopicPartitions);
+}
+
+public static DefaultTaskInfo of(final TaskId taskId,

Review Comment:
   since this is an internal API, you can just have a normal public 
constructor. The static constructor thing is only for public classes where we 
want to make a "nice looking" fluent API.
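
To spell out the two styles being compared, a generic sketch (hypothetical class,
not DefaultTaskInfo itself):

```java
public class TaskDescriptor {
    private final String id;
    private final boolean stateful;

    // Internal-API style: a plain public constructor is enough.
    public TaskDescriptor(String id, boolean stateful) {
        this.id = id;
        this.stateful = stateful;
    }

    // Public-API style: a static factory reads more fluently at call sites,
    // e.g. TaskDescriptor.of("0_1", true), at the cost of extra API surface.
    public static TaskDescriptor of(String id, boolean stateful) {
        return new TaskDescriptor(id, stateful);
    }
}
```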



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/DefaultTaskInfo.java:
##
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import static java.util.Collections.unmodifiableMap;
+import static java.util.Collections.unmodifiableSet;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.assignment.TaskInfo;
+
+public class DefaultTaskInfo implements TaskInfo {
+
+private final TaskId id;
+private final boolean isStateful;
+private final Map<TopicPartition, Set<String>> partitionToRackIds;
+private final Set<String> stateStoreNames;
+private final Set<TopicPartition> inputTopicPartitions;
+private final Set<TopicPartition> changelogTopicPartitions;
+
+public DefaultTaskInfo(final TaskId id,
+   final boolean isStateful,
+   final Map<TopicPartition, Set<String>> partitionToRackIds,
+   final Set<String> stateStoreNames,
+   final Set<TopicPartition> inputTopicPartitions,
+   final Set changelogTopicPartiti

[jira] [Commented] (KAFKA-16603) Data loss when kafka connect sending data to Kafka

2024-05-16 Thread Anil Dasari (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17847103#comment-17847103
 ] 

Anil Dasari commented on KAFKA-16603:
-

[~ChrisEgerton] Thanks for the details. That helps. Feel free to close the Jira 
if you want to track the change in a different JIRA.

> Data loss when kafka connect sending data to Kafka
> --
>
> Key: KAFKA-16603
> URL: https://issues.apache.org/jira/browse/KAFKA-16603
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 3.3.1
>Reporter: Anil Dasari
>Priority: Major
>
> We are experiencing data loss when the Kafka source connector fails to send 
> data to the Kafka topic and the offset topic. 
> Kafka cluster and Kafka connect details:
>  # Kafka connect version i.e client : Confluent community version 7.3.1 i.e 
> Kafka 3.3.1
>  # Kafka version: 0.11.0 (server)
>  # Cluster size : 3 brokers
>  # Number of partitions in all topics = 3
>  # Replication factor = 3
>  # Min ISR set 2
>  # Uses no transformations in Kafka connector
>  # Use default error tolerance i.e None.
> Our connector checkpoints the offset info received in 
> SourceTask#commitRecord and resumes data processing from the persisted 
> checkpoint.
> The data loss is noticed when the broker is unresponsive for a few minutes due to 
> high load and the Kafka connector is restarted. Also, the Kafka connector's 
> graceful shutdown failed.
> Logs:
>  
> {code:java}
> [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Discovered group 
> coordinator 10.75.100.176:31000 (id: 2147483647 rack: null)
> Apr 22, 2024 @ 15:56:16.152 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Group coordinator 
> 10.75.100.176:31000 (id: 2147483647 rack: null) is unavailable or invalid due 
> to cause: coordinator unavailable. isDisconnected: false. Rediscovery will be 
> attempted.
> Apr 22, 2024 @ 15:56:16.153 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Requesting disconnect from 
> last known coordinator 10.75.100.176:31000 (id: 2147483647 rack: null)
> Apr 22, 2024 @ 15:56:16.514 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Node 0 disconnected.
> Apr 22, 2024 @ 15:56:16.708 [Producer 
> clientId=connector-producer-d094a5d7bbb046b99d62398cb84d648c-0] Node 0 
> disconnected.
> Apr 22, 2024 @ 15:56:16.710 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Node 2147483647 
> disconnected.
> Apr 22, 2024 @ 15:56:16.731 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Group coordinator 
> 10.75.100.176:31000 (id: 2147483647 rack: null) is unavailable or invalid due 
> to cause: coordinator unavailable. isDisconnected: true. Rediscovery will be 
> attempted.
> Apr 22, 2024 @ 15:56:19.103 == Trying to sleep while stop == (** custom log 
> **)
> Apr 22, 2024 @ 15:56:19.755 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Broker coordinator was 
> unreachable for 3000ms. Revoking previous assignment Assignment{error=0, 
> leader='connect-1-8f41a1d2-6cc9-4956-9be3-1fbae9c6d305', 
> leaderUrl='http://10.75.100.46:8083/', offset=4, 
> connectorIds=[d094a5d7bbb046b99d62398cb84d648c], 
> taskIds=[d094a5d7bbb046b99d62398cb84d648c-0], revokedConnectorIds=[], 
> revokedTaskIds=[], delay=0} to avoid running tasks while not being a member 
> the group
> Apr 22, 2024 @ 15:56:19.866 Stopping connector 
> d094a5d7bbb046b99d62398cb84d648c
> Apr 22, 2024 @ 15:56:19.874 Stopping task d094a5d7bbb046b99d62398cb84d648c-0
> Apr 22, 2024 @ 15:56:19.880 Scheduled shutdown for 
> WorkerConnectorWorkerConnector{id=d094a5d7bbb046b99d62398cb84d648c}
> Apr 22, 2024 @ 15:56:24.105 Connector 'd094a5d7bbb046b99d62398cb84d648c' 
> failed to properly shut down, has become unresponsive, and may be consuming 
> external resources. Correct the configuration for this connector or remove 
> the connector. After fixing the connector, it may be necessary to restart 
> this worker to release any consumed resources.
> Apr 22, 2024 @ 15:56:24.110 [Producer 
> clientId=connector-producer-d094a5d7bbb046b99d62398cb84d648c-0] Closing the 
> Kafka producer with timeoutMillis = 0 ms.
> Apr 22, 2024 @ 15:56:24.110 [Producer 
> clientId=connector-producer-d094a5d7bbb046b99d62398cb84d648c-0] Proceeding to 
> force close the producer since pending requests could not be completed within 
> timeout 0 ms.
> Apr 22, 2024 @ 15:56:24.112 [Producer 
> clientId=connector-producer-d094a5d7bbb046b99d62398cb84d648c-0] Beginning 
> shutdown of Kafka producer I/O thread, sending remaining records.
> Apr 22, 2024 @ 15:56:24.112 [Producer 
> clientId=connector-producer-d094a5d7bbb046b99d62398cb84d648c-0] Abor

[jira] [Commented] (KAFKA-12679) Rebalancing a restoring or running task may cause directory livelocking with newly created task

2024-05-16 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-12679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17847100#comment-17847100
 ] 

Karsten Stöckmann commented on KAFKA-12679:
---

I'm not quite sure if this is somehow related, but I see the following in a 
Kafka Streams application utilizing version {_}3.7.0{_}:
{code:java}
stream-thread 
[kstreams-folder-aggregator-6667c2e7-17f0-43f4-85d3-1246e1deb948-StreamThread-3]
 standby-task [1_4] Failed to acquire lock while closing the state store for 
STANDBY task 1_4{code}

> Rebalancing a restoring or running task may cause directory livelocking with 
> newly created task
> ---
>
> Key: KAFKA-12679
> URL: https://issues.apache.org/jira/browse/KAFKA-12679
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.1
> Environment: Broker and client version 2.6.1
> Multi-node broker cluster
> Multi-node, auto scaling streams app instances
>Reporter: Peter Nahas
>Assignee: Lucas Brutschy
>Priority: Major
> Fix For: 3.7.0
>
> Attachments: Backoff-between-directory-lock-attempts.patch
>
>
> If a task that uses a state store is in the restoring state or in a running 
> state and the task gets rebalanced to a separate thread on the same instance, 
> the newly created task will attempt to lock the state store directory while 
> the first thread is continuing to use it. This is totally normal and expected 
> behavior when the first thread is not yet aware of the rebalance. However, 
> that newly created task is effectively running a while loop with no backoff 
> waiting to lock the directory:
>  # TaskManager tells the task to restore in `tryToCompleteRestoration`
>  # The task attempts to lock the directory
>  # The lock attempt fails and throws a 
> `org.apache.kafka.streams.errors.LockException`
>  # TaskManager catches the exception, stops further processing on the task 
> and reports that not all tasks have restored
>  # The StreamThread `runLoop` continues to run.
> I've seen some documentation indicate that there is supposed to be a backoff 
> when this condition occurs, but there does not appear to be any in the code. 
> The result is that if this goes on for long enough, the lock-loop may 
> dominate CPU usage in the process and starve out the old stream thread task 
> processing.
>  
> When in this state, the DEBUG level logging for TaskManager will produce a 
> steady stream of messages like the following:
> {noformat}
> 2021-03-30 20:59:51,098 DEBUG --- [StreamThread-10] o.a.k.s.p.i.TaskManager   
>   : stream-thread [StreamThread-10] Could not initialize 0_34 due 
> to the following exception; will retry
> org.apache.kafka.streams.errors.LockException: stream-thread 
> [StreamThread-10] standby-task [0_34] Failed to lock the state directory for 
> task 0_34
> {noformat}
>  
>  
> I've attached a git formatted patch to resolve the issue. Simply detect the 
> scenario and sleep for the backoff time in the appropriate StreamThread.
>  
>  
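For illustration, below is a minimal, self-contained sketch of the "sleep between lock attempts" idea described in the issue. All class and method names are invented for the example; this is not the attached patch and not Kafka Streams code.
{code:java}
// Minimal sketch of backing off between lock attempts. Names are invented for the example.
public final class LockBackoffExample {

    interface StateDirectory {
        // Assumption for the example: returns false while another thread still holds the lock.
        boolean tryLock(String taskId);
    }

    static boolean lockWithBackoff(StateDirectory directory, String taskId,
                                   long backoffMs, int maxAttempts) throws InterruptedException {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (directory.tryLock(taskId)) {
                return true; // lock acquired, restoration can proceed
            }
            // Without this sleep the new task spins in a tight loop and can starve
            // the old thread that still owns the directory.
            Thread.sleep(backoffMs);
        }
        return false; // give up for now; the caller retries on the next run loop iteration
    }
}
{code}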



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16637: AsyncKafkaConsumer removes offset fetch responses from cache too aggressively [kafka]

2024-05-16 Thread via GitHub


kirktrue commented on PR #15844:
URL: https://github.com/apache/kafka/pull/15844#issuecomment-2116143157

   Thanks for the review, @lianetm. Agreed on all your points. This PR is a 
draft because it's a PoC and more thought and tests are needed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16637: AsyncKafkaConsumer removes offset fetch responses from cache too aggressively [kafka]

2024-05-16 Thread via GitHub


kirktrue commented on code in PR #15844:
URL: https://github.com/apache/kafka/pull/15844#discussion_r1604004465


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -1145,14 +1141,42 @@ private CompletableFuture> addOffsetFetch
 inflightOffsetFetches.stream().filter(r -> 
r.sameRequest(request)).findAny();
 
 if (dupe.isPresent() || inflight.isPresent()) {
-log.info("Duplicated OffsetFetchRequest: " + 
request.requestedPartitions);
-dupe.orElseGet(inflight::get).chainFuture(request.future);
+log.info("Duplicate OffsetFetchRequest found for partitions: 
{}", request.requestedPartitions);
+OffsetFetchRequestState originalRequest = 
dupe.orElseGet(inflight::get);
+originalRequest.chainFuture(request.future);
 } else {
 this.unsentOffsetFetches.add(request);
 }
 return request.future;
 }
 
+/**
+ * Remove the {@link OffsetFetchRequestState request} from the 
inflight requests queue iff
+ * both of the following are true:
+ *
+ * 
+ * The request completed with a null {@link 
Throwable error}
+ * The request is not {@link OffsetFetchRequestState#isExpired 
expired}
+ * 
+ *
+ * 
+ *
+ * In some cases, even though an offset fetch request may complete 
without an error, technically
+ * the request took longer than the user's provided timeout. In that 
case, the application thread will
+ * still receive a timeout error, and will shortly try to fetch these 
offsets again. Keeping the result
+ * of the current attempt will enable the next 
attempt to use that result and return

Review Comment:
   Fixed.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -1145,14 +1141,42 @@ private CompletableFuture> addOffsetFetch
 inflightOffsetFetches.stream().filter(r -> 
r.sameRequest(request)).findAny();
 
 if (dupe.isPresent() || inflight.isPresent()) {
-log.info("Duplicated OffsetFetchRequest: " + 
request.requestedPartitions);
-dupe.orElseGet(inflight::get).chainFuture(request.future);
+log.info("Duplicate OffsetFetchRequest found for partitions: 
{}", request.requestedPartitions);
+OffsetFetchRequestState originalRequest = 
dupe.orElseGet(inflight::get);
+originalRequest.chainFuture(request.future);
 } else {
 this.unsentOffsetFetches.add(request);
 }
 return request.future;
 }
 
+/**
+ * Remove the {@link OffsetFetchRequestState request} from the 
inflight requests queue iff
+ * both of the following are true:
+ *
+ * 
+ * The request completed with a null {@link 
Throwable error}

Review Comment:
   Fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-05-16 Thread via GitHub


kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1603994499


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -1846,7 +1849,34 @@ public void 
testPollThrowsInterruptExceptionIfInterrupted() {
 }
 assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
 }
-
+
+@Test
+void testReaperExpiresExpiredEvents() {
+consumer = newConsumer();
+final String topicName = "foo";
+final int partition = 3;
+final TopicPartition tp = new TopicPartition(topicName, partition);
+final SortedSet partitions = new 
TreeSet<>(TOPIC_PARTITION_COMPARATOR);
+partitions.add(tp);
+
+consumer.subscribe(Collections.singletonList(topicName), new 
CounterConsumerRebalanceListener());
+
+final ConsumerRebalanceListenerCallbackNeededEvent event1 = new 
ConsumerRebalanceListenerCallbackNeededEvent(ON_PARTITIONS_REVOKED, partitions);
+final ConsumerRebalanceListenerCallbackNeededEvent event2 = new 
ConsumerRebalanceListenerCallbackNeededEvent(ON_PARTITIONS_ASSIGNED, 
partitions);
+backgroundEventReaper.add(event1);
+backgroundEventQueue.add(event2);
+
+assertEquals(1, backgroundEventReaper.size());
+assertEquals(1, backgroundEventQueue.size());
+
+consumer.close();

Review Comment:
   Added tests for reaper invocation for `close()`, `poll()`, and 
`unsubscribe()`.



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -1846,7 +1849,34 @@ public void 
testPollThrowsInterruptExceptionIfInterrupted() {
 }
 assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
 }
-
+
+@Test
+void testReaperExpiresExpiredEvents() {
+consumer = newConsumer();
+final String topicName = "foo";
+final int partition = 3;
+final TopicPartition tp = new TopicPartition(topicName, partition);
+final SortedSet partitions = new 
TreeSet<>(TOPIC_PARTITION_COMPARATOR);
+partitions.add(tp);
+
+consumer.subscribe(Collections.singletonList(topicName), new 
CounterConsumerRebalanceListener());
+
+final ConsumerRebalanceListenerCallbackNeededEvent event1 = new 
ConsumerRebalanceListenerCallbackNeededEvent(ON_PARTITIONS_REVOKED, partitions);
+final ConsumerRebalanceListenerCallbackNeededEvent event2 = new 
ConsumerRebalanceListenerCallbackNeededEvent(ON_PARTITIONS_ASSIGNED, 
partitions);
+backgroundEventReaper.add(event1);
+backgroundEventQueue.add(event2);
+
+assertEquals(1, backgroundEventReaper.size());
+assertEquals(1, backgroundEventQueue.size());
+
+consumer.close();
+
+assertEquals(0, backgroundEventReaper.size());
+assertEquals(0, backgroundEventQueue.size());
+assertTrue(event1.future().isCompletedExceptionally());
+assertTrue(event2.future().isCompletedExceptionally());
+}

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-05-16 Thread via GitHub


kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1603994190


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -150,6 +151,7 @@ public class AsyncKafkaConsumerTest {
 private final ApplicationEventHandler applicationEventHandler = 
mock(ApplicationEventHandler.class);
 private final ConsumerMetadata metadata = mock(ConsumerMetadata.class);
 private final LinkedBlockingQueue backgroundEventQueue = 
new LinkedBlockingQueue<>();
+private final CompletableEventReaper backgroundEventReaper = new 
CompletableEventReaper(new LogContext());

Review Comment:
   Changed to mock.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16675: Refactored and new rebalance callbacks integration tests [kafka]

2024-05-16 Thread via GitHub


lucasbru commented on PR #15965:
URL: https://github.com/apache/kafka/pull/15965#issuecomment-2116064638

   Thanks for the PR @lianetm. I just left one question.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16675: Refactored and new rebalance callbacks integration tests [kafka]

2024-05-16 Thread via GitHub


lucasbru commented on code in PR #15965:
URL: https://github.com/apache/kafka/pull/15965#discussion_r1603953323


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerCallbackTest.scala:
##
@@ -84,29 +84,87 @@ class PlaintextConsumerCallbackTest extends 
AbstractConsumerTest {
   @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
   @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
   def testConsumerRebalanceListenerBeginningOffsetsOnPartitionsRevoked(quorum: 
String, groupProtocol: String): Unit = {
-val tp = new TopicPartition(topic, 0);
+val tp = new TopicPartition(topic, 0)
 triggerOnPartitionsRevoked { (consumer, _) =>
   val map = consumer.beginningOffsets(Collections.singletonList(tp))
   assertTrue(map.containsKey(tp))
   assertEquals(0, map.get(tp))
 }
   }
 
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def 
testGetPositionOfNewlyAssignedPartitionOnPartitionsAssignedCallback(quorum: 
String, groupProtocol: String): Unit = {
+val tp = new TopicPartition(topic, 0)
+triggerOnPartitionsAssigned { (consumer, _) => assertDoesNotThrow(() => 
consumer.position(tp)) }
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def 
testSeekPositionOfNewlyAssignedPartitionOnPartitionsAssignedCallback(quorum: 
String, groupProtocol: String): Unit = {
+val consumer = createConsumer()
+val startingOffset = 100L
+val totalRecords = 120L
+
+val producer = createProducer()
+val startingTimestamp = 0
+sendRecords(producer, totalRecords.toInt, tp, startingTimestamp)
+
+consumer.subscribe(asList(topic), new ConsumerRebalanceListener {
+  override def onPartitionsAssigned(partitions: 
util.Collection[TopicPartition]): Unit = {
+consumer.seek(tp, startingOffset)
+  }
+
+  override def onPartitionsRevoked(partitions: 
util.Collection[TopicPartition]): Unit = {
+// noop
+  }
+})
+consumeAndVerifyRecords(consumer, numRecords = (totalRecords - 
startingOffset).toInt,
+  startingOffset = startingOffset.toInt, startingKeyAndValueIndex = 
startingOffset.toInt,
+  startingTimestamp = startingOffset)
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testPauseOnPartitionsAssignedCallback(quorum: String, groupProtocol: 
String): Unit = {
+val consumer = createConsumer()
+val totalRecords = 100L
+val partitionsAssigned = new AtomicBoolean(false)
+
+val producer = createProducer()
+val startingTimestamp = 0
+sendRecords(producer, totalRecords.toInt, tp, startingTimestamp)
+
+consumer.subscribe(asList(topic), new ConsumerRebalanceListener {

Review Comment:
   Could you not use the helper method below here? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]

2024-05-16 Thread via GitHub


prestona commented on code in PR #15910:
URL: https://github.com/apache/kafka/pull/15910#discussion_r1603914389


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/CheckpointsStore.java:
##
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.TopicAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.kafka.connect.mirror.MirrorCheckpointConfig.CHECKPOINTS_TARGET_CONSUMER_ROLE;
+
+/**
+ * Reads once the Kafka log for checkpoints and populates a map of
+ * checkpoints per consumer group
+ */
+public class CheckpointsStore implements AutoCloseable {
+
+private static final Logger log = 
LoggerFactory.getLogger(CheckpointsStore.class);
+
+private final MirrorCheckpointTaskConfig config;
+private final Set consumerGroups;
+
+private TopicAdmin cpAdmin = null;
+private KafkaBasedLog backingStore = null;
+private Map> 
checkpointsPerConsumerGroup;
+
+private volatile boolean loadSuccess = false;
+private volatile boolean isInitialized = false;
+
+public CheckpointsStore(MirrorCheckpointTaskConfig config, Set 
consumerGroups) {
+this.config = config;
+this.consumerGroups = new HashSet<>(consumerGroups);
+}
+
+// for testing
+CheckpointsStore(Map> 
checkpointsPerConsumerGroup) {
+this.config = null;
+this.consumerGroups = null;
+this.checkpointsPerConsumerGroup = checkpointsPerConsumerGroup;
+isInitialized = true;
+loadSuccess =  true;
+}
+
+// potentially long running
+public void start()  {
+checkpointsPerConsumerGroup = readCheckpoints();
+isInitialized = true;
+log.trace("Checkpoints store content : {}", 
checkpointsPerConsumerGroup);
+}
+
+public boolean loadSuccess() {
+return loadSuccess;
+}
+
+public boolean isInitialized() {
+return isInitialized;
+}
+
+
+// return a mutable map - it is expected to be mutated by the Task
+public Map> contents() {

Review Comment:
   Appreciate the feedback. We went back and forth on whether to break 
encapsulation or risk `CheckpointStore` increasingly implementing all the 
methods of `Map`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]

2024-05-16 Thread via GitHub


prestona commented on code in PR #15910:
URL: https://github.com/apache/kafka/pull/15910#discussion_r1603914389


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/CheckpointsStore.java:
##
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.TopicAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.kafka.connect.mirror.MirrorCheckpointConfig.CHECKPOINTS_TARGET_CONSUMER_ROLE;
+
+/**
+ * Reads once the Kafka log for checkpoints and populates a map of
+ * checkpoints per consumer group
+ */
+public class CheckpointsStore implements AutoCloseable {
+
+private static final Logger log = 
LoggerFactory.getLogger(CheckpointsStore.class);
+
+private final MirrorCheckpointTaskConfig config;
+private final Set consumerGroups;
+
+private TopicAdmin cpAdmin = null;
+private KafkaBasedLog backingStore = null;
+private Map> 
checkpointsPerConsumerGroup;
+
+private volatile boolean loadSuccess = false;
+private volatile boolean isInitialized = false;
+
+public CheckpointsStore(MirrorCheckpointTaskConfig config, Set 
consumerGroups) {
+this.config = config;
+this.consumerGroups = new HashSet<>(consumerGroups);
+}
+
+// for testing
+CheckpointsStore(Map> 
checkpointsPerConsumerGroup) {
+this.config = null;
+this.consumerGroups = null;
+this.checkpointsPerConsumerGroup = checkpointsPerConsumerGroup;
+isInitialized = true;
+loadSuccess =  true;
+}
+
+// potentially long running
+public void start()  {
+checkpointsPerConsumerGroup = readCheckpoints();
+isInitialized = true;
+log.trace("Checkpoints store content : {}", 
checkpointsPerConsumerGroup);
+}
+
+public boolean loadSuccess() {
+return loadSuccess;
+}
+
+public boolean isInitialized() {
+return isInitialized;
+}
+
+
+// return a mutable map - it is expected to be mutated by the Task
+public Map> contents() {

Review Comment:
   Appreciate the feedback. We went back and forth on whether to break 
encapsulation (at the risk of `CheckpointStore` increasingly implementing 
all the methods of `Map`). 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16637: AsyncKafkaConsumer removes offset fetch responses from cache too aggressively [kafka]

2024-05-16 Thread via GitHub


lianetm commented on PR #15844:
URL: https://github.com/apache/kafka/pull/15844#issuecomment-2115985840

   FYI, I found another issue that looks to me it's related to this same 
situation: https://issues.apache.org/jira/browse/KAFKA-16777


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-16 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1603893834


##
server-common/src/main/java/org/apache/kafka/server/common/FeatureVersionUtils.java:
##
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.common;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public interface FeatureVersionUtils {
+
+interface FeatureVersionImpl {
+short featureLevel();

Review Comment:
   I had a javadoc that said "the level of the feature" but didn't know if that 
was silly. I can add something.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]

2024-05-16 Thread via GitHub


gharris1727 commented on code in PR #15910:
URL: https://github.com/apache/kafka/pull/15910#discussion_r1603861223


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##
@@ -395,7 +401,7 @@ void syncGroupOffset(String consumerGroupId, 
Map> 
getConvertedUpstreamOffset() {
 Map> result = new 
HashMap<>();
 
-for (Entry> entry : 
checkpointsPerConsumerGroup.entrySet()) {
+for (Entry> entry : 
checkpointsStore.contents().entrySet()) {
 String consumerId = entry.getKey();

Review Comment:
   this getConvertedUpstreamOffset could be moved to CheckpointStore since it 
only depends on the checkpoint store state.



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##
@@ -69,20 +69,22 @@ public class MirrorCheckpointTask extends SourceTask {
 private MirrorCheckpointMetrics metrics;
 private Scheduler scheduler;
 private Map> 
idleConsumerGroupsOffset;
-private Map> 
checkpointsPerConsumerGroup;
+private CheckpointsStore checkpointsStore;
+
 public MirrorCheckpointTask() {}
 
 // for testing
 MirrorCheckpointTask(String sourceClusterAlias, String targetClusterAlias,
-ReplicationPolicy replicationPolicy, OffsetSyncStore 
offsetSyncStore,
-Map> 
idleConsumerGroupsOffset,
-Map> 
checkpointsPerConsumerGroup) {
+ ReplicationPolicy replicationPolicy, OffsetSyncStore 
offsetSyncStore, Set consumerGroups,
+ Map> 
idleConsumerGroupsOffset,
+ CheckpointsStore checkpointsStore) {

Review Comment:
   nit: indenting



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##
@@ -105,7 +106,10 @@ private KafkaBasedLog 
createBackingStore(MirrorCheckpointConfig
 /**
  * Start the OffsetSyncStore, blocking until all previous Offset Syncs 
have been read from backing storage.
  */
-public void start() {
+public void start(boolean initializationMustReadToEnd) {
+this.initializationMustReadToEnd = initializationMustReadToEnd;
+log.info("OffsetSyncStore initializationMustReadToEnd:{}{}", 
initializationMustReadToEnd,
+initializationMustReadToEnd ? " - fewer checkpoints may be 
emitted" : "");

Review Comment:
   nit: Make this more verbose and user-oriented. They don't care that the 
variable is called initializationMustReadToEnd, and "Must read to end" is a 
very technical description of what is happening here.
   Specify more precisely which checkpoints aren't being emitted. Fewer could 
mean every other one, but it's actually offsets which were mirrored before the 
task started.
   
   Actually, this message works better if you put it after the 
backingStore.start() call: You can print out the oldest offset sync to say that 
translation is starting there, and whether this is limited by the 
initialization setting.
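   For example, something along these lines after the `backingStore.start()` call (a rough sketch only; `oldestOffsetSync()` is an assumed accessor, not something in the PR):
   ```java
   // Sketch only: oldestOffsetSync() stands in for however the store exposes
   // the first readable offset sync after start().
   log.info("Started OffsetSyncStore; offset translation starts at offset sync {}. {}",
           oldestOffsetSync(),
           initializationMustReadToEnd
                   ? "Consumer offsets mirrored before this task started will not be checkpointed."
                   : "Checkpoints may also be emitted for offsets mirrored before this task started.");
   ```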
   
   



##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java:
##
@@ -39,10 +39,12 @@ static class FakeOffsetSyncStore extends OffsetSyncStore {
 super();
 }
 
-@Override
-public void start() {

Review Comment:
   bump on this comment, now that we're converging on the final design.



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##
@@ -172,7 +178,7 @@ private List sourceRecordsForGroup(String 
group) throws Interrupte
 long timestamp = System.currentTimeMillis();
 Map upstreamGroupOffsets = 
listConsumerGroupOffsets(group);
 Map newCheckpoints = 
checkpointsForGroup(upstreamGroupOffsets, group);
-Map oldCheckpoints = 
checkpointsPerConsumerGroup.computeIfAbsent(group, ignored -> new HashMap<>());
+Map oldCheckpoints = 
checkpointsStore.contents().computeIfAbsent(group, ignored -> new HashMap<>());
 oldCheckpoints.putAll(newCheckpoints);

Review Comment:
   Move this to a new `CheckpointsStore#emitCheckpoints(Map)` method
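   Something like this on `CheckpointsStore`, for instance (a sketch; the method name is only a suggestion):
   ```java
   // Sketch of a possible CheckpointsStore method; it would replace the
   // computeIfAbsent + putAll currently done in MirrorCheckpointTask.
   void update(String group, Map<TopicPartition, Checkpoint> newCheckpoints) {
       Map<TopicPartition, Checkpoint> oldCheckpoints =
               checkpointsPerConsumerGroup.computeIfAbsent(group, ignored -> new HashMap<>());
       oldCheckpoints.putAll(newCheckpoints);
   }
   ```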



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##
@@ -195,7 +201,7 @@ Map 
checkpointsForGroup(Map checkpoints = 
checkpointsPerConsumerGroup.get(checkpoint.consumerGroupId());
+Map checkpoints = 
checkpointsStore.contents().get(checkpoint.consumerGroupId());
 if (checkpoints == null) {
 log.trace("Emitting {} (first for this group)", checkpoint);
 return true;

Review Comment:
   This can be moved to a new `CheckpointStore#get(String, TopicPartition)` 
method.
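   A possible shape for it (sketch only; null handling left to whatever the store settles on):
   ```java
   // Sketch of a CheckpointsStore accessor; returns null when no checkpoint exists yet.
   Checkpoint get(String consumerGroupId, TopicPartition tp) {
       Map<TopicPartition, Checkpoint> checkpoints = checkpointsPerConsumerGroup.get(consumerGroupId);
       return checkpoints == null ? null : checkpoints.get(tp);
   }
   ```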



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/CheckpointsStore.java:
##
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ *

[jira] [Commented] (KAFKA-16774) fix flaky StreamThreadTest#shouldCloseAllTaskProducersOnCloseIfEosEnabled

2024-05-16 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17847064#comment-17847064
 ] 

Matthias J. Sax commented on KAFKA-16774:
-

Ah. Nice. Glad it's not a bug. Thanks for the PR Bruno. Approved.

> fix flaky StreamThreadTest#shouldCloseAllTaskProducersOnCloseIfEosEnabled
> -
>
> Key: KAFKA-16774
> URL: https://issues.apache.org/jira/browse/KAFKA-16774
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Chia-Ping Tsai
>Assignee: Bruno Cadonna
>Priority: Minor
>  Labels: flaky-test
>
> java.util.ConcurrentModificationException
>  at 
> java.base/java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1720)
>  at 
> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
>  at 
> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>  at 
> java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
>  at 
> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>  at 
> java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.allTasks(TaskManager.java:1686)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.releaseLockedUnassignedTaskDirectories(TaskManager.java:1364)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.handleRebalanceComplete(TaskManager.java:208)
>  at 
> org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:79)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThreadTest.shouldCloseAllTaskProducersOnCloseIfEosEnabled(StreamThreadTest.java:1408)
>  at 
> java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15045: (KIP-924 pt. 4) Generify rack graph solving utilities [kafka]

2024-05-16 Thread via GitHub


ableegoldman commented on PR #15956:
URL: https://github.com/apache/kafka/pull/15956#issuecomment-2115949518

   Test failures are unrelated, merged to trunk


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15045: (KIP-924 pt. 4) Generify rack graph solving utilities [kafka]

2024-05-16 Thread via GitHub


ableegoldman merged PR #15956:
URL: https://github.com/apache/kafka/pull/15956


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-16 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1603875839


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -469,4 +522,28 @@ object StorageTool extends Logging {
 }
 0
   }
+
+  private def parseNameAndLevel(input: String): Array[String] = {
+val equalsIndex = input.indexOf("=")
+if (equalsIndex < 0)
+  throw new RuntimeException("Can't parse feature=level string " + input + 
": equals sign not found.")
+val name = input.substring(0, equalsIndex).trim
+val levelString = input.substring(equalsIndex + 1).trim
+try {
+  levelString.toShort
+} catch {
+  case _: Throwable =>
+throw new RuntimeException("Can't parse feature=level string " + input 
+ ": " + "unable to parse " + levelString + " as a short.")
+}
+Array[String](name, levelString)

Review Comment:
   I pulled this method from FeatureCommand (originally I wanted them to use a 
shared method from a helper, but that was a headache, so I duplicated the 
method).
   
   I can revise this one though.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15242) FixedKeyProcessor testing is unusable

2024-05-16 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17847063#comment-17847063
 ] 

Matthias J. Sax commented on KAFKA-15242:
-

The example 
([https://github.com/zveljkovic/kafka-repro/blob/master/src/test/java/com/example/demo/MyFixedKeyProcessorTest.java])
 tries to use `MockProcessorContext` to test `FixedKeyProcessor` – this does 
not work.

Adding `MockFixedKeyProcessorContext` should allow testing `FixedKeyProcessor` 
using this newly added class.

What other issue does this ticket include that's not covered? Can you elaborate?

> FixedKeyProcessor testing is unusable
> -
>
> Key: KAFKA-15242
> URL: https://issues.apache.org/jira/browse/KAFKA-15242
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Zlstibor Veljkovic
>Assignee: Alexander Aghili
>Priority: Major
>
> Using mock processor context to get the forwarded message doesn't work.
> Also there is not a well documented way for testing FixedKeyProcessors.
> Please see the repo at [https://github.com/zveljkovic/kafka-repro]
> but most important piece is test file with runtime and compile time errors:
> [https://github.com/zveljkovic/kafka-repro/blob/master/src/test/java/com/example/demo/MyFixedKeyProcessorTest.java]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-05-16 Thread via GitHub


lianetm commented on PR #15640:
URL: https://github.com/apache/kafka/pull/15640#issuecomment-2115938603

   High level comment, just to clarify and make sure it's something we are 
considering and will cover with the follow-up PRs for timeouts. Here we're 
introducing a component that ensures app events are expired only after 
having had one chance to run, but that only applies at the app thread level, 
and only for unsubscribe and poll, not for all events. The thing is that 
events can also be expired indirectly when a request expires (which works 
against these changes). So even if the `processBackgroundEvents` introduced 
here gives an expired event a chance to run once, that won't actually happen 
(because the underlying request expires). I expect the work needed on that 
side to make this intention hold in practice will come in a follow-up PR, am 
I right?
   
   Also, almost none of our integration tests cover the poll(ZERO) case (the 
helper funcs 
[here](https://github.com/apache/kafka/blob/056d232f4e28bf8e67e00f8ed2c103fdb0f3b78e/core/src/test/scala/unit/kafka/utils/TestUtils.scala#L892-L910),
 used by most of the tests, poll with timeout > 0). That's probably why we did 
not find the expiration issues we have with poll(0) before. I guess that after 
addressing the timeout/expiration (this PR and follow-up), we should be able to 
add some. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [Draft] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-05-16 Thread via GitHub


philipnee commented on PR #15961:
URL: https://github.com/apache/kafka/pull/15961#issuecomment-2115931518

   hey @appchemist - let us know if this is ready for review. thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16770; [1/N] Coalesce records into bigger batches [kafka]

2024-05-16 Thread via GitHub


jolshan commented on code in PR #15964:
URL: https://github.com/apache/kafka/pull/15964#discussion_r1603862201


##
core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala:
##
@@ -84,98 +67,28 @@ class CoordinatorPartitionWriterTest {
   }
 
   @Test
-  def testWriteRecords(): Unit = {

Review Comment:
   Do we have an equivalent test for the writing of the records in 
CoordinatorRuntimeTest? I didn't really notice new tests, but saw we have some 
of the builder logic there. Is it tested by checking equality between the 
records generated by the helper methods and the output from running the 
CoordinatorRuntime code?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-15723: KRaft support in ListOffsetsRequestTest [kafka]

2024-05-16 Thread via GitHub


mimaison opened a new pull request, #15980:
URL: https://github.com/apache/kafka/pull/15980

   Based on https://github.com/apache/kafka/pull/15047 as Zihao Lin is not able 
to complete the work.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16770; [1/N] Coalesce records into bigger batches [kafka]

2024-05-16 Thread via GitHub


jolshan commented on code in PR #15964:
URL: https://github.com/apache/kafka/pull/15964#discussion_r1603853446


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/InMemoryPartitionWriter.java:
##
@@ -18,162 +18,29 @@
 
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.record.RecordBatch;
-import org.apache.kafka.common.requests.TransactionResult;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.storage.internals.log.LogConfig;
 import org.apache.kafka.storage.internals.log.VerificationGuard;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.ReentrantLock;
-import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 
 /**
  * An in-memory partition writer.
- *
- * @param  The record type.
  */
-public class InMemoryPartitionWriter implements PartitionWriter {
-
-public static class LogEntry {

Review Comment:
   nice that we could just use the real memory records



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16770; [1/N] Coalesce records into bigger batches [kafka]

2024-05-16 Thread via GitHub


jolshan commented on code in PR #15964:
URL: https://github.com/apache/kafka/pull/15964#discussion_r1603806833


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/PartitionWriter.java:
##
@@ -18,40 +18,21 @@
 
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.requests.TransactionResult;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.storage.internals.log.LogConfig;
 import org.apache.kafka.storage.internals.log.VerificationGuard;
 
-import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
 /**
  * A simple interface to write records to Partitions/Logs. It contains the 
minimum
  * required for coordinators.
- *
- * @param  The record type.
  */
-public interface PartitionWriter {
-
-/**
- * Serializer to translate T to bytes.
- *
- * @param  The record type.
- */
-interface Serializer {
-/**
- * Serializes the key of the record.
- */
-byte[] serializeKey(T record);
-
-/**
- * Serializes the value of the record.
- */
-byte[] serializeValue(T record);
-}
+public interface PartitionWriter {
 
 /**
  * Listener allowing to listen to high watermark changes. This is meant
- * to be used in conjunction with {{@link 
PartitionWriter#append(TopicPartition, List)}}.
+ * to be used in conjunction with {{@link 
PartitionWriter#append(TopicPartition, VerificationGuard, MemoryRecords)}}.

Review Comment:
   Is there a programmatic way to check if these links are broken due to 
refactoring, or do you need to do it manually?
   
   Just wondering if there is an easy way to check you did them all :) 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16105: Reassignment fix [kafka]

2024-05-16 Thread via GitHub


gharris1727 commented on code in PR #15165:
URL: https://github.com/apache/kafka/pull/15165#discussion_r1603758912


##
clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java:
##
@@ -248,7 +248,10 @@ public synchronized ConsumerRecords poll(final 
Duration timeout) {
 subscriptions.position(entry.getKey(), newPosition);
 }
 }
-toClear.add(entry.getKey());
+// Since reassignment logic for tiered topics relies on 
seekToBeginning,

Review Comment:
   I don't think we can change this logic, because it would re-deliver the same 
records over and over, which is distinct from the real KafkaConsumer, which 
(without seeking) tries to deliver records only once.
   
   I do think that maybe seekToBeginning should clear the records from the 
reset partitions, to mirror how the real kafka consumer uses 
FetchState.AWAIT_RESET#hasValidPosition to prevent fetching data from a 
partition which has just been reset, even if there is data buffered.
   
   Then the test can explicitly re-add the records to re-consume them.
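   Roughly along these lines inside `MockConsumer#seekToBeginning` (sketch only; the `records` map and the `subscriptions.requestOffsetReset` call reflect my reading of MockConsumer and may not match the current code exactly):
   ```java
   // Sketch only -- field/method names are assumptions based on MockConsumer internals.
   @Override
   public synchronized void seekToBeginning(Collection<TopicPartition> partitions) {
       ensureNotClosed();
       for (TopicPartition tp : partitions) {
           subscriptions.requestOffsetReset(tp, OffsetResetStrategy.EARLIEST);
           // Drop buffered-but-undelivered records for the reset partition, mirroring how the
           // real consumer's AWAIT_RESET state prevents returning already-fetched data.
           records.remove(tp);
       }
   }
   ```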



##
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##
@@ -241,16 +258,25 @@ public void testCanProcessRecord() throws 
InterruptedException {
 TestUtils.waitForCondition(() -> 
consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(1L)),
 "Couldn't read record");
 assertEquals(2, handler.metadataCounter);
 
-// should only read the tpId1 records
+// Adding assignment for partition 1 after related metadata records 
have already been read
 consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId1));
-TestUtils.waitForCondition(() -> 
consumerTask.isUserPartitionAssigned(tpId1), "Timed out waiting for " + tpId1 + 
" to be assigned");
-addRecord(consumer, metadataPartition, tpId1, 2);

Review Comment:
   this addRecord call is gone now, and that changes the final waitForCondition in 
this test. Could both of these changes be reverted, or are they integral to the 
change to the test?



##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:
##
@@ -238,10 +238,15 @@ void maybeWaitForPartitionAssignments() throws 
InterruptedException {
 this.assignedMetadataPartitions = 
Collections.unmodifiableSet(metadataPartitionSnapshot);
 // for newly assigned user-partitions, read from the beginning of 
the corresponding metadata partition
 final Set seekToBeginOffsetPartitions = 
assignedUserTopicIdPartitionsSnapshot
-.stream()
-.filter(utp -> !utp.isAssigned)
-.map(utp -> toRemoteLogPartition(utp.metadataPartition))
-.collect(Collectors.toSet());
+.stream()
+.filter(utp -> !utp.isAssigned)
+.map(utp -> utp.metadataPartition)
+// When reset to beginning is happening, we also need to 
reset the last read offset
+// Otherwise if the next reassignment request for the same 
metadata partition comes in
+// before the record of already assigned topic has been 
read, then the reset will happen again to the last read offset
+.peek(readOffsetsByMetadataPartition::remove)

Review Comment:
   Connect's WorkerSinkTask follows a very similar pattern, where consumer.seek 
is followed by some bookkeeping. We set the internal state to the offset we 
seek rather than removing it, but I think removing the offset would probably be 
safer. :+1:



##
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##
@@ -241,16 +258,25 @@ public void testCanProcessRecord() throws 
InterruptedException {
 TestUtils.waitForCondition(() -> 
consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(1L)),
 "Couldn't read record");
 assertEquals(2, handler.metadataCounter);
 
-// should only read the tpId1 records
+// Adding assignment for partition 1 after related metadata records 
have already been read
 consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId1));
-TestUtils.waitForCondition(() -> 
consumerTask.isUserPartitionAssigned(tpId1), "Timed out waiting for " + tpId1 + 
" to be assigned");
-addRecord(consumer, metadataPartition, tpId1, 2);
+TestUtils.waitForCondition(() -> 
consumerTask.isUserPartitionAssigned(tpId1), "Timed out waiting for " + tpId0 + 
" to be assigned");
+
+// Adding assignment for partition0
+// to trigger the reset to last read offset and assignment for another 
partition
+// that has different metadata partition to trigger the update of 
metadata snapshot
+HashSet pa

Re: [PR] KAFKA-16770; [1/N] Coalesce records into bigger batches [kafka]

2024-05-16 Thread via GitHub


jolshan commented on code in PR #15964:
URL: https://github.com/apache/kafka/pull/15964#discussion_r1603801460


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -723,30 +757,66 @@ public void run() {
 // If the records are not empty, first, they are 
applied to the state machine,
 // second, then are written to the partition/log, and 
finally, the response
 // is put into the deferred event queue.
+long prevLastWrittenOffset = 
context.coordinator.lastWrittenOffset();
+LogConfig logConfig = partitionWriter.config(tp);
+byte magic = logConfig.recordVersion().value;
+int maxBatchSize = logConfig.maxMessageSize();
+long currentTimeMs = time.milliseconds();
+ByteBuffer buffer = 
context.bufferSupplier.get(Math.min(16384, maxBatchSize));
+
 try {
-// Apply the records to the state machine.
-if (result.replayRecords()) {
-// We compute the offset of the record based 
on the last written offset. The
-// coordinator is the single writer to the 
underlying partition so we can
-// deduce it like this.
-for (int i = 0; i < result.records().size(); 
i++) {
+MemoryRecordsBuilder builder = 
MemoryRecords.builder(

Review Comment:
   nit: is there a benefit from putting this here and not right before the 
append method? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16770; [1/N] Coalesce records into bigger batches [kafka]

2024-05-16 Thread via GitHub


jolshan commented on code in PR #15964:
URL: https://github.com/apache/kafka/pull/15964#discussion_r1603797394


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -723,30 +757,66 @@ public void run() {
 // If the records are not empty, first, they are 
applied to the state machine,
 // second, then are written to the partition/log, and 
finally, the response
 // is put into the deferred event queue.
+long prevLastWrittenOffset = 
context.coordinator.lastWrittenOffset();
+LogConfig logConfig = partitionWriter.config(tp);
+byte magic = logConfig.recordVersion().value;
+int maxBatchSize = logConfig.maxMessageSize();
+long currentTimeMs = time.milliseconds();
+ByteBuffer buffer = 
context.bufferSupplier.get(Math.min(16384, maxBatchSize));

Review Comment:
   Nice we got rid of the thread local. 👍 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16770; [1/N] Coalesce records into bigger batches [kafka]

2024-05-16 Thread via GitHub


jolshan commented on code in PR #15964:
URL: https://github.com/apache/kafka/pull/15964#discussion_r1603782316


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -1046,12 +1118,18 @@ public void run() {
 result
 );
 
-long offset = 
partitionWriter.appendEndTransactionMarker(
+long offset = partitionWriter.append(
 tp,
-producerId,
-producerEpoch,
-coordinatorEpoch,
-result
+VerificationGuard.SENTINEL,
+MemoryRecords.withEndTransactionMarker(
+time.milliseconds(),

Review Comment:
   It seems we didn't specify this time value before. Was that a bug? I guess 
it also just gets the system time in the method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Add demo template for transactional client [kafka]

2024-05-16 Thread via GitHub


k-raina commented on code in PR #15913:
URL: https://github.com/apache/kafka/pull/15913#discussion_r1603779400


##
examples/src/main/java/kafka/examples/TransactionalClientDemo.java:
##
@@ -0,0 +1,148 @@
+package kafka.examples;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.AbstractMap;
+
+
+import static java.time.Duration.ofSeconds;
+import static java.util.Collections.singleton;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.*;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.*;
+
+public class TransactionalClientDemo {
+
+private static final String CONSUMER_GROUP_ID = "my-group-id";
+private static final String OUTPUT_TOPIC = "output";
+private static final String INPUT_TOPIC = "input";
+private static KafkaConsumer consumer;
+private static KafkaProducer producer;
+
+public static void main(String[] args) {
+initializeApplication();
+
+boolean isRunning = true;
+// Continuously poll for records
+while(isRunning) {
+try {
+try {
+// Poll records from Kafka for a timeout of 60 seconds
+ConsumerRecords records = 
consumer.poll(ofSeconds(60));
+
+// Process records to generate word count map
+Map wordCountMap = records.records(new 
TopicPartition(INPUT_TOPIC, 0))
+.stream()
+.flatMap(record -> 
Stream.of(record.value().split(" ")))
+.map(word -> new AbstractMap.SimpleEntry<>(word, 
1))
+.collect(Collectors.toMap(
+AbstractMap.SimpleEntry::getKey,
+AbstractMap.SimpleEntry::getValue,
+(v1, v2) -> v1 + v2
+));
+
+// Begin transaction
+producer.beginTransaction();
+
+// Produce word count results to output topic
+wordCountMap.forEach((key, value) ->
+producer.send(new ProducerRecord<>(OUTPUT_TOPIC, 
key, value.toString(;
+
+// Determine offsets to commit
+Map offsetsToCommit = 
new HashMap<>();
+for (TopicPartition partition : records.partitions()) {
+List> 
partitionedRecords = records.records(partition);
+long offset = 
partitionedRecords.get(partitionedRecords.size() - 1).offset();
+offsetsToCommit.put(partition, new 
OffsetAndMetadata(offset + 1));
+}
+
+// Send offsets to transaction for atomic commit
+producer.sendOffsetsToTransaction(offsetsToCommit, 
CONSUMER_GROUP_ID);
+
+// Commit transaction
+producer.commitTransaction();
+} catch (AbortableTransactionException e) {
+// Abortable Exception: Handle Kafka exception by aborting 
transaction. Abortable Exception should never be thrown.
+producer.abortTransaction();
+resetToLastCommittedPositions(consumer);
+}
+} catch (InvalidConfiguationTransactionException e) {
+//  Fatal Error: The error is bubbled up to the application 
layer. The application can decide what to do
+closeAll();
+throw InvalidConfiguationTransactionException;

Review Comment:
   Updated in commit: 920437bd3866c456b4ace2bab79a8ca3664b7d4d



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-16785) Migrate TopicBasedRemoteLogMetadataManagerRestartTest to new test infra

2024-05-16 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-16785:
--

Assignee: Kuan Po Tseng  (was: Chia-Ping Tsai)

> Migrate TopicBasedRemoteLogMetadataManagerRestartTest to new test infra
> ---
>
> Key: KAFKA-16785
> URL: https://issues.apache.org/jira/browse/KAFKA-16785
> Project: Kafka
>  Issue Type: Test
>Reporter: Chia-Ping Tsai
>Assignee: Kuan Po Tseng
>Priority: Minor
>  Labels: storage_test
>
> as title



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] Add demo template for transactional client [kafka]

2024-05-16 Thread via GitHub


k-raina commented on code in PR #15913:
URL: https://github.com/apache/kafka/pull/15913#discussion_r1603779131


##
examples/src/main/java/kafka/examples/TransactionalClientDemo.java:
##
@@ -0,0 +1,148 @@
+package kafka.examples;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.AbstractMap;
+
+
+import static java.time.Duration.ofSeconds;
+import static java.util.Collections.singleton;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.*;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.*;
+
+public class TransactionalClientDemo {
+
+private static final String CONSUMER_GROUP_ID = "my-group-id";
+private static final String OUTPUT_TOPIC = "output";
+private static final String INPUT_TOPIC = "input";
+private static KafkaConsumer consumer;
+private static KafkaProducer producer;
+
+public static void main(String[] args) {
+initializeApplication();
+
+boolean isRunning = true;
+// Continuously poll for records
+while(isRunning) {
+try {
+try {
+// Poll records from Kafka for a timeout of 60 seconds
+ConsumerRecords records = 
consumer.poll(ofSeconds(60));
+
+// Process records to generate word count map
+Map wordCountMap = records.records(new 
TopicPartition(INPUT_TOPIC, 0))

Review Comment:
   Updated in commit: 920437bd3866c456b4ace2bab79a8ca3664b7d4d



##
examples/src/main/java/kafka/examples/TransactionalClientDemo.java:
##
@@ -0,0 +1,148 @@
+package kafka.examples;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.AbstractMap;
+
+
+import static java.time.Duration.ofSeconds;
+import static java.util.Collections.singleton;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.*;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.*;
+
+public class TransactionalClientDemo {
+
+private static final String CONSUMER_GROUP_ID = "my-group-id";
+private static final String OUTPUT_TOPIC = "output";
+private static final String INPUT_TOPIC = "input";
+private static KafkaConsumer consumer;
+private static KafkaProducer producer;
+
+public static void main(String[] args) {
+initializeApplication();
+
+boolean isRunning = true;
+// Continuously poll for records
+while(isRunning) {
+try {
+try {
+// Poll records from Kafka for a timeout of 60 seconds
+ConsumerRecords records = 
consumer.poll(ofSeconds(60));
+
+// Process records to generate word count map
+Map wordCountMap = records.records(new 
TopicPartition(INPUT_TOPIC, 0))
+.stream()
+.flatMap(record -> 
Stream.of(record.value().split(" ")))
+.map(word -> new AbstractMap.SimpleEntry<>(word, 
1))
+.collect(Collectors.toMap(
+AbstractMap.SimpleEntry::getKey,
+AbstractMap.SimpleEntry::getValue,
+(v1, v2) -> v1 + v2
+));
+
+// Begin transaction
+producer.beginTransaction();
+
+// Produce word count results to output topic
+wordCountMap.forEach((key, value) ->
+

Re: [PR] KAFKA-16264: Expose `producer.id.expiration.check.interval.ms` as dynamic broker configuration [kafka]

2024-05-16 Thread via GitHub


jeqo commented on PR #15888:
URL: https://github.com/apache/kafka/pull/15888#issuecomment-2115824284

   Moving to draft until the KIP is discussed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16264: Expose `producer.id.expiration.check.interval.ms` as dynamic broker configuration [kafka]

2024-05-16 Thread via GitHub


jeqo commented on PR #15888:
URL: https://github.com/apache/kafka/pull/15888#issuecomment-2115820329

   Thanks @jolshan  -- I created 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1046%3A+Expose+producer.id.expiration.check.interval.ms+as+dynamic+broker+configuration
 and started the discussion thread. 
   While writing the KIP, I noted that we should also expose this 
config as a non-internal one. Apart from that, everything should be covered by the PR 
as is.
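   
   For reference, a minimal sketch (not from the PR) of altering such a dynamic 
broker-level config through the admin client once the property is exposed as 
dynamic; the class name, bootstrap address and interval value below are 
illustrative assumptions:
   
   ```java
   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.clients.admin.AlterConfigOp;
   import org.apache.kafka.clients.admin.ConfigEntry;
   import org.apache.kafka.common.config.ConfigResource;

   import java.util.Collections;
   import java.util.Properties;

   public class DynamicProducerIdExpirationSketch {
       public static void main(String[] args) throws Exception {
           Properties props = new Properties();
           props.put("bootstrap.servers", "localhost:9092"); // illustrative address
           try (Admin admin = Admin.create(props)) {
               // Empty resource name targets the cluster-wide default broker config.
               ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "");
               AlterConfigOp op = new AlterConfigOp(
                   new ConfigEntry("producer.id.expiration.check.interval.ms", "300000"),
                   AlterConfigOp.OpType.SET);
               admin.incrementalAlterConfigs(
                   Collections.singletonMap(broker, Collections.singletonList(op)))
                   .all().get();
           }
       }
   }
   ```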


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16786) New consumer subscribe should not require the deprecated partition.assignment.strategy

2024-05-16 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16786?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16786:
--
Component/s: clients

> New consumer subscribe should not require the deprecated 
> partition.assignment.strategy
> --
>
> Key: KAFKA-16786
> URL: https://issues.apache.org/jira/browse/KAFKA-16786
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8
>
>
> The partition.assignment.strategy config is deprecated with the new consumer 
> group protocol KIP-848. With the new protocol, only server-side assignors are 
> supported for now, defined in the property
> group.remote.assignor, with a default selected by the broker, so 
> it's not even a required property. 
> The new AsyncKafkaConsumer supports the new protocol only, but it currently 
> throws an IllegalStateException if a call to subscribe is made and the 
> deprecated config partition.assignment.strategy is empty (see 
> [throwIfNoAssignorsConfigured|https://github.com/apache/kafka/blob/056d232f4e28bf8e67e00f8ed2c103fdb0f3b78e/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1715]).
>  
> We should remove the reference to ConsumerPartitionAssignor in the 
> AsyncKafkaConsumer, along with its validation for non-empty assignors on subscribe 
> (the only use it has).
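
For illustration, a minimal consumer setup under the new protocol, where no
partition.assignment.strategy is configured; the broker address, group id,
deserializers and assignor choice below are assumptions, not part of the ticket:

{code:java}
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Properties;
import java.util.regex.Pattern;

public class NewProtocolConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
        // New group protocol: the assignor runs server side, so no
        // partition.assignment.strategy is required on the client.
        props.put("group.protocol", "consumer");
        props.put("group.remote.assignor", "uniform"); // optional; broker picks a default if unset
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Pattern.compile("input-.*"));
            consumer.poll(Duration.ofMillis(100));
        }
    }
}
{code}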



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-05-16 Thread via GitHub


lianetm commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1603740393


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java:
##
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.function.Consumer;
+
+/**
+ * The {@code CompletableEventReaper} is responsible for tracking any {@link 
CompletableEvent}s that were processed,
+ * making sure to reap them if they complete normally or pass their deadline. 
This is done so that we enforce an upper
+ * bound on the amount of time the event logic will execute.
+ */
+public class CompletableEventReaper<T extends CompletableEvent<?>> {
+
+private final Logger log;
+
+/**
+ * List of tracked events that we are candidates to expire or cancel when 
reviewed.
+ */
+private final List tracked;
+
+public CompletableEventReaper(LogContext logContext) {
+this.log = logContext.logger(CompletableEventReaper.class);
+this.tracked = new ArrayList<>();
+}
+
+/**
+ * Adds a new {@link CompletableEvent event} to track for later 
completion/expiration.
+ *
+ * @param event Event to track
+ */
+public void add(T event) {
+tracked.add(Objects.requireNonNull(event, "Event to track must be 
non-null"));
+}
+
+/**
+ * This method "completes" any {@link CompletableEvent}s that have either 
expired or completed normally. So this
+ * is a two-step process:
+ *
+ * 
+ * 
+ * For each tracked event which has exceeded its {@link 
CompletableEvent#deadlineMs() deadline}, an
+ * instance of {@link TimeoutException} is created and passed to
+ * {@link CompletableFuture#completeExceptionally(Throwable)}.
+ * 
+ * 
+ * For each tracked event of which its {@link 
CompletableEvent#future() future} is already in the
+ * {@link CompletableFuture#isDone() done} state, it will be 
removed from the list of tracked events.
+ * 
+ * 
+ *
+ * 
+ *
+ * This method should be called at regular intervals, based upon the needs 
of the resource that owns the reaper.
+ *
+ * @param currentTimeMs Current time with which to compare 
against the
+ *  {@link CompletableEvent#deadlineMs() 
expiration time}
+ */
+public void reapExpiredAndCompleted(long currentTimeMs) {
+log.trace("Reaping expired events");
+
+Consumer<CompletableEvent<?>> timeoutEvent = e -> {
+TimeoutException error = new TimeoutException(String.format("%s 
could not be completed within its timeout", e.getClass().getSimpleName()));
+long pastDueMs = currentTimeMs - e.deadlineMs();
+log.debug("Completing event {} exceptionally since it expired {} 
ms ago", e, pastDueMs);
+CompletableFuture f = e.future();
+f.completeExceptionally(error);
+};
+
+// First, complete (exceptionally) any events that have passed their 
deadline AND aren't already complete.
+tracked.stream()
+.filter(e -> !e.future().isDone())
+.filter(e -> currentTimeMs > e.deadlineMs())
+.forEach(timeoutEvent);
+// Second, remove any events that are already complete, just to make 
sure we don't hold references. This will
+// include any events that finished successfully as well as any events 
we just completed exceptionally above.
+tracked.removeIf(e -> e.future().isDone());

Review Comment:
   just a suggestion but I would leave it to you as they may be impl details 
I'm missing ;)



-- 
This is an automated message from the Apache Git Service.

[jira] [Commented] (KAFKA-16414) Inconsistent active segment expiration behavior between retention.ms and retention.bytes

2024-05-16 Thread Jorge Esteban Quilcate Otoya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17847035#comment-17847035
 ] 

Jorge Esteban Quilcate Otoya commented on KAFKA-16414:
--

Just checking if there's an agreement here and if there's any ongoing work 
towards aligning the retention configs.

cc [~ckamal] [~brandboat] 

> Inconsistent active segment expiration behavior between retention.ms and 
> retention.bytes
> 
>
> Key: KAFKA-16414
> URL: https://issues.apache.org/jira/browse/KAFKA-16414
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 3.6.1
>Reporter: Kuan Po Tseng
>Assignee: Kuan Po Tseng
>Priority: Major
>
> This is a follow up issue on KAFKA-16385.
> Currently, there's a difference between how retention.ms and retention.bytes 
> handle active segment expiration:
> - retention.ms always expire active segment when max segment timestamp 
> matches the condition.
> - retention.bytes only expire active segment when retention.bytes is 
> configured to zero.
> The behavior should be either rotate active segments for both retention 
> configurations or none at all.
> For more details, see
> https://issues.apache.org/jira/browse/KAFKA-16385?focusedCommentId=17829682&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17829682
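
For reference, a minimal sketch of a topic carrying both retention settings whose
active-segment behaviour is compared above; the topic name, sizes and broker
address are illustrative assumptions:

{code:java}
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class RetentionConfigSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // illustrative address
        try (Admin admin = Admin.create(props)) {
            Map<String, String> configs = new HashMap<>();
            // Time-based retention: the active segment can be rotated and deleted
            // once its max timestamp is old enough.
            configs.put("retention.ms", "345600000"); // 4 days
            // Size-based retention: today the active segment is only expired
            // when this is set to 0, as described above.
            configs.put("retention.bytes", "1073741824"); // 1 GiB
            NewTopic topic = new NewTopic("retention-demo", 1, (short) 1).configs(configs);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}
{code}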



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]

2024-05-16 Thread via GitHub


edoardocomar commented on PR #15910:
URL: https://github.com/apache/kafka/pull/15910#issuecomment-2115715035

   Hi @gharris1727, we worked out the asynchronous loading using a wrapper around 
the checkpointsPerGroupMap.
   However, when testing with different levels of authorization to see the 
fallback behaviour, 
   the simplest approach was to have the callback rethrow. It's all 
encapsulated, so it's not spoiling the task IMHO!
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-16448) Add Kafka Streams exception handler for exceptions occuring during processing (KIP-1033)

2024-05-16 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-16448:
---

Assignee: Loïc Greffier

> Add Kafka Streams exception handler for exceptions occuring during processing 
> (KIP-1033)
> 
>
> Key: KAFKA-16448
> URL: https://issues.apache.org/jira/browse/KAFKA-16448
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Damien Gasparina
>Assignee: Loïc Greffier
>Priority: Minor
>
> Jira to follow work on KIP:  [KIP-1033: Add Kafka Streams exception handler 
> for exceptions occuring during 
> processing|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1033%3A+Add+Kafka+Streams+exception+handler+for+exceptions+occuring+during+processing]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-05-16 Thread via GitHub


lianetm commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1603682545


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java:
##
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.function.Consumer;
+
+/**
+ * The {@code CompletableEventReaper} is responsible for tracking any {@link 
CompletableEvent}s that were processed,
+ * making sure to reap them if they complete normally or pass their deadline. 
This is done so that we enforce an upper
+ * bound on the amount of time the event logic will execute.
+ */
+public class CompletableEventReaper<T extends CompletableEvent<?>> {
+
+private final Logger log;
+
+/**
+ * List of tracked events that we are candidates to expire or cancel when 
reviewed.
+ */
+private final List tracked;
+
+public CompletableEventReaper(LogContext logContext) {
+this.log = logContext.logger(CompletableEventReaper.class);
+this.tracked = new ArrayList<>();
+}
+
+/**
+ * Adds a new {@link CompletableEvent event} to track for later 
completion/expiration.
+ *
+ * @param event Event to track
+ */
+public void add(T event) {
+tracked.add(Objects.requireNonNull(event, "Event to track must be 
non-null"));
+}
+
+/**
+ * This method "completes" any {@link CompletableEvent}s that have either 
expired or completed normally. So this
+ * is a two-step process:
+ *
+ * 
+ * 
+ * For each tracked event which has exceeded its {@link 
CompletableEvent#deadlineMs() deadline}, an
+ * instance of {@link TimeoutException} is created and passed to
+ * {@link CompletableFuture#completeExceptionally(Throwable)}.
+ * 
+ * 
+ * For each tracked event of which its {@link 
CompletableEvent#future() future} is already in the
+ * {@link CompletableFuture#isDone() done} state, it will be 
removed from the list of tracked events.
+ * 
+ * 
+ *
+ * 
+ *
+ * This method should be called at regular intervals, based upon the needs 
of the resource that owns the reaper.
+ *
+ * @param currentTimeMs Current time with which to compare 
against the
+ *  {@link CompletableEvent#deadlineMs() 
expiration time}
+ */
+public void reapExpiredAndCompleted(long currentTimeMs) {
+log.trace("Reaping expired events");
+
+Consumer<CompletableEvent<?>> timeoutEvent = e -> {
+TimeoutException error = new TimeoutException(String.format("%s 
could not be completed within its timeout", e.getClass().getSimpleName()));
+long pastDueMs = currentTimeMs - e.deadlineMs();
+log.debug("Completing event {} exceptionally since it expired {} 
ms ago", e, pastDueMs);
+CompletableFuture f = e.future();
+f.completeExceptionally(error);
+};
+
+// First, complete (exceptionally) any events that have passed their 
deadline AND aren't already complete.
+tracked.stream()
+.filter(e -> !e.future().isDone())
+.filter(e -> currentTimeMs > e.deadlineMs())
+.forEach(timeoutEvent);
+// Second, remove any events that are already complete, just to make 
sure we don't hold references. This will
+// include any events that finished successfully as well as any events 
we just completed exceptionally above.
+tracked.removeIf(e -> e.future().isDone());

Review Comment:
   the benefit I see is self-cleaning which reduces the scope/responsibilities 
of `tracked`. We don't really care about events that are created an

[jira] [Commented] (KAFKA-16785) Migrate TopicBasedRemoteLogMetadataManagerRestartTest to new test infra

2024-05-16 Thread Kuan Po Tseng (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17847019#comment-17847019
 ] 

Kuan Po Tseng commented on KAFKA-16785:
---

May I take over this Jira ?

> Migrate TopicBasedRemoteLogMetadataManagerRestartTest to new test infra
> ---
>
> Key: KAFKA-16785
> URL: https://issues.apache.org/jira/browse/KAFKA-16785
> Project: Kafka
>  Issue Type: Test
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>  Labels: storage_test
>
> as title



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16526; Quorum state data version 1 [kafka]

2024-05-16 Thread via GitHub


jsancio commented on PR #15859:
URL: https://github.com/apache/kafka/pull/15859#issuecomment-2115613951

   > @jsancio : Thanks for analyzing the failed tests. It would be useful to 
file jiras for untracked ones. The PR LGTM
   
   Looks like an open issue already exists: 
[KAFKA-16045](https://issues.apache.org/jira/browse/KAFKA-16045)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15723: KRaft support in ListOffsetsRequestTest [kafka]

2024-05-16 Thread via GitHub


mimaison commented on PR #15047:
URL: https://github.com/apache/kafka/pull/15047#issuecomment-2115609505

   Ok, no worries. So I'll close this PR and I'll open a new one marking you as 
co-author.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15723: KRaft support in ListOffsetsRequestTest [kafka]

2024-05-16 Thread via GitHub


mimaison closed pull request #15047: KAFKA-15723: KRaft support in 
ListOffsetsRequestTest
URL: https://github.com/apache/kafka/pull/15047


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16493: Avoid unneeded subscription regex check if metadata version unchanged [kafka]

2024-05-16 Thread via GitHub


lianetm commented on PR #15869:
URL: https://github.com/apache/kafka/pull/15869#issuecomment-2115608996

   Thanks for the updates @Phuc-Hong-Tran! Left some comments mainly to improve 
testing, but looking good overall.  


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16493: Avoid unneeded subscription regex check if metadata version unchanged [kafka]

2024-05-16 Thread via GitHub


lianetm commented on code in PR #15869:
URL: https://github.com/apache/kafka/pull/15869#discussion_r1603634591


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -463,6 +467,42 @@ public void onPartitionsAssigned(final 
Collection partitions) {
 assertTrue(callbackExecuted.get());
 }
 
+@Test
+public void testCheckForNewTopicOnlyWhenMetadataChange() {
+SubscriptionState subscriptions = mock(SubscriptionState.class);
+Cluster cluster = mock(Cluster.class);
+
+consumer = newConsumer(
+mock(FetchBuffer.class),
+mock(ConsumerInterceptors.class),
+mock(ConsumerRebalanceListenerInvoker.class),
+subscriptions,
+singletonList(new RoundRobinAssignor()),
+"group-id",
+"client-id");
+
+final String topicName = "foo";
+final int partition = 3;
+final TopicPartition tp = new TopicPartition(topicName, partition);
+
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+Map offsets = mkMap(mkEntry(tp, new 
OffsetAndMetadata(1)));
+completeFetchedCommittedOffsetApplicationEventSuccessfully(offsets);
+
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+doReturn(cluster).when(metadata).fetch();
+
+HashSet topics = new HashSet<>();
+topics.add(topicName);
+doReturn(topics).when(cluster).topics();
+
+consumer.subscribe(Pattern.compile("f*"));
+verify(metadata).requestUpdateForNewTopics();
+verify(subscriptions).matchesSubscribedPattern(topicName);
+
+consumer.poll(Duration.ZERO);
+clearInvocations(subscriptions);
+verify(subscriptions, never()).matchesSubscribedPattern(topicName);

Review Comment:
   I would say that we should complete the story here to really know that our 
fix is kicking in. Up to this point we tested that we don't evaluate the regex 
on poll, but we should make sure it's because of the logic we're adding to skip 
it if metadata did not change. So what about extending the test to mock a 
metadata change, poll, and see the regex is indeed evaluated in that case? 
   
   ```suggestion
   when(metadata.updateVersion()).thenReturn(2);
   when(subscriptions.hasPatternSubscription()).thenReturn(true);
   consumer.poll(Duration.ZERO);
   verify(subscriptions).matchesSubscribedPattern(topicName);
   ```



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -463,6 +467,42 @@ public void onPartitionsAssigned(final 
Collection partitions) {
 assertTrue(callbackExecuted.get());
 }
 
+@Test
+public void testCheckForNewTopicOnlyWhenMetadataChange() {
+SubscriptionState subscriptions = mock(SubscriptionState.class);
+Cluster cluster = mock(Cluster.class);
+
+consumer = newConsumer(
+mock(FetchBuffer.class),
+mock(ConsumerInterceptors.class),
+mock(ConsumerRebalanceListenerInvoker.class),
+subscriptions,
+singletonList(new RoundRobinAssignor()),
+"group-id",
+"client-id");
+
+final String topicName = "foo";
+final int partition = 3;
+final TopicPartition tp = new TopicPartition(topicName, partition);
+
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+Map offsets = mkMap(mkEntry(tp, new 
OffsetAndMetadata(1)));
+completeFetchedCommittedOffsetApplicationEventSuccessfully(offsets);
+
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+doReturn(cluster).when(metadata).fetch();
+
+HashSet topics = new HashSet<>();
+topics.add(topicName);
+doReturn(topics).when(cluster).topics();
+
+consumer.subscribe(Pattern.compile("f*"));
+verify(metadata).requestUpdateForNewTopics();
+verify(subscriptions).matchesSubscribedPattern(topicName);
+
+consumer.poll(Duration.ZERO);
+clearInvocations(subscriptions);
+verify(subscriptions, never()).matchesSubscribedPattern(topicName);
+}

Review Comment:
   I would say that we should complete the story here to really know that our 
fix is kicking in. Up to this point we tested that we don't evaluate the regex 
on poll, but we should make sure it's because of the logic we're adding to skip 
it if metadata did not change. So what about extending the test to mock a 
metadata change, poll, and see the regex is indeed evaluated in that case? 
   
   ```suggestion
   when(metadata.updateVersion()).thenReturn(2);
   when(subscriptions.hasPatternSubscription()).thenReturn(true);
   consumer.poll(Duration.ZERO);
   verify(subscriptions).matchesSubscribedPattern(topicName);
   ```

Re: [PR] KAFKA-16493: Avoid unneeded subscription regex check if metadata version unchanged [kafka]

2024-05-16 Thread via GitHub


lianetm commented on code in PR #15869:
URL: https://github.com/apache/kafka/pull/15869#discussion_r1603634591


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -463,6 +467,42 @@ public void onPartitionsAssigned(final 
Collection partitions) {
 assertTrue(callbackExecuted.get());
 }
 
+@Test
+public void testCheckForNewTopicOnlyWhenMetadataChange() {
+SubscriptionState subscriptions = mock(SubscriptionState.class);
+Cluster cluster = mock(Cluster.class);
+
+consumer = newConsumer(
+mock(FetchBuffer.class),
+mock(ConsumerInterceptors.class),
+mock(ConsumerRebalanceListenerInvoker.class),
+subscriptions,
+singletonList(new RoundRobinAssignor()),
+"group-id",
+"client-id");
+
+final String topicName = "foo";
+final int partition = 3;
+final TopicPartition tp = new TopicPartition(topicName, partition);
+
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+Map offsets = mkMap(mkEntry(tp, new 
OffsetAndMetadata(1)));
+completeFetchedCommittedOffsetApplicationEventSuccessfully(offsets);
+
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+doReturn(cluster).when(metadata).fetch();
+
+HashSet topics = new HashSet<>();
+topics.add(topicName);
+doReturn(topics).when(cluster).topics();
+
+consumer.subscribe(Pattern.compile("f*"));
+verify(metadata).requestUpdateForNewTopics();
+verify(subscriptions).matchesSubscribedPattern(topicName);
+
+consumer.poll(Duration.ZERO);
+clearInvocations(subscriptions);
+verify(subscriptions, never()).matchesSubscribedPattern(topicName);

Review Comment:
   I would say that we should complete the story here to really know that our 
fix is kicking in. Up to this point we tested that we don't evaluate the regex 
on poll, but we should make sure it's because of the logic we're adding to skip 
it if metadata did not change. So what about extending the test to mock a 
metadata change, poll, and see the regex is indeed evaluated in that case? 
   
   ```suggestion
   when(metadata.updateVersion()).thenReturn(2);
   when(subscriptions.hasPatternSubscription()).thenReturn(true);
   consumer.poll(Duration.ZERO);
   verify(subscriptions).matchesSubscribedPattern(topicName);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16493: Avoid unneeded subscription regex check if metadata version unchanged [kafka]

2024-05-16 Thread via GitHub


lianetm commented on code in PR #15869:
URL: https://github.com/apache/kafka/pull/15869#discussion_r1603624271


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -463,6 +467,42 @@ public void onPartitionsAssigned(final 
Collection partitions) {
 assertTrue(callbackExecuted.get());
 }
 
+@Test
+public void testCheckForNewTopicOnlyWhenMetadataChange() {
+SubscriptionState subscriptions = mock(SubscriptionState.class);
+Cluster cluster = mock(Cluster.class);
+
+consumer = newConsumer(
+mock(FetchBuffer.class),
+mock(ConsumerInterceptors.class),
+mock(ConsumerRebalanceListenerInvoker.class),
+subscriptions,
+singletonList(new RoundRobinAssignor()),
+"group-id",
+"client-id");
+
+final String topicName = "foo";
+final int partition = 3;
+final TopicPartition tp = new TopicPartition(topicName, partition);
+
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+Map offsets = mkMap(mkEntry(tp, new 
OffsetAndMetadata(1)));

Review Comment:
   What about `Collections.singletonMap(tp, new OffsetAndMetadata(1));` ? seems 
simpler



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15723) KRaft support in ListOffsetsRequestTest

2024-05-16 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17847012#comment-17847012
 ] 

Mickael Maison commented on KAFKA-15723:


Grabbing this ticket as Zihao Lin said in the PR that he does not have time to 
complete this work.

> KRaft support in ListOffsetsRequestTest
> ---
>
> Key: KAFKA-15723
> URL: https://issues.apache.org/jira/browse/KAFKA-15723
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Reporter: Sameer Tejani
>Assignee: Mickael Maison
>Priority: Minor
>  Labels: kraft, kraft-test, newbie
>
> The following tests in ListOffsetsRequestTest in 
> core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala need to be 
> updated to support KRaft
> 37 : def testListOffsetsErrorCodes(): Unit = {
> 84 : def testListOffsetsMaxTimeStampOldestVersion(): Unit = {
> 112 : def testCurrentEpochValidation(): Unit = {
> 173 : def testResponseIncludesLeaderEpoch(): Unit = {
> 210 : def testResponseDefaultOffsetAndLeaderEpochForAllVersions(): Unit = {
> Scanned 261 lines. Found 0 KRaft tests out of 5 tests



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15723) KRaft support in ListOffsetsRequestTest

2024-05-16 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15723?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison reassigned KAFKA-15723:
--

Assignee: Mickael Maison  (was: Zihao Lin)

> KRaft support in ListOffsetsRequestTest
> ---
>
> Key: KAFKA-15723
> URL: https://issues.apache.org/jira/browse/KAFKA-15723
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Reporter: Sameer Tejani
>Assignee: Mickael Maison
>Priority: Minor
>  Labels: kraft, kraft-test, newbie
>
> The following tests in ListOffsetsRequestTest in 
> core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala need to be 
> updated to support KRaft
> 37 : def testListOffsetsErrorCodes(): Unit = {
> 84 : def testListOffsetsMaxTimeStampOldestVersion(): Unit = {
> 112 : def testCurrentEpochValidation(): Unit = {
> 173 : def testResponseIncludesLeaderEpoch(): Unit = {
> 210 : def testResponseDefaultOffsetAndLeaderEpochForAllVersions(): Unit = {
> Scanned 261 lines. Found 0 KRaft tests out of 5 tests



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16350) StateUpdater does not init transaction after canceling task close action

2024-05-16 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16350?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-16350.
---
Resolution: Fixed

> StateUpdater does not init transaction after canceling task close action
> 
>
> Key: KAFKA-16350
> URL: https://issues.apache.org/jira/browse/KAFKA-16350
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Bruno Cadonna
>Priority: Major
> Attachments: 
> tyh5pkfmgwfoe-org.apache.kafka.streams.integration.EosIntegrationTest-shouldWriteLatestOffsetsToCheckpointOnShutdown[exactly_once_v2,
>  processing threads true]-1-output.txt
>
>
> With EOSv2, we use a thread producer shared across all tasks. We init tx on 
> the producer with each _task_ (due to EOSv1 which uses a producer per task), 
> and have a guard in place to only init tx a single time.
> If we hit an error, we close the producer and create a new one, which is 
> still not initialized for transaction. At the same time, with state updater, 
> we schedule a "close task" action on error.
> For each task we get back, we do cancel the "close task" action, to actually 
> keep the task. If this happens for _all_ tasks, we don't have any task in 
> state CREATED at hand, and thus we never init the producer for transactions, 
> because we assume this was already done.
> On the first `send` request, we crash with an IllegalStateException:
> {code:java}
> Invalid transition attempted from state UNINITIALIZED to state IN_TRANSACTION 
> {code}
> This bug is exposed via EOSIntegrationTest (logs attached).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16350) StateUpdater does not init transaction after canceling task close action

2024-05-16 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17847008#comment-17847008
 ] 

Bruno Cadonna commented on KAFKA-16350:
---

This issue should be gone after the following PR: 
https://github.com/apache/kafka/pull/15870

> StateUpdater does not init transaction after canceling task close action
> 
>
> Key: KAFKA-16350
> URL: https://issues.apache.org/jira/browse/KAFKA-16350
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Bruno Cadonna
>Priority: Major
> Attachments: 
> tyh5pkfmgwfoe-org.apache.kafka.streams.integration.EosIntegrationTest-shouldWriteLatestOffsetsToCheckpointOnShutdown[exactly_once_v2,
>  processing threads true]-1-output.txt
>
>
> With EOSv2, we use a thread producer shared across all tasks. We init tx on 
> the producer with each _task_ (due to EOSv1 which uses a producer per task), 
> and have a guard in place to only init tx a single time.
> If we hit an error, we close the producer and create a new one, which is 
> still not initialized for transaction. At the same time, with state updater, 
> we schedule a "close task" action on error.
> For each task we get back, we do cancel the "close task" action, to actually 
> keep the task. If this happens for _all_ tasks, we don't have any task in 
> state CREATED at hand, and thus we never init the producer for transactions, 
> because we assume this was already done.
> On the first `send` request, we crash with an IllegalStateException:
> {code:java}
> Invalid transition attempted from state UNINITIALIZED to state IN_TRANSACTION 
> {code}
> This bug is exposed via EOSIntegrationTest (logs attached).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16493: Avoid unneeded subscription regex check if metadata version unchanged [kafka]

2024-05-16 Thread via GitHub


lianetm commented on code in PR #15869:
URL: https://github.com/apache/kafka/pull/15869#discussion_r1603605365


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -463,6 +467,42 @@ public void onPartitionsAssigned(final 
Collection partitions) {
 assertTrue(callbackExecuted.get());
 }
 
+@Test
+public void testCheckForNewTopicOnlyWhenMetadataChange() {
+SubscriptionState subscriptions = mock(SubscriptionState.class);
+Cluster cluster = mock(Cluster.class);
+
+consumer = newConsumer(
+mock(FetchBuffer.class),
+mock(ConsumerInterceptors.class),
+mock(ConsumerRebalanceListenerInvoker.class),
+subscriptions,
+singletonList(new RoundRobinAssignor()),
+"group-id",
+"client-id");
+
+final String topicName = "foo";
+final int partition = 3;
+final TopicPartition tp = new TopicPartition(topicName, partition);
+
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+Map offsets = mkMap(mkEntry(tp, new 
OffsetAndMetadata(1)));
+completeFetchedCommittedOffsetApplicationEventSuccessfully(offsets);
+
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+doReturn(cluster).when(metadata).fetch();
+
+HashSet topics = new HashSet<>();
+topics.add(topicName);
+doReturn(topics).when(cluster).topics();
+
+consumer.subscribe(Pattern.compile("f*"));
+verify(metadata).requestUpdateForNewTopics();
+verify(subscriptions).matchesSubscribedPattern(topicName);
+
+consumer.poll(Duration.ZERO);
+clearInvocations(subscriptions);

Review Comment:
   this call should be done before we poll (otherwise the next 
`verify(subscriptions, never())` succeeds only because of this call here, so it's 
not really testing the logic)
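   
   As a generic, self-contained illustration of that ordering (plain Mockito, not 
the consumer test itself; the class name and string values are made up):
   
   ```java
   import static org.mockito.Mockito.clearInvocations;
   import static org.mockito.Mockito.mock;
   import static org.mockito.Mockito.never;
   import static org.mockito.Mockito.verify;

   import java.util.List;
   import org.junit.jupiter.api.Test;

   public class ClearInvocationsOrderTest {
       @Test
       @SuppressWarnings("unchecked")
       public void clearBeforeTheCallUnderTest() {
           List<String> mock = mock(List.class);

           mock.add("setup");        // interaction from the setup phase
           clearInvocations(mock);   // clear *before* the call under test, not after it

           // the call under test would run here; if it never touches the mock,
           // the verification below is meaningful rather than vacuous
           verify(mock, never()).add("setup");
       }
   }
   ```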



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16762: SyncGroup API for upgrading ConsumerGroup [kafka]

2024-05-16 Thread via GitHub


dajac commented on code in PR #15954:
URL: https://github.com/apache/kafka/pull/15954#discussion_r1603595149


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1244,15 +1244,16 @@ private void throwIfClassicProtocolUnmatched(
 String requestProtocolType,
 String requestProtocolName
 ) {
+String protocolName = 
member.supportedClassicProtocols().get().iterator().next().name();

Review Comment:
   You may be able to reuse `isProtocolInconsistent`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16493: Avoid unneeded subscription regex check if metadata version unchanged [kafka]

2024-05-16 Thread via GitHub


lianetm commented on code in PR #15869:
URL: https://github.com/apache/kafka/pull/15869#discussion_r1603594084


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -463,6 +467,42 @@ public void onPartitionsAssigned(final 
Collection partitions) {
 assertTrue(callbackExecuted.get());
 }
 
+@Test
+public void testCheckForNewTopicOnlyWhenMetadataChange() {
+SubscriptionState subscriptions = mock(SubscriptionState.class);
+Cluster cluster = mock(Cluster.class);
+
+consumer = newConsumer(
+mock(FetchBuffer.class),
+mock(ConsumerInterceptors.class),
+mock(ConsumerRebalanceListenerInvoker.class),
+subscriptions,
+singletonList(new RoundRobinAssignor()),
+"group-id",
+"client-id");
+
+final String topicName = "foo";
+final int partition = 3;
+final TopicPartition tp = new TopicPartition(topicName, partition);
+
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+Map offsets = mkMap(mkEntry(tp, new 
OffsetAndMetadata(1)));
+completeFetchedCommittedOffsetApplicationEventSuccessfully(offsets);
+
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+doReturn(cluster).when(metadata).fetch();
+
+HashSet topics = new HashSet<>();
+topics.add(topicName);
+doReturn(topics).when(cluster).topics();

Review Comment:
   we could simplify this to a single 
`doReturn(Collections.singleton(topicName)).when(cluster).topics();`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16544 DescribeTopicsResult#allTopicIds and DescribeTopicsResult#allTopicNames should return null instead of throwing NPE [kafka]

2024-05-16 Thread via GitHub


brandboat opened a new pull request, #15979:
URL: https://github.com/apache/kafka/pull/15979

   related to https://issues.apache.org/jira/browse/KAFKA-16544
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16762: SyncGroup API for upgrading ConsumerGroup [kafka]

2024-05-16 Thread via GitHub


dongnuo123 commented on code in PR #15954:
URL: https://github.com/apache/kafka/pull/15954#discussion_r1603586413


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1244,15 +1244,16 @@ private void throwIfClassicProtocolUnmatched(
 String requestProtocolType,
 String requestProtocolName
 ) {
+String protocolName = 
member.supportedClassicProtocols().get().iterator().next().name();

Review Comment:
   Ah now I get it.. Let me add the check



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16762: SyncGroup API for upgrading ConsumerGroup [kafka]

2024-05-16 Thread via GitHub


dongnuo123 commented on code in PR #15954:
URL: https://github.com/apache/kafka/pull/15954#discussion_r1603582917


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1244,15 +1244,16 @@ private void throwIfClassicProtocolUnmatched(
 String requestProtocolType,
 String requestProtocolName
 ) {
+String protocolName = 
member.supportedClassicProtocols().get().iterator().next().name();

Review Comment:
   Yes, if `requestProtocolType` or `requestProtocolName` is null, the 
validation will fail.
   
   > we should only validate them if they are non-null.
   
   We do validate them even if they are null in the existing classic group sync 
to the classic group 
https://github.com/apache/kafka/blob/056d232f4e28bf8e67e00f8ed2c103fdb0f3b78e/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java#L4335-L4342
 so I think we should also fail the validation if either of them is null.
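   
   A hypothetical sketch of that strict check (names are illustrative, not the 
GroupMetadataManager code): a null request protocol type/name simply fails to 
match what the member advertised.
   
   ```java
   import java.util.Objects;

   public class ProtocolMatchSketch {
       // Strict variant: null request values count as a mismatch.
       static boolean protocolMatches(String memberType, String memberName,
                                      String requestType, String requestName) {
           return Objects.equals(memberType, requestType)
               && Objects.equals(memberName, requestName);
       }

       public static void main(String[] args) {
           System.out.println(protocolMatches("consumer", "range", "consumer", "range")); // true
           System.out.println(protocolMatches("consumer", "range", "consumer", null));    // false
       }
   }
   ```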



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16762: SyncGroup API for upgrading ConsumerGroup [kafka]

2024-05-16 Thread via GitHub


dongnuo123 commented on code in PR #15954:
URL: https://github.com/apache/kafka/pull/15954#discussion_r1603582917


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1244,15 +1244,16 @@ private void throwIfClassicProtocolUnmatched(
 String requestProtocolType,
 String requestProtocolName
 ) {
+String protocolName = 
member.supportedClassicProtocols().get().iterator().next().name();

Review Comment:
   Yes, if `requestProtocolType` or `requestProtocolName` is null, the 
validation will fail.
   
   > we should only validate them if they are non-null.
   We do validate them even if they are null in the existing classic group sync 
to the classic group 
https://github.com/apache/kafka/blob/056d232f4e28bf8e67e00f8ed2c103fdb0f3b78e/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java#L4335-L4342
 so I think we should also fail the validation if either of them is null.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1244,15 +1244,16 @@ private void throwIfClassicProtocolUnmatched(
 String requestProtocolType,
 String requestProtocolName
 ) {
+String protocolName = 
member.supportedClassicProtocols().get().iterator().next().name();

Review Comment:
   Yes, if `requestProtocolType` or `requestProtocolName` is null, the 
validation will fail.
   
   > we should only validate them if they are non-null.
   
   We do validate them even if they are null in the existing classic group sync 
to the classic group 
https://github.com/apache/kafka/blob/056d232f4e28bf8e67e00f8ed2c103fdb0f3b78e/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java#L4335-L4342
 so I think we should also fail the validation if either of them is null.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16779) Kafka retains logs past specified retention

2024-05-16 Thread Nicholas Feinberg (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17847001#comment-17847001
 ] 

Nicholas Feinberg commented on KAFKA-16779:
---

No problem.

> Kafka retains logs past specified retention
> ---
>
> Key: KAFKA-16779
> URL: https://issues.apache.org/jira/browse/KAFKA-16779
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Nicholas Feinberg
>Priority: Major
>  Labels: expiration, retention
> Attachments: OOM.txt, kafka-20240512.log.gz, kafka-20240514.log.gz, 
> kafka-ooms.png, server.log.2024-05-12.gz, server.log.2024-05-14.gz, 
> state-change.log.2024-05-12.gz, state-change.log.2024-05-14.gz
>
>
> In a Kafka cluster with all topics set to four days of retention or longer 
> (34560ms), most brokers seem to be retaining six days of data.
> This is true even for topics which have high throughput (500MB/s, 50k msgs/s) 
> and thus are regularly rolling new log segments. We observe this unexpectedly 
> high retention both via disk usage statistics and by requesting the oldest 
> available messages from Kafka.
> Some of these brokers crashed with an 'mmap failed' error (attached). When 
> those brokers started up again, they returned to the expected four days of 
> retention.
> Manually restarting brokers also seems to cause them to return to four days 
> of retention. Demoting and promoting brokers only has this effect on a small 
> part of the data hosted on a broker.
> These hosts had ~170GiB of free memory available. We saw no signs of pressure 
> on either system or JVM heap memory before or after they reported this error. 
> Committed memory seems to be around 10%, so this doesn't seem to be an 
> overcommit issue.
> This Kafka cluster was upgraded to Kafka 3.7 two weeks ago (April 29th). 
> Prior to the upgrade, it was running on Kafka 2.4.
> We last reduced retention for ops on May 7th, after which we restored 
> retention to our default of four days. This was the second time we've 
> temporarily reduced and restored retention since the upgrade. This problem 
> did not manifest the previous time we did so, nor did it manifest on our 
> other Kafka 3.7 clusters.
> We are running on AWS 
> [d3en.12xlarge|https://instances.vantage.sh/aws/ec2/d3en.12xlarge] hosts. We 
> have 23 brokers, each with 24 disks. We're running in a JBOD configuration 
> (i.e. unraided).
> Since this cluster was upgraded from Kafka 2.4 and since we're using JBOD, 
> we're still using Zookeeper.
> Sample broker logs are attached. The 05-12 and 05-14 logs are from separate 
> hosts. Please let me know if I can provide any further information.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16762: SyncGroup API for upgrading ConsumerGroup [kafka]

2024-05-16 Thread via GitHub


dongnuo123 commented on code in PR #15954:
URL: https://github.com/apache/kafka/pull/15954#discussion_r1603566552


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1244,15 +1244,16 @@ private void throwIfClassicProtocolUnmatched(
 String requestProtocolType,
 String requestProtocolName
 ) {
+String protocolName = 
member.supportedClassicProtocols().get().iterator().next().name();

Review Comment:
   Ah right, they should be validated



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16762: SyncGroup API for upgrading ConsumerGroup [kafka]

2024-05-16 Thread via GitHub


dongnuo123 commented on code in PR #15954:
URL: https://github.com/apache/kafka/pull/15954#discussion_r1603566552


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1244,15 +1244,16 @@ private void throwIfClassicProtocolUnmatched(
 String requestProtocolType,
 String requestProtocolName
 ) {
+String protocolName = 
member.supportedClassicProtocols().get().iterator().next().name();

Review Comment:
   Ah right, they should be validated



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-10199: Handle assignment with new remove operation in state updater [kafka]

2024-05-16 Thread via GitHub


cadonna commented on code in PR #15882:
URL: https://github.com/apache/kafka/pull/15882#discussion_r1603565420


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -1750,6 +1761,7 @@ Map allTasks() {
 if (stateUpdater != null) {
 final Map ret = 
stateUpdater.getTasks().stream().collect(Collectors.toMap(Task::id, x -> x));
 ret.putAll(tasks.allTasksPerId());
+
ret.putAll(tasks.pendingTasksToInit().stream().collect(Collectors.toMap(Task::id,
 x -> x)));

Review Comment:
   Here the PR: https://github.com/apache/kafka/pull/15978



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16774: Delete flaky test since it is redundant [kafka]

2024-05-16 Thread via GitHub


cadonna commented on code in PR #15978:
URL: https://github.com/apache/kafka/pull/15978#discussion_r1603562973


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##
@@ -1377,47 +1377,6 @@ public void 
shouldOnlyCompleteShutdownAfterRebalanceNotInProgress() throws Inter
 assertEquals(Collections.emptySet(), 
thread.taskManager().activeTaskIds());
 }
 
-@Test
-public void shouldCloseAllTaskProducersOnCloseIfEosEnabled() throws 
InterruptedException {
-internalTopologyBuilder.addSource(null, "source1", null, null, null, 
topic1);
-
-thread = createStreamThread(
-CLIENT_ID,
-new StreamsConfig(configProps(true)),
-new MockTime(1)
-);
-
-thread.start();

Review Comment:
   This, combined with the calls on the thread below, is the cause of the 
flakiness.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16774: Delete flaky test since it is redundant [kafka]

2024-05-16 Thread via GitHub


cadonna opened a new pull request, #15978:
URL: https://github.com/apache/kafka/pull/15978

   The test shouldCloseAllTaskProducersOnCloseIfEosEnabled() in 
StreamThreadTest is flaky with a concurrent modification exception. The 
exception is caused by the test itself, because the test 
starts a stream thread and, at the same time, the thread that executes the test 
calls methods on the stream thread. The stream thread was not designed for such 
concurrency.
   The test verifies that under EOS the streams producers are closed during 
shutdown. Actually, the test is not needed: we already have a test that 
verifies that when the stream thread shuts down, the task manager also shuts 
down, and for the task manager we have tests that verify that the producers are 
closed when the task manager shuts down.
   
   This commit verifies that those tests are run under EOS and ALOS.
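   
   As a shape sketch only (not the actual diff; the class name is made up), the 
same verification can be parameterized over both processing guarantees:
   
   ```java
   import static org.junit.jupiter.api.Assertions.assertNotNull;

   import org.apache.kafka.streams.StreamsConfig;
   import org.junit.jupiter.params.ParameterizedTest;
   import org.junit.jupiter.params.provider.ValueSource;

   public class ProcessingGuaranteeParamSketch {
       @ParameterizedTest
       @ValueSource(strings = {StreamsConfig.AT_LEAST_ONCE, StreamsConfig.EXACTLY_ONCE_V2})
       public void runsUnderBothGuarantees(final String guarantee) {
           assertNotNull(guarantee); // placeholder for the shutdown/producer-close assertions
       }
   }
   ```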
   
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16762: SyncGroup API for upgrading ConsumerGroup [kafka]

2024-05-16 Thread via GitHub


dajac commented on code in PR #15954:
URL: https://github.com/apache/kafka/pull/15954#discussion_r1603556220


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1244,15 +1244,16 @@ private void throwIfClassicProtocolUnmatched(
 String requestProtocolType,
 String requestProtocolName
 ) {
+String protocolName = member.supportedClassicProtocols().get().iterator().next().name();

Review Comment:
   What I said is different. What I meant is that requestProtocolType and 
requestProtocolName could be null here. If they are, the validation will fail, 
no?
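   As a side note, here is a minimal sketch of the null-tolerant check being discussed (method, class, and exception names are placeholders, not the actual GroupMetadataManager code): the request's protocol type and name are only compared when the client actually sent them.

```java
// Sketch only; names are placeholders. In the real code a mismatch surfaces as an
// InconsistentGroupProtocolException rather than an IllegalArgumentException.
final class ProtocolValidationSketch {

    static void throwIfProtocolUnmatched(
        String memberProtocolType,
        String memberProtocolName,
        String requestProtocolType,  // may be null: optional in a SyncGroup request
        String requestProtocolName   // may be null: optional in a SyncGroup request
    ) {
        if (requestProtocolType != null && !requestProtocolType.equals(memberProtocolType)) {
            throw new IllegalArgumentException("Protocol type does not match the member's protocol type.");
        }
        if (requestProtocolName != null && !requestProtocolName.equals(memberProtocolName)) {
            throw new IllegalArgumentException("Protocol name does not match the member's protocol name.");
        }
    }

    public static void main(String[] args) {
        // A request that omits both optional fields passes the check.
        throwIfProtocolUnmatched("consumer", "range", null, null);

        // A request that sends a mismatching protocol name is rejected.
        try {
            throwIfProtocolUnmatched("consumer", "range", "consumer", "roundrobin");
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```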






Re: [PR] KAFKA-16762: SyncGroup API for upgrading ConsumerGroup [kafka]

2024-05-16 Thread via GitHub


dajac commented on code in PR #15954:
URL: https://github.com/apache/kafka/pull/15954#discussion_r1603556220


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1244,15 +1244,16 @@ private void throwIfClassicProtocolUnmatched(
 String requestProtocolType,
 String requestProtocolName
 ) {
+String protocolName = member.supportedClassicProtocols().get().iterator().next().name();

Review Comment:
   What I said is different. What I meant is that requestProtocolType and 
requestProtocolName could be null here.






Re: [PR] KAFKA-16762: SyncGroup API for upgrading ConsumerGroup [kafka]

2024-05-16 Thread via GitHub


dongnuo123 commented on code in PR #15954:
URL: https://github.com/apache/kafka/pull/15954#discussion_r1603545254


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1244,15 +1244,16 @@ private void throwIfClassicProtocolUnmatched(
 String requestProtocolType,
 String requestProtocolName
 ) {
+String protocolName = member.supportedClassicProtocols().get().iterator().next().name();

Review Comment:
   I'm not sure it's necessary, since we've already checked that the member uses the classic protocol, but it doesn't hurt to explicitly check the protocols here. Let me add the validation.






Re: [PR] KAFKA-16493: Avoid unneeded subscription regex check if metadata version unchanged [kafka]

2024-05-16 Thread via GitHub


lianetm commented on code in PR #15869:
URL: https://github.com/apache/kafka/pull/15869#discussion_r1603537376


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1482,7 +1486,7 @@ private void updatePatternSubscription(Cluster cluster) {
 .collect(Collectors.toSet());
 if (subscriptions.subscribeFromPattern(topicsToSubscribe)) {
 applicationEventHandler.add(new SubscriptionChangeEvent());
-metadata.requestUpdateForNewTopics();
+this.metadataVersionSnapshot = metadata.requestUpdateForNewTopics();

Review Comment:
   Since we're completing the regex implementation, could you clean up the function doc? We should remove the TODO, and I would suggest rephrasing the description to simply state that this evaluates the regex against the list of topic names from metadata and updates the subscription state with the matching topics. A possible wording is sketched below.
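   One possible wording, purely as a sketch based on what the diff above already does (adjust as you see fit):

```java
/**
 * Evaluates the subscribed pattern against the topic names in the given cluster
 * metadata and updates the subscription state with the matching topics. If the
 * matched set changed, a SubscriptionChangeEvent is enqueued and a metadata
 * update is requested for the newly subscribed topics.
 */
private void updatePatternSubscription(Cluster cluster) {
    // existing body unchanged
}
```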






Re: [PR] KAFKA-16710:Continuously `makeFollower` may cause the replica fetcher thread to encounter an offset mismatch exception [kafka]

2024-05-16 Thread via GitHub


hudeqi commented on PR #15929:
URL: https://github.com/apache/kafka/pull/15929#issuecomment-2115483193

   Hi @dajac, could you help review this issue?





Re: [PR] KAFKA-16762: SyncGroup API for upgrading ConsumerGroup [kafka]

2024-05-16 Thread via GitHub


dajac commented on code in PR #15954:
URL: https://github.com/apache/kafka/pull/15954#discussion_r1603535727


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -3931,6 +4071,74 @@ public CoordinatorResult classicGroupSync(
 return EMPTY_RESULT;
 }
 
+/**
+ * Handle a SyncGroupRequest to a ConsumerGroup.
+ *
+ * @param group          The ConsumerGroup.
+ * @param context        The request context.
+ * @param request        The actual SyncGroup request.
+ * @param responseFuture The sync group response future.
+ *
+ * @return The result that contains the appendFuture to return the response.
+ */
+private CoordinatorResult classicGroupSyncToConsumerGroup(
+ConsumerGroup group,
+RequestContext context,
+SyncGroupRequestData request,
+CompletableFuture responseFuture
+) throws UnknownMemberIdException, FencedInstanceIdException, IllegalGenerationException,
+InconsistentGroupProtocolException, RebalanceInProgressException, IllegalStateException {
+String groupId = request.groupId();
+String memberId = request.memberId();
+String instanceId = request.groupInstanceId();
+
+ConsumerGroupMember member;
+if (instanceId == null) {
+member = group.getOrMaybeCreateMember(request.memberId(), false);
+} else {
+member = group.staticMember(instanceId);
+if (member == null) {
+throw new UnknownMemberIdException(
+String.format("Member with instance id %s is not a member of group %s.", instanceId, groupId)
+);
+}
+throwIfInstanceIdIsFenced(member, groupId, memberId, instanceId);
+}
+
+throwIfMemberDoesNotUseClassicProtocol(member);
+throwIfGenerationIdUnmatched(member.memberId(), member.memberEpoch(), request.generationId());
+throwIfClassicProtocolUnmatched(member, request.protocolType(), request.protocolName());
+throwIfRebalanceInProgress(group, member);
+
+CompletableFuture appendFuture = new CompletableFuture<>();
+appendFuture.whenComplete((__, t) -> {
+if (t == null) {
+cancelConsumerGroupSyncTimeout(groupId, memberId);
+scheduleConsumerGroupSessionTimeout(groupId, memberId, member.classicProtocolSessionTimeout().get());
+
+responseFuture.complete(new SyncGroupResponseData()
+.setProtocolType(request.protocolType())
+.setProtocolName(request.protocolName())
+.setAssignment(prepareAssignment(member)));
+}
+});
+
+return new CoordinatorResult<>(Collections.emptyList(), appendFuture, false);
+}
+
+/**
+ * Serializes the member's assigned partitions with ConsumerProtocol.
+ *
+ * @param member The ConsumerGroupMember.
+ * @return The serialized assigned partitions.
+ */
+private byte[] prepareAssignment(ConsumerGroupMember member) {

Review Comment:
   Now that we have this method, I wonder if we should inline the body of 
`deserializeProtocolVersion` here. I don't think that we will reuse 
deserializeProtocolVersion.






[jira] [Comment Edited] (KAFKA-16786) New consumer subscribe should not require the deprecated partition.assignment.strategy

2024-05-16 Thread Phuc Hong Tran (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16786?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846990#comment-17846990
 ] 

Phuc Hong Tran edited comment on KAFKA-16786 at 5/16/24 2:53 PM:
-

Sure, I’ll take it


was (Author: JIRAUSER301295):
Sure, @lianetm. I’ll take it

> New consumer subscribe should not require the deprecated 
> partition.assignment.strategy
> --
>
> Key: KAFKA-16786
> URL: https://issues.apache.org/jira/browse/KAFKA-16786
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8
>
>
> The partition.assignment.strategy config is deprecated with the new consumer 
> group protocol KIP-848. With the new protocol, server side assignors are 
> supported for now, defined in the property
> group.remote.assignor, and with default values selected by the broker, so 
> it's not even a required property. 
> The new AsyncKafkaConsumer supports the new protocol only, but it currently 
> throws an IllegalStateException if a call to subscribe is made and the 
> deprecated config partition.assignment.strategy is empty (see 
> [throwIfNoAssignorsConfigured|https://github.com/apache/kafka/blob/056d232f4e28bf8e67e00f8ed2c103fdb0f3b78e/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1715]).
>  
> We should remove the reference to ConsumerPartitionAssignor in the
> AsyncKafkaConsumer, along with its validation for non-emptiness on subscribe
> (the only use it has).
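For context, a minimal sketch of a consumer configured for the new protocol (the broker address, group id, and topic below are placeholders): group.protocol=consumer selects the KIP-848 protocol, group.remote.assignor optionally names a server-side assignor (the broker chooses a default if it is omitted), and partition.assignment.strategy is not set at all.

```java
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class NewProtocolConsumerSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("group.id", "demo-group");              // placeholder
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        // New group protocol (KIP-848): no partition.assignment.strategy involved.
        props.put("group.protocol", "consumer");
        // Optional: pick a server-side assignor; the broker chooses a default if omitted.
        props.put("group.remote.assignor", "uniform");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("demo-topic")); // placeholder topic
        }
    }
}
```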





Re: [PR] KAFKA-16762: SyncGroup API for upgrading ConsumerGroup [kafka]

2024-05-16 Thread via GitHub


dajac commented on code in PR #15954:
URL: https://github.com/apache/kafka/pull/15954#discussion_r1603525427


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1244,15 +1244,16 @@ private void throwIfClassicProtocolUnmatched(
 String requestProtocolType,
 String requestProtocolName
 ) {
+String protocolName = member.supportedClassicProtocols().get().iterator().next().name();

Review Comment:
   I just recalled that the protocol type and name are optional in the sync group request, so we should only validate them if they are non-null.





