Re: [PR] KAFKA-16531: calculate check-quorum when leader is not in voter set [kafka]

2024-06-13 Thread via GitHub


chia7712 commented on PR #16211:
URL: https://github.com/apache/kafka/pull/16211#issuecomment-2167362750

   @showuon could you please fix the build error?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16918: TestUtils#assertFutureThrows should use future.get with timeout [kafka]

2024-06-13 Thread via GitHub


showuon commented on code in PR #16264:
URL: https://github.com/apache/kafka/pull/16264#discussion_r1639358954


##
clients/src/test/java/org/apache/kafka/test/TestUtils.java:
##
@@ -556,10 +558,22 @@ public static Set generateRandomTopicPartitions(int numTopic, in
  * @return The caught exception cause
  */
 public static  T assertFutureThrows(Future future, Class exceptionCauseClass) {
-ExecutionException exception = assertThrows(ExecutionException.class, future::get);
-assertInstanceOf(exceptionCauseClass, exception.getCause(),
-"Unexpected exception cause " + exception.getCause());
-return exceptionCauseClass.cast(exception.getCause());
+try {
+future.get(5, TimeUnit.SECONDS);
+fail("expected to throw ExecutionException...");
+} catch (TimeoutException e) {
+fail("timeout waiting");
+return null;
+} catch (ExecutionException e) {
+ExecutionException exception = assertThrows(ExecutionException.class, future::get);
+assertInstanceOf(exceptionCauseClass, exception.getCause(),
+"Unexpected exception cause " + exception.getCause());
+return exceptionCauseClass.cast(exception.getCause());
+} catch (InterruptedException e) {
+fail("Unexpected exception cause" + e.getCause());
+return null;
+}
+return null;

Review Comment:
   For 1, yes. For 2, maybe check this comment: 
https://github.com/apache/kafka/pull/16264#discussion_r1639351349






Re: [PR] KAFKA-16637: AsyncKafkaConsumer removes offset fetch responses from cache too aggressively [kafka]

2024-06-13 Thread via GitHub


chia7712 commented on code in PR #16310:
URL: https://github.com/apache/kafka/pull/16310#discussion_r1639296989


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1666,23 +1668,68 @@ private boolean initWithCommittedOffsetsIfNeeded(Timer timer) {
 return true;
 
 log.debug("Refreshing committed offsets for partitions {}", initializingPartitions);
+
+// The shorter the timeout provided to poll(), the more likely the offsets fetch will time out. To handle
+// this case, on the first attempt to fetch the committed offsets, a FetchCommittedOffsetsEvent is created
+// (with potentially a longer timeout) and stored. The event is used for the first attempt, but in the
+// case it times out, subsequent attempts will also use the event in order to wait for the results.
+if (!canReusePendingOffsetFetchEvent(initializingPartitions)) {
+// Give the event a reasonable amount of time to complete.
+final long timeoutMs = Math.max(defaultApiTimeoutMs, timer.remainingMs());
+final long deadlineMs = calculateDeadlineMs(time, timeoutMs);
+pendingOffsetFetchEvent = new FetchCommittedOffsetsEvent(initializingPartitions, deadlineMs);
+applicationEventHandler.add(pendingOffsetFetchEvent);
+}
+
+final CompletableFuture> future = pendingOffsetFetchEvent.future();
+boolean shouldClearPendingEvent = false;
+
 try {
-final FetchCommittedOffsetsEvent event =
-new FetchCommittedOffsetsEvent(
-initializingPartitions,
-calculateDeadlineMs(timer));
-wakeupTrigger.setActiveTask(event.future());
-final Map offsets = applicationEventHandler.addAndGet(event);
+wakeupTrigger.setActiveTask(future);
+final Map offsets = ConsumerUtils.getResult(future, timer);
+
+// Clear the pending event once its result is successfully retrieved.
+shouldClearPendingEvent = true;
+
 refreshCommittedOffsets(offsets, metadata, subscriptions);
 return true;
 } catch (TimeoutException e) {
 log.error("Couldn't refresh committed offsets before timeout expired");
 return false;
+} catch (InterruptException e) {
+throw e;
+} catch (Throwable t) {
+// Clear the pending event on errors that are not timeout- or interrupt-related.
+shouldClearPendingEvent = true;
+throw ConsumerUtils.maybeWrapAsKafkaException(t);
 } finally {
+if (shouldClearPendingEvent)
+pendingOffsetFetchEvent = null;
+
 wakeupTrigger.clearTask();
 }
 }
 
+/**
+ * This determines if the {@link #pendingOffsetFetchEvent pending offset fetch event} can be reused. Reuse
+ * is only possible if all the following conditions are true:
+ *
+ * 
+ * A pending offset fetch event exists
+ * The partition set of the pending offset fetch event is the same as the given partition set
+ * The pending offset fetch event has not expired
+ * 
+ */
+private boolean canReusePendingOffsetFetchEvent(Set partitions) {
+if (pendingOffsetFetchEvent == null)
+return false;
+
+if (!pendingOffsetFetchEvent.partitions().equals(partitions))

Review Comment:
   Can the event be reused if the partitions of the pending fetch request 
include all of the input partitions? It seems `refreshCommittedOffsets` can 
ignore the extra partitions.
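
To make the question above concrete, here is a minimal sketch that contrasts the equality check in the diff with the superset check being suggested. It is not the PR's code: the class and method names are hypothetical, partitions are modelled as plain strings rather than `TopicPartition`, and the expiry condition from the Javadoc is left out.

```java
import java.util.Set;

// Hypothetical sketch; partitions are plain strings here, not Kafka TopicPartition objects.
final class PendingFetchReuseSketch {

    // Mirrors the check in the diff: reuse only when both partition sets are identical.
    static boolean canReuseIfEqual(Set<String> pendingPartitions, Set<String> requestedPartitions) {
        return pendingPartitions != null && pendingPartitions.equals(requestedPartitions);
    }

    // The relaxed variant raised in the review: reuse when the pending event already
    // covers every requested partition, even if it also covers extra ones.
    static boolean canReuseIfSuperset(Set<String> pendingPartitions, Set<String> requestedPartitions) {
        return pendingPartitions != null && pendingPartitions.containsAll(requestedPartitions);
    }
}
```

Under the relaxed check, a pending event for two partitions could serve a later request for just one of them, provided the caller ignores the offsets it did not ask for.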



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1666,23 +1668,68 @@ private boolean initWithCommittedOffsetsIfNeeded(Timer 
timer) {
 return true;
 
 log.debug("Refreshing committed offsets for partitions {}", 
initializingPartitions);
+
+// The shorter the timeout provided to poll(), the more likely the 
offsets fetch will time out. To handle
+// this case, on the first attempt to fetch the committed offsets, a 
FetchCommittedOffsetsEvent is created
+// (with potentially a longer timeout) and stored. The event is used 
for the first attempt, but in the
+// case it times out, subsequent attempts will also use the event in 
order to wait for the results.
+if (!canReusePendingOffsetFetchEvent(initializingPartitions)) {
+// Give the event a reasonable amount of time to complete.
+final long timeoutMs = Math.max(defaultApiTimeoutMs, 
timer.remainingMs());
+final long deadlineMs = calculateDeadlineMs(time, timeoutMs);
+pendingOffsetFetchEvent = new 
FetchCommittedOffsetsEvent(initializingPartitions, deadlineMs);
+applicationEventHandler.add(pendingOffsetFetchEvent);
+}
+
+final CompletableFuture> futu

Re: [PR] KAFKA-16918: TestUtils#assertFutureThrows should use future.get with timeout [kafka]

2024-06-13 Thread via GitHub


showuon commented on code in PR #16264:
URL: https://github.com/apache/kafka/pull/16264#discussion_r1639351349


##
clients/src/test/java/org/apache/kafka/test/TestUtils.java:
##
@@ -556,10 +558,20 @@ public static Set generateRandomTopicPartitions(int numTopic, in
  * @return The caught exception cause
  */
 public static  T assertFutureThrows(Future future, Class exceptionCauseClass) {
-ExecutionException exception = assertThrows(ExecutionException.class, future::get);
-assertInstanceOf(exceptionCauseClass, exception.getCause(),
-"Unexpected exception cause " + exception.getCause());
-return exceptionCauseClass.cast(exception.getCause());

Review Comment:
   The goal of this PR is to keep the same semantics (the logic) as before, 
but give `future.get` a time limit.
   So we should not change the assertion logic here.
   Maybe you should understand the original logic first, and then make other 
changes.
   Some questions you need to answer first:
   1. What happens when future.get throws no exception?
   2. What happens when future.get throws ExecutionException?
   3. What is exceptionCauseClass used for?
   4. What is the return value?
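
One way to read the four questions above is as a specification for the helper. The sketch below follows that reading under stated assumptions: it is not the code merged in the PR, the class name and the `timeoutMs` parameter are hypothetical, and plain `AssertionError` stands in for the JUnit assertions so that the example has no dependencies.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for TestUtils#assertFutureThrows; uses AssertionError instead of JUnit.
final class FutureAssertSketch {

    static <T extends Throwable> T assertFutureThrows(Future<?> future,
                                                      Class<T> exceptionCauseClass,
                                                      long timeoutMs) {
        try {
            // Bounded wait instead of an unbounded future.get().
            future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            // Questions 2 to 4: check the cause type, then return the cause itself.
            Throwable cause = e.getCause();
            if (!exceptionCauseClass.isInstance(cause)) {
                throw new AssertionError("Unexpected exception cause " + cause);
            }
            return exceptionCauseClass.cast(cause);
        } catch (TimeoutException e) {
            throw new AssertionError("Future did not complete within " + timeoutMs + " ms", e);
        } catch (InterruptedException e) {
            // Restore the flag so callers can still observe the interrupt.
            Thread.currentThread().interrupt();
            throw new AssertionError("Interrupted while waiting for the future", e);
        }
        // Question 1: completing without an exception is a test failure.
        throw new AssertionError("Expected an ExecutionException, but the future completed normally");
    }
}
```

The single timed `get` is the only behavioural change in this sketch; the cause check and the returned value are meant to match the lines removed in the diff.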
   






Re: [PR] KAFKA-16955: fix synchronization of streams threadState [kafka]

2024-06-13 Thread via GitHub


rodesai commented on code in PR #16337:
URL: https://github.com/apache/kafka/pull/16337#discussion_r1639348243


##
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##
@@ -635,14 +634,10 @@ public void setStandbyUpdateListener(final 
StandbyUpdateListener standbyListener
 final class StreamStateListener implements StreamThread.StateListener {
 private final Map threadState;
 private GlobalStreamThread.State globalThreadState;
-// this lock should always be held before the state lock
-private final Object threadStatesLock;
 
-StreamStateListener(final Map threadState,
-final GlobalStreamThread.State globalThreadState) {
-this.threadState = threadState;
+StreamStateListener(final GlobalStreamThread.State globalThreadState) {

Review Comment:
   I think it can't be a singleton because we support multiple KafkaStreams 
instances per process. Having it be a private inner class of `KafkaStreams` 
feels right to me. I've changed its visibility to private accordingly.






Re: [PR] KAFKA-16946: Utils.getHost/getPort cannot parse SASL_PLAINTEXT://host:port [kafka]

2024-06-13 Thread via GitHub


showuon commented on PR #16319:
URL: https://github.com/apache/kafka/pull/16319#issuecomment-2167331023

   Retriggering CI build: 
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-16319/6/
   





Re: [PR] MINOR: migrate DescribeConsumerGroupTest to use ClusterTestExtensions [kafka]

2024-06-13 Thread via GitHub


TaiJuWu commented on code in PR #15908:
URL: https://github.com/apache/kafka/pull/15908#discussion_r1639336398


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java:
##
@@ -16,834 +16,986 @@
  */
 package org.apache.kafka.tools.consumer.group;
 
+import kafka.test.ClusterConfig;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTemplate;
+import kafka.test.junit.ClusterTestExtensions;
+import org.apache.kafka.clients.admin.ConsumerGroupDescription;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.RangeAssignor;
 import org.apache.kafka.clients.consumer.RoundRobinAssignor;
 import org.apache.kafka.common.ConsumerGroupState;
-import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.test.TestUtils;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.tools.ToolsTestUtils;
 
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.MethodSource;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Optional;
-import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.apache.kafka.test.TestUtils.RANDOM;
-import static 
org.apache.kafka.tools.ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
 
-public class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
+@Tag("integration")
+@ExtendWith(value = ClusterTestExtensions.class)
+public class DescribeConsumerGroupTest {
+private static final String TOPIC_PREFIX = "test.topic.";
+private static final String GROUP_PREFIX = "test.group.";
 private static final List> DESCRIBE_TYPE_OFFSETS = 
Arrays.asList(Collections.singletonList(""), 
Collections.singletonList("--offsets"));
 private static final List> DESCRIBE_TYPE_MEMBERS = 
Arrays.asList(Collections.singletonList("--members"), 
Arrays.asList("--members", "--verbose"));
 private static final List> DESCRIBE_TYPE_STATE = 
Collections.singletonList(Collections.singletonList("--state"));
-private static final List> DESCRIBE_TYPES;
-
-static {
-List> describeTypes = new ArrayList<>();
-
-describeTypes.addAll(DESCRIBE_TYPE_OFFSETS);
-describeTypes.addAll(DESCRIBE_TYPE_MEMBERS);
-describeTypes.addAll(DESCRIBE_TYPE_STATE);
+private static final List> DESCRIBE_TYPES = 
Stream.of(DESCRIBE_TYPE_OFFSETS, DESCRIBE_TYPE_MEMBERS, 
DESCRIBE_TYPE_STATE).flatMap(Collection::stream).collect(Collectors.toList());
+private ClusterInstance clusterInstance;
 
-DESCRIBE_TYPES = describeTypes;
+private static List generator() {
+return ConsumerGroupCommandTestUtils.generator();
 }
 
-@ParameterizedTest(name = 
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
-@MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
-public void testDescribeNonExistingGroup(String quorum, String 
groupProtocol) {
-createOffsetsTopic(listenerName(), new Properties());
+@ClusterTemplate("generator")
+public void testDescribeNonExistingGroup(ClusterInstance clusterInstance) {
+this.clusterInstance = clusterInstance;

Review Comment:
   Is this line necessary?




Re: [PR] KAFKA-16933: New consumer unsubscribe close commit fixes [kafka]

2024-06-13 Thread via GitHub


philipnee commented on code in PR #16272:
URL: https://github.com/apache/kafka/pull/16272#discussion_r1639205859


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1512,7 +1513,9 @@ public void unsubscribe() {
 }
 resetGroupMetadata();
 }
-subscriptions.unsubscribe();
+} catch (Exception e) {
+log.error("Unsubscribe failed", e);
+throw e;

Review Comment:
   I wonder if we should handle the `java.lang.InterruptedException` here or 
in `ConsumerUtils`. Currently, `ConsumerUtils` wraps the exception in a 
`KafkaException` and does nothing more. I think we need to clear the interrupt 
flag using `Thread.interrupted()` so that the consumer can shut down cleanly 
when the user invokes `close()`. 
   Also, `InterruptedException` can now be thrown everywhere. I guess the 
obvious reason we are seeing this during shutdown is that the worker calls 
unsubscribe before shutting down.
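
The flag handling discussed above can be sketched in plain Java. `Thread.interrupted()` is a static method that returns the current thread's interrupt status and clears it, while `Thread.currentThread().isInterrupted()` only reads it. The class and method names below are hypothetical and this is not consumer code; restoring the flag after cleanup is an assumption of the sketch, not something proposed in the thread.

```java
// Hypothetical sketch of clearing the interrupt flag around a cleanup step.
final class InterruptFlagSketch {

    /**
     * Clears a pending interrupt so that blocking cleanup can run, then restores
     * the flag so the caller can still observe it. Returns whether the thread
     * was interrupted on entry.
     */
    static boolean closeCleanly(Runnable cleanup) {
        // Static call: returns the current status and clears it in one step.
        boolean wasInterrupted = Thread.interrupted();
        try {
            cleanup.run();
        } finally {
            if (wasInterrupted) {
                Thread.currentThread().interrupt();
            }
        }
        return wasInterrupted;
    }
}
```

Whether the flag should be restored or swallowed after a clean shutdown is a design decision the review leaves open.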









Re: [PR] KAFKA-16921: migrate connect module to junit 5 (Part 2) [kafka]

2024-06-13 Thread via GitHub


m1a2st commented on PR #16330:
URL: https://github.com/apache/kafka/pull/16330#issuecomment-2167253065

   @chia7712, Thanks for your comment, PTAL





Re: [PR] KAFKA-16921: migrate connect module to junit 5 (Part 2) [kafka]

2024-06-13 Thread via GitHub


m1a2st commented on code in PR #16330:
URL: https://github.com/apache/kafka/pull/16330#discussion_r1639283310


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectIntegrationTestUtils.java:
##
@@ -16,28 +16,26 @@
  */
 package org.apache.kafka.connect.integration;
 
-import org.junit.rules.TestRule;
-import org.junit.rules.TestWatcher;
-import org.junit.runner.Description;
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.TestWatcher;
 import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A utility class for Connect's integration tests
  */
-public class ConnectIntegrationTestUtils {
-public static TestRule newTestWatcher(Logger log) {
-return new TestWatcher() {
-@Override
-protected void starting(Description description) {
-super.starting(description);
-log.info("Starting test {}", description.getMethodName());
-}
+public class ConnectIntegrationTestUtils implements TestWatcher, 
BeforeEachCallback, AfterEachCallback {

Review Comment:
   It's a good idea, I will remove it






Re: [PR] KAFKA-16939: Revisit ConfigCommandIntegrationTest [kafka]

2024-06-13 Thread via GitHub


chia7712 commented on code in PR #16317:
URL: https://github.com/apache/kafka/pull/16317#discussion_r1639281603


##
core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java:
##
@@ -515,14 +524,19 @@ private void deleteAndVerifyConfig(Admin client, 
Optional brokerId, Set<
 }
 
 private void verifyConfigDefaultValue(Admin client, Optional 
brokerId, Set config) throws Exception {
-ConfigResource configResource = new 
ConfigResource(ConfigResource.Type.BROKER, brokerId.orElse(defaultBrokerId));
+ConfigResource configResource = new 
ConfigResource(ConfigResource.Type.BROKER, brokerId.orElse(""));
+TestUtils.waitForCondition(() -> {
+Map current = getConfigEntryStream(client, 
configResource)
+.filter(configEntry -> 
Objects.nonNull(configEntry.value()))
+.collect(Collectors.toMap(ConfigEntry::name, 
ConfigEntry::value));
+return config.stream().allMatch(current::containsKey);
+}, 5000, config + " are not updated");
+}
+
+private void verifyConfigSecretValue(Admin client, Optional 
brokerId, Set config) throws Exception {
+ConfigResource configResource = new 
ConfigResource(ConfigResource.Type.BROKER, brokerId.orElse(""));
 TestUtils.waitForCondition(() -> {
-Map current = 
client.describeConfigs(singletonList(configResource))
-.all()
-.get()
-.values()
-.stream()
-.flatMap(e -> e.entries().stream())
+Map current = getConfigEntryStream(client, 
configResource)

Review Comment:
   We should use `ConfigEntry::isSensitive` to determine the expected keys.






Re: [PR] KAFKA-15259: Processing must continue with flush + commitTnx [kafka]

2024-06-13 Thread via GitHub


aliehsaeedii commented on code in PR #16332:
URL: https://github.com/apache/kafka/pull/16332#discussion_r1639280799


##
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##
@@ -792,6 +792,9 @@ public void sendOffsetsToTransaction(Map offs
  * 
  * Further, if any of the {@link #send(ProducerRecord)} calls which were 
part of the transaction hit irrecoverable
  * errors, this method will throw the last received exception immediately 
and the transaction will not be committed.
+ * It should be noted that if flush() is called explicitly 
beforehand, this method will NOT throw any
+ * exception related to the {@link #send(ProducerRecord)} calls. Since 
flush() clears the last received
+ * exception and returns the transaction from the error state.

Review Comment:
   > Should it be "and transits the transaction out of any error state"

   I agree with you.









Re: [PR] KAFKA-16862: Refactor ConsumerTaskTest to be deterministic and avoid tight loops [kafka]

2024-06-13 Thread via GitHub


xiaoqingwanga commented on PR #16303:
URL: https://github.com/apache/kafka/pull/16303#issuecomment-2167234187

   @gharris1727 Hey, I have made all the changes. Please feel free to review 
the code again.





Re: [PR] KAFKA-16921: migrate connect module to junit 5 (Part 2) [kafka]

2024-06-13 Thread via GitHub


chia7712 commented on code in PR #16330:
URL: https://github.com/apache/kafka/pull/16330#discussion_r1639268606


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectIntegrationTestUtils.java:
##
@@ -16,28 +16,26 @@
  */
 package org.apache.kafka.connect.integration;
 
-import org.junit.rules.TestRule;
-import org.junit.rules.TestWatcher;
-import org.junit.runner.Description;
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.TestWatcher;
 import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A utility class for Connect's integration tests
  */
-public class ConnectIntegrationTestUtils {
-public static TestRule newTestWatcher(Logger log) {
-return new TestWatcher() {
-@Override
-protected void starting(Description description) {
-super.starting(description);
-log.info("Starting test {}", description.getMethodName());
-}
+public class ConnectIntegrationTestUtils implements TestWatcher, 
BeforeEachCallback, AfterEachCallback {

Review Comment:
   It seems this is used to log the test name. Maybe we can move the logging 
into each test? Only three classes use this helper.






Re: [PR] KAFKA-16921: migrate connect module to junit 5 [kafka]

2024-06-13 Thread via GitHub


chia7712 commented on code in PR #16328:
URL: https://github.com/apache/kafka/pull/16328#discussion_r1639258794


##
connect/runtime/src/test/java/org/apache/kafka/connect/util/SharedTopicAdminTest.java:
##
@@ -46,14 +48,10 @@ public class SharedTopicAdminTest {
 @Mock private Function, TopicAdmin> factory;
 private SharedTopicAdmin sharedAdmin;

Review Comment:
   We can make this a local variable, right?



##
connect/runtime/src/test/java/org/apache/kafka/connect/storage/FileOffsetBackingStoreTest.java:
##
@@ -123,12 +127,23 @@ public void testSaveRestore() throws Exception {
 
 @Test
 public void testThreadName() {
+converter = mock(Converter.class);
+store = new FileOffsetBackingStore(converter);
+tempFile = assertDoesNotThrow(() -> 
File.createTempFile("fileoffsetbackingstore", null));
+Map props = new HashMap<>();
+props.put(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, 
tempFile.getAbsolutePath());
+props.put(StandaloneConfig.KEY_CONVERTER_CLASS_CONFIG, 
"org.apache.kafka.connect.json.JsonConverter");
+props.put(StandaloneConfig.VALUE_CONVERTER_CLASS_CONFIG, 
"org.apache.kafka.connect.json.JsonConverter");
+config = new StandaloneConfig(props);
+store.configure(config);
+store.start();
 assertTrue(((ThreadPoolExecutor) store.executor).getThreadFactory()

Review Comment:
   Maybe we can move this `assert` to `setup` and then remove this test case. 
That seems simpler to me.






Re: [PR] KAFKA-16917: Align the returned Map type of KafkaAdminClient [kafka]

2024-06-13 Thread via GitHub


frankvicky commented on PR #16250:
URL: https://github.com/apache/kafka/pull/16250#issuecomment-2167221151

   Hi @chia7712 , I have fixed the build error, PTAL





Re: [PR] KAFKA-16752: Implemented acquire functionality for Fetch (KIP-932) [kafka]

2024-06-13 Thread via GitHub


omkreddy merged PR #16274:
URL: https://github.com/apache/kafka/pull/16274





Re: [PR] KAFKA-15623: Migrate streams tests (part 1) module to JUnit 5 [kafka]

2024-06-13 Thread via GitHub


chia7712 merged PR #16254:
URL: https://github.com/apache/kafka/pull/16254





Re: [PR] KAFKA-16917: Align the returned Map type of KafkaAdminClient [kafka]

2024-06-13 Thread via GitHub


chia7712 commented on PR #16250:
URL: https://github.com/apache/kafka/pull/16250#issuecomment-2167204001

   @frankvicky please fix the build error





Re: [PR] KAFKA-10787: Apply spotless to `metadata` and `server` and `storage` module [kafka]

2024-06-13 Thread via GitHub


chia7712 commented on PR #16297:
URL: https://github.com/apache/kafka/pull/16297#issuecomment-2167203133

   @gongxuanzhang please fix the conflicts





Re: [PR] KAFKA-10787: Apply spotless to `group-coordinator` and `group-coordinator-api` [kafka]

2024-06-13 Thread via GitHub


chia7712 merged PR #16298:
URL: https://github.com/apache/kafka/pull/16298





[PR] Add documentation for the native docker image [kafka]

2024-06-13 Thread via GitHub


kagarwal06 opened a new pull request, #16338:
URL: https://github.com/apache/kafka/pull/16338

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-16895: fix off-by-one bug in RemoteCopyLagSegments [kafka]

2024-06-13 Thread via GitHub


chia7712 commented on PR #16210:
URL: https://github.com/apache/kafka/pull/16210#issuecomment-2167190716

   @dopuskh3 could you please fix the conflicts?





Re: [PR] KAFKA-16933: New consumer unsubscribe close commit fixes [kafka]

2024-06-13 Thread via GitHub


philipnee commented on code in PR #16272:
URL: https://github.com/apache/kafka/pull/16272#discussion_r1639205859


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1512,7 +1513,9 @@ public void unsubscribe() {
 }
 resetGroupMetadata();
 }
-subscriptions.unsubscribe();
+} catch (Exception e) {
+log.error("Unsubscribe failed", e);
+throw e;

Review Comment:
   for InterruptedException, I wonder if we should call Thread.Interrupted() so 
that the user can have a chance to cleanly shutdown the consumer. wdyt?
   
   Also InterruptedException can be thrown everywhere now, i guess the reason 
we are seeing this during shutdown is for the obvious reason, the worker calls 
unsubscribe before shutting down.






Re: [PR] KAFKA-16933: New consumer unsubscribe close commit fixes [kafka]

2024-06-13 Thread via GitHub


philipnee commented on code in PR #16272:
URL: https://github.com/apache/kafka/pull/16272#discussion_r1639205859


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1512,7 +1513,9 @@ public void unsubscribe() {
 }
 resetGroupMetadata();
 }
-subscriptions.unsubscribe();
+} catch (Exception e) {
+log.error("Unsubscribe failed", e);
+throw e;

Review Comment:
   for InterruptedException, I wonder if we should call Thread.Interrupted() so 
we can cleanly shut down the consumer. wdyt?
   
   Also InterruptedException can be thrown everywhere now, i guess the reason 
we are seeing this during shutdown is for the obvious reason, the worker calls 
unsubscribe before shutting down.






Re: [PR] KAFKA-16933: New consumer unsubscribe close commit fixes [kafka]

2024-06-13 Thread via GitHub


philipnee commented on code in PR #16272:
URL: https://github.com/apache/kafka/pull/16272#discussion_r1639205859


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1512,7 +1513,9 @@ public void unsubscribe() {
 }
 resetGroupMetadata();
 }
-subscriptions.unsubscribe();
+} catch (Exception e) {
+log.error("Unsubscribe failed", e);
+throw e;

Review Comment:
   for InterruptedException, I wonder if we should call Thread.Interrupted() so 
we can cleanly shut down the consumer. wdyt?
   
   Also InterruptedException can be thrown everywhere now, i guess the reason 
we are seeing this is for the obvious reason, the worker calls unsubscribe 
before shutting down.






Re: [PR] KAFKA-16933: New consumer unsubscribe close commit fixes [kafka]

2024-06-13 Thread via GitHub


philipnee commented on code in PR #16272:
URL: https://github.com/apache/kafka/pull/16272#discussion_r1639205859


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1512,7 +1513,9 @@ public void unsubscribe() {
 }
 resetGroupMetadata();
 }
-subscriptions.unsubscribe();
+} catch (Exception e) {
+log.error("Unsubscribe failed", e);
+throw e;

Review Comment:
   i wonder if we should wrap the exceptions (e.g. 
`java.lang.InterruptedException`) in KafkaException.
   
   for InterruptedException, I wonder if we should call Thread.Interrupted() so 
we can cleanly shut down the consumer. wdyt?
   
   Also InterruptedException can be thrown everywhere now, i guess the reason 
we are seeing this is for the obvious reason, the worker calls unsubscribe 
before shutting down.
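
A note on the two suggestions above, as a hedged sketch rather than the PR's actual code. In the JDK, `Thread.interrupted()` reads and clears the interrupt flag, while `Thread.currentThread().interrupt()` sets it; the comment presumably means the latter. The class `InterruptFlagSketch`, the method `blockingStep()`, and the wrapper `WrappedInterruptException` are all hypothetical names standing in for the consumer, the blocking unsubscribe work, and a `KafkaException`-style unchecked wrapper.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical stand-in for a client whose unsubscribe() may block; not the real AsyncKafkaConsumer.
final class InterruptFlagSketch {

    // Unchecked wrapper invented for this sketch; it only plays the role a KafkaException would.
    static final class WrappedInterruptException extends RuntimeException {
        WrappedInterruptException(InterruptedException cause) {
            super(cause);
        }
    }

    // Simulates a blocking step: the latch is never counted down, so only an interrupt ends the wait.
    static void blockingStep() throws InterruptedException {
        new CountDownLatch(1).await();
    }

    static void unsubscribe() {
        try {
            blockingStep();
        } catch (InterruptedException e) {
            // The flag is cleared when InterruptedException is thrown; set it again for the caller.
            Thread.currentThread().interrupt();
            throw new WrappedInterruptException(e);
        }
    }

    // Returns true if the caller can still observe the interrupt after unsubscribe() fails.
    static boolean flagSurvives() {
        Thread.currentThread().interrupt(); // pretend a shutdown path interrupted this thread
        try {
            unsubscribe();
            return false; // not expected: await() throws at once when the flag is already set
        } catch (WrappedInterruptException e) {
            return Thread.interrupted(); // reads the flag and clears it again
        }
    }

    public static void main(String[] args) {
        System.out.println("flag visible to caller: " + flagSurvives());
    }
}
```

With the flag restored, code further up the stack (for example a shutdown routine) can still check `isInterrupted()` and stop cleanly, even though the checked exception was replaced by an unchecked one.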






Re: [PR] KAFKA-16948: Reset tier lag metrics on becoming follower [kafka]

2024-06-13 Thread via GitHub


satishd commented on PR #16321:
URL: https://github.com/apache/kafka/pull/16321#issuecomment-2167183730

   @jlprat This is a bug fix that addresses having a wrong metric in edge case 
scenarios. It is good to have it merged in 3.8 too.  Let me know if you are 
fine with adding it as part of 3.8 branch. 





Re: [PR] KAFKA-16933: New consumer unsubscribe close commit fixes [kafka]

2024-06-13 Thread via GitHub


philipnee commented on code in PR #16272:
URL: https://github.com/apache/kafka/pull/16272#discussion_r1639205859


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1512,7 +1513,9 @@ public void unsubscribe() {
 }
 resetGroupMetadata();
 }
-subscriptions.unsubscribe();
+} catch (Exception e) {
+log.error("Unsubscribe failed", e);
+throw e;

Review Comment:
   is this right? unsubscribe only throws two exceptions, i.e. the illegal 
state and illegal argument exception.
   
   for InterruptedException, I wonder if we should call Thread.Interrupted() so 
we can cleanly shut down the consumer. wdyt?
   
   Also InterruptedException can be thrown everywhere now, i guess the reason 
we are seeing this is for the obvious reason, the worker calls unsubscribe 
before shutting down.






Re: [PR] KAFKA-16948: Reset tier lag metrics on becoming follower [kafka]

2024-06-13 Thread via GitHub


satishd merged PR #16321:
URL: https://github.com/apache/kafka/pull/16321





Re: [PR] KAFKA-15853: Move configDef out of core [kafka]

2024-06-13 Thread via GitHub


chia7712 commented on code in PR #16116:
URL: https://github.com/apache/kafka/pull/16116#discussion_r1639198546


##
transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfigs.java:
##
@@ -51,4 +60,16 @@ public final class TransactionLogConfigs {
 public static final String PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG 
= "producer.id.expiration.check.interval.ms";
 public static final int PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT = 
60;
 public static final String PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DOC = 
"The interval at which to remove producer IDs that have expired due to 
producer.id.expiration.ms passing.";
+public static final ConfigDef CONFIG_DEF =  new ConfigDef()
+.define(TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, 
INT, TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_DEFAULT, atLeast(1), 
HIGH, TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_DOC)
+
.define(TransactionLogConfigs.TRANSACTIONS_LOAD_BUFFER_SIZE_CONFIG, INT, 
TransactionLogConfigs.TRANSACTIONS_LOAD_BUFFER_SIZE_DEFAULT, atLeast(1), HIGH, 
TransactionLogConfigs.TRANSACTIONS_LOAD_BUFFER_SIZE_DOC)
+
.define(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, 
SHORT, TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DEFAULT, 
atLeast(1), HIGH, 
TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DOC)
+
.define(TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, INT, 
TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_DEFAULT, atLeast(1), HIGH, 
TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_DOC)
+
.define(TransactionLogConfigs.TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG, INT, 
TransactionLogConfigs.TRANSACTIONS_TOPIC_SEGMENT_BYTES_DEFAULT, atLeast(1), 
HIGH, TransactionLogConfigs.TRANSACTIONS_TOPIC_SEGMENT_BYTES_DOC)
+
+
.define(TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG, 
BOOLEAN, 
TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_DEFAULT, LOW, 
TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_DOC)
+
+.define(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_CONFIG, 
INT, TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT, atLeast(1), LOW, 
TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DOC)
+// Configuration for testing only as default value should be 
sufficient for typical usage
+
.defineInternal(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG,
 INT, TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, 
atLeast(1), LOW, 
TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DOC);

Review Comment:
   Please add `define(ServerLogConfigs.LOG_DIR_FAILURE_TIMEOUT_MS_CONFIG, LONG, 
ServerLogConfigs.LOG_DIR_FAILURE_TIMEOUT_MS_DEFAULT, atLeast(1), LOW, 
ServerLogConfigs.LOG_DIR_FAILURE_TIMEOUT_MS_DOC)` to fix the failed tests :)






Re: [PR] KAFKA-16531: calculate check-quorum when leader is not in voter set [kafka]

2024-06-13 Thread via GitHub


showuon commented on PR #16211:
URL: https://github.com/apache/kafka/pull/16211#issuecomment-2167155537

   > @showuon can you update the description?
   
   PR description updated. Thanks.





Re: [PR] MINOR: Fix wrong highWatermark assert equals condition in HighwatermarkPersistenceTest [kafka]

2024-06-13 Thread via GitHub


github-actions[bot] commented on PR #15544:
URL: https://github.com/apache/kafka/pull/15544#issuecomment-2167147382

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.





Re: [PR] MINOR: Refine javadoc in TopicsDelta TopicDelta LocalReplicaChanges [kafka]

2024-06-13 Thread via GitHub


showuon merged PR #16195:
URL: https://github.com/apache/kafka/pull/16195





Re: [PR] KAFKA-16895: fix off-by-one bug in RemoteCopyLagSegments [kafka]

2024-06-13 Thread via GitHub


kamalcph commented on PR #16210:
URL: https://github.com/apache/kafka/pull/16210#issuecomment-2167137194

   Test failures are unrelated. The failed test is an existing flaky one:
   
   ```
   testCorrectnessForCacheAndIndexFilesWhenResizeCache() – 
kafka.log.remote.RemoteIndexCacheTest
   ```





Re: [PR] KAFKA-16948: Reset tier lag metrics on becoming follower [kafka]

2024-06-13 Thread via GitHub


kamalcph commented on PR #16321:
URL: https://github.com/apache/kafka/pull/16321#issuecomment-2167136261

   Test failures are unrelated.





Re: [PR] KAFKA-15853: Move configDef out of core [kafka]

2024-06-13 Thread via GitHub


chia7712 commented on PR #16116:
URL: https://github.com/apache/kafka/pull/16116#issuecomment-2167090246

   wait a minute. it seems there are failed tests related to this PR
   
   
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-16116/19/tests
 





Re: [PR] MINOR: Change KStreamKstreamOuterJoinTest to use distinct left and right types [kafka]

2024-06-13 Thread via GitHub


mjsax commented on PR #15513:
URL: https://github.com/apache/kafka/pull/15513#issuecomment-2167050910

   Thanks @gharris1727 -- merged to `trunk`.





Re: [PR] MINOR: Change KStreamKstreamOuterJoinTest to use distinct left and right types [kafka]

2024-06-13 Thread via GitHub


mjsax merged PR #15513:
URL: https://github.com/apache/kafka/pull/15513





Re: [PR] MINOR: Add interface for aliveBroker and isShutDwon for Brokers. [kafka]

2024-06-13 Thread via GitHub


chia7712 commented on code in PR #16323:
URL: https://github.com/apache/kafka/pull/16323#discussion_r1639102144


##
core/src/test/java/kafka/test/ClusterTestExtensionsTest.java:
##
@@ -232,4 +232,17 @@ public void 
testNotSupportedNewGroupProtocols(ClusterInstance clusterInstance) {
 
Assertions.assertTrue(clusterInstance.supportedGroupProtocols().contains(CLASSIC));
 Assertions.assertEquals(1, 
clusterInstance.supportedGroupProtocols().size());
 }
+
+@ClusterTest(brokers = 4)
+public void testClusterAliveBrokers(ClusterInstance clusterInstance) 
throws Exception {
+clusterInstance.waitForReadyBrokers();
+clusterInstance.shutdownBroker(0);
+List aliveBrokerAfterShutdown = Arrays.asList(1, 2, 3);
+
+Assertions.assertEquals(3, clusterInstance.aliveBrokers().size());
+
+clusterInstance.aliveBrokers().forEach(s -> Assertions.assertTrue(

Review Comment:
   It would be better to make sure they are definitely identical.
   ```
   Assertions.assertEquals(aliveBrokerAfterShutdown,
   clusterInstance.aliveBrokers().stream().map(s -> 
s.config().brokerId()).collect(Collectors.toList()));
   ```
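
To illustrate why a whole-list comparison is stricter than a size check plus per-element membership, here is a minimal sketch. Plain integers stand in for broker ids; `AliveBrokerAssertionSketch` and its methods are hypothetical and do not touch `ClusterInstance`. The order-insensitive variant is an assumption added for the case where iteration order is not guaranteed.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Plain integers stand in for broker ids; no ClusterInstance is involved in this sketch.
final class AliveBrokerAssertionSketch {

    // Weaker check: sizes match and every reported id is one of the expected ids.
    static boolean eachReportedIsExpected(List<Integer> expected, List<Integer> reported) {
        return reported.size() == expected.size()
            && reported.stream().allMatch(expected::contains);
    }

    // Stricter check: same ids, same count of each, same order.
    static boolean identical(List<Integer> expected, List<Integer> reported) {
        return expected.equals(reported);
    }

    // Variant for sources whose iteration order is not guaranteed: sort both sides first.
    static boolean identicalIgnoringOrder(List<Integer> expected, List<Integer> reported) {
        List<Integer> sortedExpected = expected.stream().sorted().collect(Collectors.toList());
        List<Integer> sortedReported = reported.stream().sorted().collect(Collectors.toList());
        return sortedExpected.equals(sortedReported);
    }

    public static void main(String[] args) {
        List<Integer> expected = Arrays.asList(1, 2, 3);
        List<Integer> duplicated = Arrays.asList(1, 1, 3); // size 3, but broker 2 is missing

        System.out.println(eachReportedIsExpected(expected, duplicated)); // true: the weak check passes
        System.out.println(identical(expected, duplicated));              // false: the strict check fails
        System.out.println(identicalIgnoringOrder(expected, Arrays.asList(3, 1, 2))); // true
    }
}
```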
   



##
core/src/test/java/kafka/test/ClusterInstance.java:
##
@@ -51,6 +52,10 @@ default boolean isKRaftTest() {
 
 Map brokers();
 
+default List aliveBrokers() {

Review Comment:
   Could it return `Map`, which is the same as `brokers`?






Re: [PR] KAFKA-16936 : Upgrade slf4j, sys property for provider [kafka]

2024-06-13 Thread via GitHub


chia7712 commented on code in PR #16324:
URL: https://github.com/apache/kafka/pull/16324#discussion_r1639091686


##
gradle/dependencies.gradle:
##
@@ -237,14 +237,14 @@ libs += [
   pcollections: "org.pcollections:pcollections:$versions.pcollections",
   opentelemetryProto: 
"io.opentelemetry.proto:opentelemetry-proto:$versions.opentelemetryProto",
   reflections: "org.reflections:reflections:$versions.reflections",
+  slf4jReload4j: "org.slf4j:slf4j-reload4j:$versions.slf4j",

Review Comment:
   Could you please move it down to around `slf4jApi`?



##
bin/kafka-run-class.sh:
##
@@ -243,6 +243,14 @@ fi
 (( WINDOWS_OS_FORMAT )) && LOG_DIR=$(cygpath --path --mixed "${LOG_DIR}")
 KAFKA_LOG4J_CMD_OPTS="-Dkafka.logs.dir=$LOG_DIR $KAFKA_LOG4J_OPTS"
 
+# slf4j provider settings
+if [ -z "$SLF4J_PROVIDER" ]; then
+  SLF4J_PROVIDER="org.slf4j.reload4j.Reload4jServiceProvider"
+fi
+
+# Add the slf4j provider to KAFKA_OPTS
+KAFKA_OPTS="$KAFKA_OPTS -Dslf4j.provider=$SLF4J_PROVIDER"

Review Comment:
   Could we merge this new property into either `KAFKA_LOG4J_OPTS` or 
`KAFKA_LOG4J_CMD_OPTS`? Also, please add it to `kafka-run-class.bat` too






Re: [PR] KAFKA-16955: fix synchronization of streams threadState [kafka]

2024-06-13 Thread via GitHub


mjsax commented on code in PR #16337:
URL: https://github.com/apache/kafka/pull/16337#discussion_r1639096220


##
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##
@@ -635,14 +634,10 @@ public void setStandbyUpdateListener(final 
StandbyUpdateListener standbyListener
 final class StreamStateListener implements StreamThread.StateListener {
 private final Map threadState;
 private GlobalStreamThread.State globalThreadState;
-// this lock should always be held before the state lock
-private final Object threadStatesLock;
 
-StreamStateListener(final Map threadState,
-final GlobalStreamThread.State globalThreadState) {
-this.threadState = threadState;
+StreamStateListener(final GlobalStreamThread.State globalThreadState) {

Review Comment:
   Orthogonal question: should we also make this class a singleton? It's used 
correctly with a single instance right now but might be good to enforce it 
additionally to guard against future mistakes?
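
One possible way to enforce this, sketched under assumptions: because the diff shows a non-static nested class, a guard of "at most one listener per enclosing object" may fit better than a JVM-wide static singleton. `SingleListenerSketch`, `Listener`, and `newListener()` are hypothetical names, not Kafka Streams code.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical owner/listener pair; it only mirrors the "inner class of an owner" shape in the diff.
final class SingleListenerSketch {
    // Flips to true the first time a listener is constructed for this owner.
    private final AtomicBoolean listenerCreated = new AtomicBoolean(false);

    final class Listener {
        private Listener() {
            // A second construction attempt for the same owner fails fast.
            if (!listenerCreated.compareAndSet(false, true)) {
                throw new IllegalStateException("listener already created for this owner");
            }
        }
    }

    Listener newListener() {
        return new Listener();
    }

    // Returns true if creating a second listener on one owner is rejected.
    static boolean secondCreationRejected() {
        SingleListenerSketch owner = new SingleListenerSketch();
        owner.newListener();
        try {
            owner.newListener();
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("second creation rejected: " + secondCreationRejected());
    }
}
```

Separate owners each still get their own listener, which a static singleton would not allow.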






Re: [PR] KAFKA-16933: New consumer unsubscribe close commit fixes [kafka]

2024-06-13 Thread via GitHub


lianetm commented on PR #16272:
URL: https://github.com/apache/kafka/pull/16272#issuecomment-2167024740

   Build completed with 5 unrelated failures:
   
   > Build / JDK 21 and Scala 2.13 / testFenceMultipleBrokers() – 
org.apache.kafka.controller.QuorumControllerTest
   > Build / JDK 21 and Scala 2.13 / shouldQuerySpecificStalePartitionStores() 
– org.apache.kafka.streams.integration.StoreQueryIntegrationTest
   > Build / JDK 8 and Scala 2.12 / shouldRestoreState() – 
org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest
   > Build / JDK 8 and Scala 2.12 / shouldRestoreState() – 
org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest
   > Build / JDK 8 and Scala 2.12 / testDescribeQuorumStatusSuccessful [6] 
Type=Raft-Combined, MetadataVersion=4.0-IV0,Security=PLAINTEXT – 
org.apache.kafka.tools.MetadataQuorumCommandTest





[jira] [Resolved] (KAFKA-16879) SystemTime should use singleton mode

2024-06-13 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-16879.

Fix Version/s: 3.9.0
   Resolution: Fixed

> SystemTime should use singleton mode
> 
>
> Key: KAFKA-16879
> URL: https://issues.apache.org/jira/browse/KAFKA-16879
> Project: Kafka
>  Issue Type: Improvement
>Reporter: dujian0068
>Assignee: dujian0068
>Priority: Minor
> Fix For: 3.9.0
>
>
> Currently, the {{SystemTime}} class, which provides system time-related 
> functionality such as getting the current timestamp, sleep, and await, can be 
> instantiated multiple times.
> However, system time is unique: in an application, the time obtained in 
> different places should be consistent, and the time obtained through the Java 
> System class from the underlying layer is already the same everywhere.
> So I suggest changing it to a singleton, to reflect the uniqueness of 
> system time in the design.
>  
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java]
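
The singleton idea described in the issue can be sketched roughly as follows. This is an illustration under assumptions, not the merged change: `SystemClockSketch` and `getInstance()` are hypothetical names, and the real `SystemTime` class and its interface are not reproduced here.

```java
// Hypothetical clock type for illustration only; it does not mirror the real SystemTime API.
final class SystemClockSketch {
    // The single shared instance, created once when the class is initialized.
    private static final SystemClockSketch INSTANCE = new SystemClockSketch();

    // Private constructor: code outside this class cannot create further instances.
    private SystemClockSketch() {
    }

    static SystemClockSketch getInstance() {
        return INSTANCE;
    }

    long milliseconds() {
        return System.currentTimeMillis();
    }

    long nanoseconds() {
        return System.nanoTime();
    }

    public static void main(String[] args) {
        // Every caller receives the same object.
        System.out.println(getInstance() == getInstance());
    }
}
```

The class holds no state, so sharing one instance changes nothing about the values returned; it only makes the "one system clock" intent explicit in the type.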



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16879 SystemTime should use singleton mode [kafka]

2024-06-13 Thread via GitHub


chia7712 merged PR #16266:
URL: https://github.com/apache/kafka/pull/16266





Re: [PR] KAFKA-16637: AsyncKafkaConsumer removes offset fetch responses from cache too aggressively [kafka]

2024-06-13 Thread via GitHub


kirktrue commented on PR #16310:
URL: https://github.com/apache/kafka/pull/16310#issuecomment-2167013423

   @jlprat—This Jira/PR is a blocker for the KIP-848 Java client work. It 
sounds like we're really close to merging this within the next day or two. 
Thanks!





Re: [PR] KAFKA-16862: Refactor ConsumerTaskTest to be deterministic and avoid tight loops [kafka]

2024-06-13 Thread via GitHub


xiaoqingwanga commented on code in PR #16303:
URL: https://github.com/apache/kafka/pull/16303#discussion_r1639075718


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:
##
@@ -115,40 +118,54 @@ public ConsumerTask(RemotePartitionMetadataEventHandler 
remotePartitionMetadataE
 this.pollTimeoutMs = pollTimeoutMs;
 this.offsetFetchRetryIntervalMs = offsetFetchRetryIntervalMs;
 this.time = Objects.requireNonNull(time);
+this.isInternalConsumerClosed = new AtomicBoolean(false);
 this.uninitializedAt = time.milliseconds();
 }
 
 @Override
 public void run() {
 log.info("Starting consumer task thread.");
 while (!isClosed) {
-try {
-if (hasAssignmentChanged) {
-maybeWaitForPartitionAssignments();
-}
+ingestRecords();
+}
+closeConsumer();
+log.info("Exited from consumer task thread");
+}
 
-log.trace("Polling consumer to receive remote log metadata 
topic records");
-final ConsumerRecords consumerRecords = 
consumer.poll(Duration.ofMillis(pollTimeoutMs));
-for (ConsumerRecord record : consumerRecords) {
-processConsumerRecord(record);
-}
-maybeMarkUserPartitionsAsReady();
-} catch (final WakeupException ex) {
-// ignore logging the error
-isClosed = true;
-} catch (final RetriableException ex) {
-log.warn("Retriable error occurred while processing the 
records. Retrying...", ex);
-} catch (final Exception ex) {
-isClosed = true;
-log.error("Error occurred while processing the records", ex);
+// public for testing
+public void ingestRecords() {
+try {
+if (hasAssignmentChanged) {
+maybeWaitForPartitionAssignments();
 }
+
+log.trace("Polling consumer to receive remote log metadata topic 
records");
+final ConsumerRecords consumerRecords = 
consumer.poll(Duration.ofMillis(pollTimeoutMs));
+for (ConsumerRecord record : consumerRecords) {
+processConsumerRecord(record);
+}
+maybeMarkUserPartitionsAsReady();
+} catch (final WakeupException ex) {
+// ignore logging the error
+isClosed = true;
+closeConsumer();
+} catch (final RetriableException ex) {
+log.warn("Retriable error occurred while processing the records. 
Retrying...", ex);
+} catch (final Exception ex) {
+isClosed = true;
+log.error("Error occurred while processing the records", ex);
+closeConsumer();
 }
-try {
-consumer.close(Duration.ofSeconds(30));
-} catch (final Exception e) {
-log.error("Error encountered while closing the consumer", e);
+}
+
+private void closeConsumer() {
+if (isInternalConsumerClosed.compareAndSet(false, true)) {

Review Comment:
   The ConsumerTask is an important component, and making fewer changes is 
safer. If ingestRecords does not have the ability to close the consumer, it may 
seem a bit incomplete, but after all, it's an internal method. 
   I think keeping it simple is indeed better👍.
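
For context, the `compareAndSet(false, true)` guard in the hunk above is a common way to make a close operation run at most once, even when it is reachable from both the main loop and an error path. A minimal sketch, assuming a hypothetical `CloseOnceSketch` class in which a counter stands in for the real `consumer.close(...)` call:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical task with an idempotent close; the counter replaces the real consumer.close(...) call.
final class CloseOnceSketch {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final AtomicInteger realCloses = new AtomicInteger();

    void close() {
        // Only the first caller flips false -> true; later callers see false returned and do nothing.
        if (closed.compareAndSet(false, true)) {
            realCloses.incrementAndGet();
        }
    }

    int realCloses() {
        return realCloses.get();
    }

    public static void main(String[] args) {
        CloseOnceSketch task = new CloseOnceSketch();
        task.close(); // e.g. from an error path inside the polling step
        task.close(); // e.g. again after the run loop exits
        System.out.println("real close calls: " + task.realCloses());
    }
}
```

If the close call is only ever reached from one place, as the comment above leans towards, the guard is unnecessary and the simpler code is enough.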






Re: [PR] KAFKA-10551: Add topic id support to produce request and response [kafka]

2024-06-13 Thread via GitHub


jolshan commented on PR #15968:
URL: https://github.com/apache/kafka/pull/15968#issuecomment-2166998275

   Hey Omnia. Thanks for your work (and patience) so far. I will make another 
deep pass tomorrow or early next week. 
   Things are looking good though.





Re: [PR] KAFKA-16862: Refactor ConsumerTaskTest to be deterministic and avoid tight loops [kafka]

2024-06-13 Thread via GitHub


xiaoqingwanga commented on code in PR #16303:
URL: https://github.com/apache/kafka/pull/16303#discussion_r1639071233


##
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##
@@ -136,46 +128,46 @@ public void testUserTopicIdPartitionEquals() {
 }
 
 @Test
-public void testAddAssignmentsForPartitions() throws InterruptedException {
+public void testAddAssignmentsForPartitions() {
 final List idPartitions = getIdPartitions("sample", 
3);
 final Map endOffsets = idPartitions.stream()
 .map(idp -> 
toRemoteLogPartition(partitioner.metadataPartition(idp)))
 .collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) -> 
b));
 consumer.updateEndOffsets(endOffsets);
 consumerTask.addAssignmentsForPartitions(new HashSet<>(idPartitions));
-thread.start();
+consumerTask.ingestRecords();
 for (final TopicIdPartition idPartition : idPartitions) {
-TestUtils.waitForCondition(() -> 
consumerTask.isUserPartitionAssigned(idPartition), "Timed out waiting for " + 
idPartition + " to be assigned");
 
assertTrue(consumerTask.isMetadataPartitionAssigned(partitioner.metadataPartition(idPartition)));
 assertTrue(handler.isPartitionLoaded.get(idPartition));
 }
 }
 
 @Test
-public void testRemoveAssignmentsForPartitions() throws 
InterruptedException {
+public void testRemoveAssignmentsForPartitions() {
 final List allPartitions = getIdPartitions("sample", 
3);
 final Map endOffsets = allPartitions.stream()
 .map(idp -> 
toRemoteLogPartition(partitioner.metadataPartition(idp)))
 .collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) -> 
b));
 consumer.updateEndOffsets(endOffsets);
 consumerTask.addAssignmentsForPartitions(new HashSet<>(allPartitions));
-thread.start();
+consumerTask.ingestRecords();
 
 final TopicIdPartition tpId = allPartitions.get(0);
-TestUtils.waitForCondition(() -> 
consumerTask.isUserPartitionAssigned(tpId), "Timed out waiting for " + tpId + " 
to be assigned");
 addRecord(consumer, partitioner.metadataPartition(tpId), tpId, 0);
-TestUtils.waitForCondition(() -> 
consumerTask.readOffsetForMetadataPartition(partitioner.metadataPartition(tpId)).isPresent(),
-"Couldn't read record");
+consumerTask.ingestRecords();
+assertTrue(() -> 
consumerTask.readOffsetForMetadataPartition(partitioner.metadataPartition(tpId)).isPresent(),

Review Comment:
   I see😉, there was a bit of misunderstanding before.






Re: [PR] KAFKA-10551: Add topic id support to produce request and response [kafka]

2024-06-13 Thread via GitHub


jolshan commented on code in PR #15968:
URL: https://github.com/apache/kafka/pull/15968#discussion_r1639071024


##
clients/src/main/java/org/apache/kafka/clients/ApiVersions.java:
##
@@ -34,10 +34,21 @@ public class ApiVersions {
 
 private final Map nodeApiVersions = new 
HashMap<>();
 private byte maxUsableProduceMagic = RecordBatch.CURRENT_MAGIC_VALUE;
+private short maxProduceSupportedVersion = ApiKeys.PRODUCE.latestVersion();
 
 public synchronized void update(String nodeId, NodeApiVersions 
nodeApiVersions) {
 this.nodeApiVersions.put(nodeId, nodeApiVersions);
 this.maxUsableProduceMagic = computeMaxUsableProduceMagic();
+this.maxProduceSupportedVersion = computeMaxProduceSupportedVersion();
+}
+
+private short computeMaxProduceSupportedVersion() {
+Optional knownBrokerNodesMinSupportedVersionForProduce = 
this.nodeApiVersions.values().stream()

Review Comment:
   sounds good. this is consistent with the fetching logic 👍 






Re: [PR] KAFKA-10551: Add topic id support to produce request and response [kafka]

2024-06-13 Thread via GitHub


OmniaGM commented on code in PR #15968:
URL: https://github.com/apache/kafka/pull/15968#discussion_r1639052751


##
clients/src/main/java/org/apache/kafka/clients/ApiVersions.java:
##
@@ -34,10 +34,21 @@ public class ApiVersions {
 
 private final Map nodeApiVersions = new 
HashMap<>();
 private byte maxUsableProduceMagic = RecordBatch.CURRENT_MAGIC_VALUE;
+private short maxProduceSupportedVersion = ApiKeys.PRODUCE.latestVersion();
 
 public synchronized void update(String nodeId, NodeApiVersions 
nodeApiVersions) {
 this.nodeApiVersions.put(nodeId, nodeApiVersions);
 this.maxUsableProduceMagic = computeMaxUsableProduceMagic();
+this.maxProduceSupportedVersion = computeMaxProduceSupportedVersion();
+}
+
+private short computeMaxProduceSupportedVersion() {
+Optional knownBrokerNodesMinSupportedVersionForProduce = 
this.nodeApiVersions.values().stream()

Review Comment:
   We only check IDs for topics included in the batch, and we map these from 
the metadata, which the producer has already fetched or cached at this point.
   
   By the time we build the `ProduceRequest`, the `ProducerMetadata` will 
already have been updated if needed, as usual (I didn't change this logic). 
If `ProducerMetadata` doesn't contain the topic ID, the request is sent 
with version 11 using only the topic name, and the broker figures out the ID 
when it receives the request. So the client doesn't need to refetch topic IDs 
every time we build a request. 
   
   On the broker side, when the broker receives the request, if the request 
version is < 12 then we use the `MetadataCache` on the broker to map the topic 
ID to the topic name and don't try to rebuild the `MetadataCache`. If we can't 
find the topic ID in the `MetadataCache` for this topic, the request fails 
with `UNKNOWN_TOPIC_ID`.
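
   For readers following along, the client-side fallback described above 
could be sketched roughly as below. This is only an illustration, not the code 
in this PR: `ProduceVersionSketch`, `chooseVersion`, and the plain 
`Map<String, String>` standing in for the producer metadata are all 
hypothetical names. The version numbers 11 and 12 are taken from the comment.

```java
import java.util.Map;

public class ProduceVersionSketch {
    // Assumption from the comment: version 11 addresses topics by name only,
    // version 12 is the first one that carries topic IDs.
    static final short NAME_ONLY_VERSION = 11;
    static final short TOPIC_ID_VERSION = 12;

    /**
     * Hypothetical helper: pick the produce version for one topic.
     * cachedTopicIds stands in for whatever the producer metadata already holds;
     * no extra metadata fetch is triggered here.
     */
    static short chooseVersion(String topic,
                               Map<String, String> cachedTopicIds,
                               short maxSupportedVersion) {
        boolean idKnown = cachedTopicIds.containsKey(topic);
        if (idKnown && maxSupportedVersion >= TOPIC_ID_VERSION) {
            return TOPIC_ID_VERSION;
        }
        // Topic ID unknown (or brokers too old): fall back to the name-based version.
        return (short) Math.min(maxSupportedVersion, NAME_ONLY_VERSION);
    }
}
```

   With this shape, a missing topic ID never blocks the send; it only lowers 
the version used for that request.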






Re: [PR] KAFKA-15853: Move configDef out of core [kafka]

2024-06-13 Thread via GitHub


chia7712 commented on PR #16116:
URL: https://github.com/apache/kafka/pull/16116#issuecomment-2166989084

   > I think we can squeeze part of this vision in as it will reduce conflict 
with this work going forward specially with the ongoing work for shared groups 
and new coordinator which I want to avoid as it is hardest to fix
   
   I love this code style
   
   > please have a look again. I would really appreciate if we can merge this 
pr as it is getting painful to keep up with the conflicts :)
   
   I've set the alarm for this PR :)





[jira] [Resolved] (KAFKA-16076) RestClient Interrupting the thread in case of InterruptedException

2024-06-13 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-16076.

Fix Version/s: 3.9.0
   Resolution: Fixed

> RestClient Interrupting the thread in case of InterruptedException
> --
>
> Key: KAFKA-16076
> URL: https://issues.apache.org/jira/browse/KAFKA-16076
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: Mehrdad Karami
>Assignee: Ksolves
>Priority: Minor
>  Labels: easyfix
> Fix For: 3.9.0
>
>
> In the RestClient class, httpRequest is called from different threads. 
> InterruptedException is caught there to handle that specific failure case, 
> but the code forgets to call Thread.currentThread().interrupt().
> In general, it is good practice to call this so the rest of the code knows 
> the thread was already interrupted.
> Note:
> Some methods that cause a thread to wait or sleep (like 
> {{Thread.sleep()}}) check this flag. If they see it is set, they 
> stop waiting/sleeping and throw an {{InterruptedException}}.
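
The practice described in the issue can be sketched as follows. This is a 
minimal, standalone illustration and not the RestClient code; the class and 
method names (`InterruptSketch`, `sleepAndRestoreInterrupt`, `demo`) are 
hypothetical.

```java
public class InterruptSketch {

    /** Hypothetical stand-in for a blocking call that handles InterruptedException. */
    static boolean sleepAndRestoreInterrupt(long millis) {
        try {
            Thread.sleep(millis);
            return true;
        } catch (InterruptedException e) {
            // Thread.sleep clears the interrupt flag when it throws, so set it
            // again to let callers further up the stack observe the interrupt.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Returns {sleepCompleted, flagStillSetAfterwards} as seen by a worker thread. */
    static boolean[] demo() throws InterruptedException {
        boolean[] result = new boolean[2];
        Thread worker = new Thread(() -> {
            Thread.currentThread().interrupt();              // simulate a pending interrupt
            result[0] = sleepAndRestoreInterrupt(1000);      // sleep throws right away
            result[1] = Thread.currentThread().isInterrupted();
        });
        worker.start();
        worker.join();
        return result;
    }
}
```

Without the `interrupt()` call in the catch block, the second value would be 
false and later blocking calls on that thread would not notice the interrupt.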



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16076: Interrupting the currentThread fix [kafka]

2024-06-13 Thread via GitHub


chia7712 commented on PR #15110:
URL: https://github.com/apache/kafka/pull/15110#issuecomment-2166983767

   this is fixed by #16282





Re: [PR] KAFKA-16076: Interrupting the currentThread fix [kafka]

2024-06-13 Thread via GitHub


chia7712 closed pull request #15110: KAFKA-16076: Interrupting the 
currentThread fix
URL: https://github.com/apache/kafka/pull/15110





Re: [PR] KAFKA-16076: Fix Missing thread interrupt call in RestClient class [kafka]

2024-06-13 Thread via GitHub


chia7712 merged PR #16282:
URL: https://github.com/apache/kafka/pull/16282





Re: [PR] KAFKA-16076: Fix Missing thread interrupt call in RestClient class [kafka]

2024-06-13 Thread via GitHub


chia7712 commented on PR #16282:
URL: https://github.com/apache/kafka/pull/16282#issuecomment-2166982488

   All failed tests pass locally. Will merge it.





Re: [PR] KAFKA-10551: Add topic id support to produce request and response [kafka]

2024-06-13 Thread via GitHub


OmniaGM commented on PR #15968:
URL: https://github.com/apache/kafka/pull/15968#issuecomment-2166980169

   @dengziming I noticed that you had a look into this JIRA before and you 
worked with Justine on the implementations of this KIP before. Would you be 
able to provide some feedback on this PR? Thanks for your help with this





[jira] [Updated] (KAFKA-16954) Move consumer leave operations on close to background thread

2024-06-13 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16954?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16954:
--
Component/s: clients
 consumer

> Move consumer leave operations on close to background thread
> 
>
> Key: KAFKA-16954
> URL: https://issues.apache.org/jira/browse/KAFKA-16954
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Priority: Major
>
> When a consumer unsubscribes, the app thread simply triggers an Unsubscribe 
> event that will take care of it all in the background thread: release 
> assignment (callbacks), clear assigned partitions, and send leave group HB.
> On the contrary, when a consumer is closed, these actions happen in both 
> threads:
>  * release assignment -> in the app thread by directly running the callbacks
>  * clear assignment -> in app thread by updating the subscriptionState
>  * send leave group HB -> in the background thread via an event LeaveOnClose 
> This situation could lead to race conditions, mainly because of the close 
> updating the subscription state in the app thread, when other operations in 
> the background could be already running based on it. Ex. 
>  * unsubscribe in app thread (triggers background UnsubscribeEvent to revoke 
> and leave)
>  * unsubscribe fails (ex. interrupted, leaving operation running in the 
> background thread to revoke partitions and leave)
>  * consumer close (will revoke and clear assignment in the app thread)
>  *  UnsubscribeEvent in the background may fail by trying to revoke 
> partitions that it does not own anymore - _No current assignment for 
> partition ..._
> A basic check has been added to the background thread revocation to avoid the 
> race condition, ensuring that we only revoke partitions we own, but still we 
> should avoid the root cause, which is updating the assignment on the app 
> thread. We should consider having the close operation as a single 
> LeaveOnClose event handled in the background. That event already takes care 
> of revoking the partitions and clearing assignment on the background, so no 
> need to take care of it in the app thread. We should only ensure that we 
> processBackgroundEvents until the LeaveOnClose completes (to allow for 
> callbacks to run in the app thread)
>  
> Trying to understand the current approach, I imagine the initial motivation 
> to have the callbacks (and assignment cleared) in the app thread was to 
> avoid the back-and-forth: app thread close -> background thread leave event 
> -> app thread to run callback -> background thread to clear assignment and 
> send HB. But updating the assignment on the app thread ends up being 
> problematic, as it mainly happens in the background so it opens up the door 
> for race conditions on the subscription state. 
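
One way to picture the proposed flow (a single close event driven by the 
background thread, with the app thread only running callbacks until that event 
completes) is the sketch below. It is a toy model under stated assumptions, 
not consumer code: `LeaveOnCloseSketch`, the queue of callback requests, and 
the step strings are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class LeaveOnCloseSketch {

    /** Models close(): the background thread owns the whole leave sequence. */
    static List<String> close() throws Exception {
        List<String> steps = Collections.synchronizedList(new ArrayList<>());
        // Background -> app thread: requests to run the rebalance callbacks.
        BlockingQueue<CompletableFuture<Void>> backgroundEvents = new LinkedBlockingQueue<>();
        // Completed by the background thread once the leave sequence is finished.
        CompletableFuture<Void> leaveOnClose = new CompletableFuture<>();

        Thread background = new Thread(() -> {
            try {
                CompletableFuture<Void> callbacksDone = new CompletableFuture<>();
                backgroundEvents.put(callbacksDone);          // ask app thread to run callbacks
                callbacksDone.get(5, TimeUnit.SECONDS);       // wait for them to finish
                steps.add("background: clear assignment");    // state touched only here
                steps.add("background: send leave-group heartbeat");
                leaveOnClose.complete(null);
            } catch (Exception e) {
                leaveOnClose.completeExceptionally(e);
            }
        }, "background-sketch");
        background.start();

        // App thread: keep processing background events until the close event completes.
        while (!leaveOnClose.isDone()) {
            CompletableFuture<Void> callbackRequest =
                    backgroundEvents.poll(10, TimeUnit.MILLISECONDS);
            if (callbackRequest != null) {
                steps.add("app: run revocation callbacks");
                callbackRequest.complete(null);
            }
        }
        leaveOnClose.get();   // rethrows if the background sequence failed
        background.join();
        return new ArrayList<>(steps);
    }
}
```

In this model the app thread never modifies the assignment itself; it only 
runs callbacks on request, which is the property the issue is asking for.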





Re: [PR] KAFKA-10551: Add topic id support to produce request and response [kafka]

2024-06-13 Thread via GitHub


OmniaGM commented on PR #15968:
URL: https://github.com/apache/kafka/pull/15968#issuecomment-2166973758

   @chia7712 I noticed that you have reviewed parts of KIP-516 before, and I 
was hoping you have some free time to review this one as well! Thanks in 
advance 





Re: [PR] KAFKA-10551: Add topic id support to produce request and response [kafka]

2024-06-13 Thread via GitHub


OmniaGM commented on code in PR #15968:
URL: https://github.com/apache/kafka/pull/15968#discussion_r1639052751


##
clients/src/main/java/org/apache/kafka/clients/ApiVersions.java:
##
@@ -34,10 +34,21 @@ public class ApiVersions {
 
 private final Map nodeApiVersions = new 
HashMap<>();
 private byte maxUsableProduceMagic = RecordBatch.CURRENT_MAGIC_VALUE;
+private short maxProduceSupportedVersion = ApiKeys.PRODUCE.latestVersion();
 
 public synchronized void update(String nodeId, NodeApiVersions 
nodeApiVersions) {
 this.nodeApiVersions.put(nodeId, nodeApiVersions);
 this.maxUsableProduceMagic = computeMaxUsableProduceMagic();
+this.maxProduceSupportedVersion = computeMaxProduceSupportedVersion();
+}
+
+private short computeMaxProduceSupportedVersion() {
+Optional knownBrokerNodesMinSupportedVersionForProduce = 
this.nodeApiVersions.values().stream()

Review Comment:
   We only check IDs for topics included in the batch, and we map these from 
the metadata, which the producer has already fetched or cached at this point.






Re: [PR] KAFKA-15853: Move ZKConfigs related static method out of core and into ZKConfigs [kafka]

2024-06-13 Thread via GitHub


OmniaGM closed pull request #16109: KAFKA-15853: Move ZKConfigs related static 
method out of core and into ZKConfigs
URL: https://github.com/apache/kafka/pull/16109





Re: [PR] KAFKA-16254: Allow MM2 to fully disable offset sync feature [kafka]

2024-06-13 Thread via GitHub


OmniaGM commented on code in PR #15999:
URL: https://github.com/apache/kafka/pull/15999#discussion_r1639050807


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java:
##
@@ -50,6 +50,40 @@ public class MirrorCheckpointConnectorTest {
 private static final String CONSUMER_GROUP = "consumer-group-1";
 private static final Map SOURCE_OFFSET = 
MirrorUtils.wrapOffset(0);
 
+@Test
+public void testEmitCheckpointsAndSyncGroupOffsetsBothDisabled() {
+// disable the checkpoint emission
+MirrorCheckpointConfig config = new MirrorCheckpointConfig(
+makeProps("emit.checkpoints.enabled", "false",
+"sync.group.offsets.enabled", "false"));
+
+Set knownConsumerGroups = new HashSet<>();
+knownConsumerGroups.add(CONSUMER_GROUP);
+// MirrorCheckpointConnector as minimum to run taskConfig()
+MirrorCheckpointConnector connector = new 
MirrorCheckpointConnector(knownConsumerGroups,
+config);
+List> output = connector.taskConfigs(1);
+// expect no task will be created
+assertEquals(0, output.size(), "MirrorCheckpointConnector not 
disabled");
+}
+
+@Test
+public void testEmitOffsetSyncsDisabled() {
+// disable the checkpoint emission
+MirrorCheckpointConfig config = new MirrorCheckpointConfig(
+makeProps("emit.checkpoints.enabled", "false",
+MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED, 
"false"));
+
+Set knownConsumerGroups = new HashSet<>();
+knownConsumerGroups.add(CONSUMER_GROUP);
+// MirrorCheckpointConnector as minimum to run taskConfig()
+MirrorCheckpointConnector connector = new 
MirrorCheckpointConnector(knownConsumerGroups,
+config);
+List> output = connector.taskConfigs(1);
+// expect no task will be created
+assertEquals(0, output.size(), "MirrorCheckpointConnector not 
disabled");

Review Comment:
   extracted this out






Re: [PR] KAFKA-16254: Allow MM2 to fully disable offset sync feature [kafka]

2024-06-13 Thread via GitHub


OmniaGM commented on code in PR #15999:
URL: https://github.com/apache/kafka/pull/15999#discussion_r1639046733


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java:
##
@@ -437,6 +437,55 @@ public void testSendSyncEvent() {
 verify(producer, times(5)).send(any(), any());
 }
 
+@Test
+public void testDisableEmitOffsetSync() {
+byte[] recordKey = "key".getBytes();
+byte[] recordValue = "value".getBytes();
+int maxOffsetLag = 50;
+int recordPartition = 0;
+int recordOffset = 0;
+int metadataOffset = 100;
+String topicName = "topic";
+String sourceClusterName = "sourceCluster";
+
+RecordHeaders headers = new RecordHeaders();
+ReplicationPolicy replicationPolicy = new DefaultReplicationPolicy();
+
+@SuppressWarnings("unchecked")
+KafkaConsumer consumer = mock(KafkaConsumer.class);
+@SuppressWarnings("unchecked")
+KafkaProducer producer = mock(KafkaProducer.class);
+MirrorSourceMetrics metrics = mock(MirrorSourceMetrics.class);
+Semaphore outstandingOffsetSyncs = new Semaphore(1);
+PartitionState partitionState = new PartitionState(maxOffsetLag);
+Map partitionStates = new HashMap<>();
+
+MirrorSourceTask mirrorSourceTask = new MirrorSourceTask(consumer, 
metrics, sourceClusterName,
+replicationPolicy, maxOffsetLag, producer, 
outstandingOffsetSyncs, partitionStates, topicName, false);
+
+SourceRecord sourceRecord = mirrorSourceTask.convertRecord(new 
ConsumerRecord<>(topicName, recordPartition,
+recordOffset, System.currentTimeMillis(), 
TimestampType.CREATE_TIME, recordKey.length,
+recordValue.length, recordKey, recordValue, headers, 
Optional.empty()));
+
+TopicPartition sourceTopicPartition = 
MirrorUtils.unwrapPartition(sourceRecord.sourcePartition());
+partitionStates.put(sourceTopicPartition, partitionState);
+RecordMetadata recordMetadata = new 
RecordMetadata(sourceTopicPartition, metadataOffset, 0, 0, 0, recordPartition);
+
+ArgumentCaptor producerCallback = 
ArgumentCaptor.forClass(Callback.class);
+when(producer.send(any(), 
producerCallback.capture())).thenAnswer(mockInvocation -> {
+producerCallback.getValue().onCompletion(null, null);
+return null;
+});
+
+mirrorSourceTask.commitRecord(sourceRecord, recordMetadata);

Review Comment:
   If I am not passing `OffsetSyncWriter offsetSyncWriter` as null, then how 
would I know if it wasn't invoked?




Re: [PR] KAFKA-16637: AsyncKafkaConsumer removes offset fetch responses from cache too aggressively [kafka]

2024-06-13 Thread via GitHub


kirktrue commented on PR #16241:
URL: https://github.com/apache/kafka/pull/16241#issuecomment-2166953380

   Closing this PR as we have achieved consensus around the simpler solution in 
#16310.





Re: [PR] KAFKA-16637: AsyncKafkaConsumer removes offset fetch responses from cache too aggressively [kafka]

2024-06-13 Thread via GitHub


kirktrue closed pull request #16241: KAFKA-16637: AsyncKafkaConsumer removes 
offset fetch responses from cache too aggressively
URL: https://github.com/apache/kafka/pull/16241





[jira] [Assigned] (KAFKA-16955) ConcurrentModification exception thrown by KafkaStream threadState access

2024-06-13 Thread Rohan Desai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16955?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rohan Desai reassigned KAFKA-16955:
---

Assignee: Rohan Desai

> ConcurrentModification exception thrown by KafkaStream threadState access
> -
>
> Key: KAFKA-16955
> URL: https://issues.apache.org/jira/browse/KAFKA-16955
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Rohan Desai
>Assignee: Rohan Desai
>Priority: Major
>
> We see occasional ConcurrentModificationExceptions thrown when accessing 
> threadState:
>  
>  
> {code:java}
> 155.745service_application1 info [ERROR] 2024-06-11 07:56:42.417 [e2e-de9a85a4-b1d9-4807-bc34-e269f760bcd7-StreamThread-2] ResponsiveKafkaStreams - stream-client [e2e-de9a85a4-b1d9-4807-bc34-e269f760bcd7] Replacing thread in the streams uncaught exception handler
> 155.745service_application1 info org.apache.kafka.streams.errors.StreamsException: java.util.ConcurrentModificationException
> 155.745service_application1 info     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:729) ~[kafka-streams-3.7.0.jar:?]
> 155.745service_application1 info     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:645) [kafka-streams-3.7.0.jar:?]
> 155.745service_application1 info Caused by: java.util.ConcurrentModificationException
> 155.745service_application1 info     at java.util.HashMap$HashIterator.nextNode(HashMap.java:1605) ~[?:?]
> 155.745service_application1 info     at java.util.HashMap$ValueIterator.next(HashMap.java:1633) ~[?:?]
> 155.745service_application1 info     at org.apache.kafka.streams.KafkaStreams$StreamStateListener.maybeSetRunning(KafkaStreams.java:656) ~[kafka-streams-3.7.0.jar:?]
> 155.745service_application1 info     at org.apache.kafka.streams.KafkaStreams$StreamStateListener.onChange(KafkaStreams.java:686) ~[kafka-streams-3.7.0.jar:?]
> 155.745service_application1 info     at org.apache.kafka.streams.processor.internals.StreamThread.setState(StreamThread.java:261) ~[kafka-streams-3.7.0.jar:?]
> 155.745service_application1 info     at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:1120) ~[kafka-streams-3.7.0.jar:?]
> 155.745service_application1 info     at org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:921) ~[kafka-streams-3.7.0.jar:?]
> 155.745service_application1 info     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:686) ~[kafka-streams-3.7.0.jar:?]
> 155.745service_application1 info     ... 1 more
> {code}
>  
>  





[PR] KAFKA-16955: fix synchronization of streams threadState [kafka]

2024-06-13 Thread via GitHub


rodesai opened a new pull request, #16337:
URL: https://github.com/apache/kafka/pull/16337

   Each KafkaStreams instance maintains a map from threadId to state, which is 
aggregated into a KafkaStreams app state. The map is updated on every state 
change and when a new thread is created. State change updates are done in a 
synchronized block; however, the update that happens on thread creation is not, 
which can raise a ConcurrentModificationException. This patch moves this update 
into the listener object and protects it using the object's lock. It also moves 
ownership of the state map into the listener so that it's less likely that 
future changes access it without locking.
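
   The locking pattern described above might look roughly like the sketch 
below. It is a simplified illustration and not the actual patch: 
`ThreadStateListenerSketch`, `Listener`, `registerThread`, and `allRunning` 
are hypothetical names, and the real listener tracks more states.

```java
import java.util.HashMap;
import java.util.Map;

public class ThreadStateListenerSketch {
    enum State { CREATED, RUNNING }

    /** The listener owns the map, so every access goes through its lock. */
    static final class Listener {
        private final Map<Long, State> threadStates = new HashMap<>();

        // Called when a new thread is created; previously the unsynchronized path.
        synchronized void registerThread(long threadId) {
            threadStates.put(threadId, State.CREATED);
        }

        // Called on every thread state change.
        synchronized void onChange(long threadId, State newState) {
            threadStates.put(threadId, newState);
        }

        // Iterates the map under the same lock, so no concurrent put can
        // invalidate the iterator.
        synchronized boolean allRunning() {
            for (State state : threadStates.values()) {
                if (state != State.RUNNING) {
                    return false;
                }
            }
            return !threadStates.isEmpty();
        }
    }

    /** Registers threads while another thread iterates; returns the final aggregate. */
    static boolean demo() throws InterruptedException {
        Listener listener = new Listener();
        Thread adder = new Thread(() -> {
            for (long id = 0; id < 1000; id++) listener.registerThread(id);
        });
        Thread reader = new Thread(() -> {
            for (int i = 0; i < 1000; i++) listener.allRunning();
        });
        adder.start();
        reader.start();
        adder.join();
        reader.join();
        for (long id = 0; id < 1000; id++) listener.onChange(id, State.RUNNING);
        return listener.allRunning();
    }
}
```

   Because the map is private to the listener, a future caller cannot reach 
it without going through one of the synchronized methods.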





[jira] [Updated] (KAFKA-16955) ConcurrentModification exception thrown by KafkaStream threadState access

2024-06-13 Thread Rohan Desai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16955?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rohan Desai updated KAFKA-16955:

Affects Version/s: 3.7.0

> ConcurrentModification exception thrown by KafkaStream threadState access
> -
>
> Key: KAFKA-16955
> URL: https://issues.apache.org/jira/browse/KAFKA-16955
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Rohan Desai
>Priority: Major
>
> We see occasional ConcurrentModificationExceptions thrown when accessing 
> threadState:
>  
>  
> {code:java}
> 155.745service_application1 info [ERROR] 2024-06-11 07:56:42.417 [e2e-de9a85a4-b1d9-4807-bc34-e269f760bcd7-StreamThread-2] ResponsiveKafkaStreams - stream-client [e2e-de9a85a4-b1d9-4807-bc34-e269f760bcd7] Replacing thread in the streams uncaught exception handler
> 155.745service_application1 info org.apache.kafka.streams.errors.StreamsException: java.util.ConcurrentModificationException
> 155.745service_application1 info     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:729) ~[kafka-streams-3.7.0.jar:?]
> 155.745service_application1 info     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:645) [kafka-streams-3.7.0.jar:?]
> 155.745service_application1 info Caused by: java.util.ConcurrentModificationException
> 155.745service_application1 info     at java.util.HashMap$HashIterator.nextNode(HashMap.java:1605) ~[?:?]
> 155.745service_application1 info     at java.util.HashMap$ValueIterator.next(HashMap.java:1633) ~[?:?]
> 155.745service_application1 info     at org.apache.kafka.streams.KafkaStreams$StreamStateListener.maybeSetRunning(KafkaStreams.java:656) ~[kafka-streams-3.7.0.jar:?]
> 155.745service_application1 info     at org.apache.kafka.streams.KafkaStreams$StreamStateListener.onChange(KafkaStreams.java:686) ~[kafka-streams-3.7.0.jar:?]
> 155.745service_application1 info     at org.apache.kafka.streams.processor.internals.StreamThread.setState(StreamThread.java:261) ~[kafka-streams-3.7.0.jar:?]
> 155.745service_application1 info     at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:1120) ~[kafka-streams-3.7.0.jar:?]
> 155.745service_application1 info     at org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:921) ~[kafka-streams-3.7.0.jar:?]
> 155.745service_application1 info     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:686) ~[kafka-streams-3.7.0.jar:?]
> 155.745service_application1 info     ... 1 more
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16955) ConcurrentModification exception thrown by KafkaStream threadState access

2024-06-13 Thread Rohan Desai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16955?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rohan Desai updated KAFKA-16955:

Component/s: streams

> ConcurrentModification exception thrown by KafkaStream threadState access
> -
>
> Key: KAFKA-16955
> URL: https://issues.apache.org/jira/browse/KAFKA-16955
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Rohan Desai
>Priority: Major
>
> We see occasional ConcurrentModificationExceptions thrown when accessing 
> threadState:
>  
>  
> {code:java}
> 155.745 service_application1 info [ERROR] 2024-06-11 07:56:42.417 [e2e-de9a85a4-b1d9-4807-bc34-e269f760bcd7-StreamThread-2] ResponsiveKafkaStreams - stream-client [e2e-de9a85a4-b1d9-4807-bc34-e269f760bcd7] Replacing thread in the streams uncaught exception handler
> 155.745 service_application1 info org.apache.kafka.streams.errors.StreamsException: java.util.ConcurrentModificationException
> 155.745 service_application1 info     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:729) ~[kafka-streams-3.7.0.jar:?]
> 155.745 service_application1 info     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:645) [kafka-streams-3.7.0.jar:?]
> 155.745 service_application1 info Caused by: java.util.ConcurrentModificationException
> 155.745 service_application1 info     at java.util.HashMap$HashIterator.nextNode(HashMap.java:1605) ~[?:?]
> 155.745 service_application1 info     at java.util.HashMap$ValueIterator.next(HashMap.java:1633) ~[?:?]
> 155.745 service_application1 info     at org.apache.kafka.streams.KafkaStreams$StreamStateListener.maybeSetRunning(KafkaStreams.java:656) ~[kafka-streams-3.7.0.jar:?]
> 155.745 service_application1 info     at org.apache.kafka.streams.KafkaStreams$StreamStateListener.onChange(KafkaStreams.java:686) ~[kafka-streams-3.7.0.jar:?]
> 155.745 service_application1 info     at org.apache.kafka.streams.processor.internals.StreamThread.setState(StreamThread.java:261) ~[kafka-streams-3.7.0.jar:?]
> 155.745 service_application1 info     at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:1120) ~[kafka-streams-3.7.0.jar:?]
> 155.745 service_application1 info     at org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:921) ~[kafka-streams-3.7.0.jar:?]
> 155.745 service_application1 info     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:686) ~[kafka-streams-3.7.0.jar:?]
> 155.745 service_application1 info     ... 1 more
> {code}
>  
>  





[jira] [Created] (KAFKA-16955) ConcurrentModification exception thrown by KafkaStream threadState access

2024-06-13 Thread Rohan Desai (Jira)
Rohan Desai created KAFKA-16955:
---

 Summary: ConcurrentModification exception thrown by KafkaStream 
threadState access
 Key: KAFKA-16955
 URL: https://issues.apache.org/jira/browse/KAFKA-16955
 Project: Kafka
  Issue Type: Bug
Reporter: Rohan Desai


We see occasional ConcurrentModificationExceptions thrown when accessing 
threadState:

 

 
{code:java}
155.745 service_application1 info [ERROR] 2024-06-11 07:56:42.417 [e2e-de9a85a4-b1d9-4807-bc34-e269f760bcd7-StreamThread-2] ResponsiveKafkaStreams - stream-client [e2e-de9a85a4-b1d9-4807-bc34-e269f760bcd7] Replacing thread in the streams uncaught exception handler
155.745 service_application1 info org.apache.kafka.streams.errors.StreamsException: java.util.ConcurrentModificationException
155.745 service_application1 info     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:729) ~[kafka-streams-3.7.0.jar:?]
155.745 service_application1 info     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:645) [kafka-streams-3.7.0.jar:?]
155.745 service_application1 info Caused by: java.util.ConcurrentModificationException
155.745 service_application1 info     at java.util.HashMap$HashIterator.nextNode(HashMap.java:1605) ~[?:?]
155.745 service_application1 info     at java.util.HashMap$ValueIterator.next(HashMap.java:1633) ~[?:?]
155.745 service_application1 info     at org.apache.kafka.streams.KafkaStreams$StreamStateListener.maybeSetRunning(KafkaStreams.java:656) ~[kafka-streams-3.7.0.jar:?]
155.745 service_application1 info     at org.apache.kafka.streams.KafkaStreams$StreamStateListener.onChange(KafkaStreams.java:686) ~[kafka-streams-3.7.0.jar:?]
155.745 service_application1 info     at org.apache.kafka.streams.processor.internals.StreamThread.setState(StreamThread.java:261) ~[kafka-streams-3.7.0.jar:?]
155.745 service_application1 info     at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:1120) ~[kafka-streams-3.7.0.jar:?]
155.745 service_application1 info     at org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:921) ~[kafka-streams-3.7.0.jar:?]
155.745 service_application1 info     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:686) ~[kafka-streams-3.7.0.jar:?]
155.745 service_application1 info     ... 1 more
{code}
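
For context on the failure mode: the trace ends in HashMap$ValueIterator.next, called from maybeSetRunning, which is what a HashMap's fail-fast iterator does when the map is structurally modified during iteration. A minimal single-threaded sketch of that behavior, next to an iteration over a snapshot that does not trip it, might look like the following. The class, method, and variable names are hypothetical; this is neither the KafkaStreams code nor a proposed fix.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ThreadStateIterationSketch {

    // Iterates the live values() view while adding a key mid-iteration.
    // HashMap iterators are fail-fast, so the following next() call throws.
    static boolean iterateWhileModifying() {
        Map<Long, String> threadState = new HashMap<>();
        threadState.put(1L, "RUNNING");
        threadState.put(2L, "RUNNING");
        try {
            for (String state : threadState.values()) {
                threadState.put(3L, "CREATED"); // structural modification
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    // Iterates a copy of the values, so later modifications of the map
    // cannot invalidate the iterator. Returns the number of values visited.
    static int iterateOverSnapshot() {
        Map<Long, String> threadState = new HashMap<>();
        threadState.put(1L, "RUNNING");
        threadState.put(2L, "RUNNING");
        List<String> snapshot = new ArrayList<>(threadState.values());
        int visited = 0;
        for (String state : snapshot) {
            threadState.put(3L, "CREATED");
            visited++;
        }
        return visited;
    }
}
```

With several threads involved, taking the snapshot would itself need to be guarded by the same lock as the writers, or the map replaced by a concurrent implementation; which of these suits threadState is left to the ticket.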
 

 





Re: [PR] KAFKA-16254: Allow MM2 to fully disable offset sync feature [kafka]

2024-06-13 Thread via GitHub


OmniaGM commented on code in PR #15999:
URL: https://github.com/apache/kafka/pull/15999#discussion_r1639020762


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java:
##
@@ -252,7 +252,13 @@ private static ConfigDef defineCheckpointConfig(ConfigDef baseConfig) {
 ConfigDef.Type.CLASS,
 TOPIC_FILTER_CLASS_DEFAULT,
 ConfigDef.Importance.LOW,
-TOPIC_FILTER_CLASS_DOC);
+TOPIC_FILTER_CLASS_DOC)
+.define(

Review Comment:
   I moved this to `MirrorMakerConfig`






Re: [PR] MINOR: update Kafka Streams docs with 3.3 KIP information [kafka]

2024-06-13 Thread via GitHub


mjsax commented on PR #16316:
URL: https://github.com/apache/kafka/pull/16316#issuecomment-2166888575

   Merged to `trunk` and cherry-picked to `3.8` and all the way back to `3.3`.





Re: [PR] KAFKA-16637: AsyncKafkaConsumer removes offset fetch responses from cache too aggressively [kafka]

2024-06-13 Thread via GitHub


kirktrue commented on code in PR #16310:
URL: https://github.com/apache/kafka/pull/16310#discussion_r1639005027


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -574,6 +581,148 @@ public void testPollLongThrowsException() {
                 "This method is deprecated and will be removed in the next major release.", e.getMessage());
     }
 
+    @Test
+    public void testOffsetFetchStoresPendingEvent() {
+        consumer = newConsumer();
+        long timeoutMs = 0;
+        doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+        consumer.assign(Collections.singleton(new TopicPartition("topic1", 0)));
+
+        // The first attempt at poll() creates an event, enqueues it, but its Future does not complete within the
+        // timeout, leaving a pending fetch.
+        consumer.poll(Duration.ofMillis(timeoutMs));
+        verify(applicationEventHandler).add(any(FetchCommittedOffsetsEvent.class));
+        CompletableApplicationEvent<Map<TopicPartition, OffsetAndMetadata>> event = getLastEnqueuedEvent();
+        assertThrows(TimeoutException.class, () -> ConsumerUtils.getResult(event.future(), time.timer(timeoutMs)));
+        assertTrue(consumer.hasPendingOffsetFetchEvent());
+
+        clearInvocations(applicationEventHandler);
+
+        // For the second attempt, the event is reused, so first verify that another FetchCommittedOffsetsEvent
+        // was not enqueued. On this attempt the Future returns successfully, clearing the pending fetch.
+        event.future().complete(Collections.emptyMap());
+        consumer.poll(Duration.ofMillis(timeoutMs));
+        verify(applicationEventHandler, never()).add(any(FetchCommittedOffsetsEvent.class));
+        assertDoesNotThrow(() -> ConsumerUtils.getResult(event.future(), time.timer(timeoutMs)));
+        assertFalse(consumer.hasPendingOffsetFetchEvent());
+    }
+
+    @Test
+    public void testOffsetFetchDoesNotReuseMismatchedPendingEvent() {
+        consumer = newConsumer();
+        long timeoutMs = 0;
+        doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+
+        // The first attempt at poll() retrieves data for partition 0 of the topic. poll() creates an event,
+        // enqueues it, but its Future does not complete within the timeout, leaving a pending fetch.
+        consumer.assign(Collections.singleton(new TopicPartition("topic1", 0)));
+        consumer.poll(Duration.ofMillis(timeoutMs));
+        verify(applicationEventHandler).add(any(FetchCommittedOffsetsEvent.class));
+        CompletableApplicationEvent<Map<TopicPartition, OffsetAndMetadata>> event1 = getLastEnqueuedEvent();
+        assertThrows(TimeoutException.class, () -> ConsumerUtils.getResult(event1.future(), time.timer(timeoutMs)));
+        assertTrue(consumer.hasPendingOffsetFetchEvent());
+
+        clearInvocations(applicationEventHandler);
+
+        // For the second attempt, the set of partitions is reassigned, causing the pending offset to be replaced.
+        // Verify that another FetchCommittedOffsetsEvent is enqueued.
+        consumer.assign(Collections.singleton(new TopicPartition("topic1", 1)));
+        consumer.poll(Duration.ofMillis(timeoutMs));
+        verify(applicationEventHandler).add(any(FetchCommittedOffsetsEvent.class));
+        CompletableApplicationEvent<Map<TopicPartition, OffsetAndMetadata>> event2 = getLastEnqueuedEvent();
+        assertNotEquals(event1, event2);
+        assertThrows(TimeoutException.class, () -> ConsumerUtils.getResult(event2.future(), time.timer(timeoutMs)));
+        assertTrue(consumer.hasPendingOffsetFetchEvent());
+
+        clearInvocations(applicationEventHandler);
+
+        // For the third attempt, the event from attempt 2 is reused, so there should not have been another
+        // FetchCommittedOffsetsEvent enqueued. The Future is completed to make it return successfully in poll().
+        // This will finally clear out the pending fetch.
+        event2.future().complete(Collections.emptyMap());
+        consumer.poll(Duration.ofMillis(timeoutMs));
+        verify(applicationEventHandler, never()).add(any(FetchCommittedOffsetsEvent.class));
+        assertDoesNotThrow(() -> ConsumerUtils.getResult(event2.future(), time.timer(timeoutMs)));
+        assertFalse(consumer.hasPendingOffsetFetchEvent());
+    }
+
+    @Test
+    public void testOffsetFetchDoesNotReuseExpiredPendingEvent() {
+        consumer = newConsumer();
+        long timeoutMs = 0;
+        doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+        consumer.assign(Collections.singleton(new TopicPartition("topic1", 0)));
+
+        // The first attempt at poll() creates an event, enqueues it, but its Future does not complete within
+        // the timeout, leaving a pending fetch.
+        consumer.poll(Duration.ofMillis(timeoutMs));
+        verify(applicationEventHandler).add(any(FetchCommittedOffsetsEvent.class));
+        CompletableApplicationEvent

Re: [PR] KAFKA-16637: AsyncKafkaConsumer removes offset fetch responses from cache too aggressively [kafka]

2024-06-13 Thread via GitHub


kirktrue commented on code in PR #16310:
URL: https://github.com/apache/kafka/pull/16310#discussion_r1639004424


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -574,6 +575,93 @@ public void testPollLongThrowsException() {
                 "This method is deprecated and will be removed in the next major release.", e.getMessage());
     }
 
+    @Test
+    public void testOffsetFetchStoresPendingEvent() {
+        consumer = newConsumer();
+        long timeoutMs = 0;
+        doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+        consumer.assign(Collections.singleton(new TopicPartition("topic1", 0)));
+
+        // The first attempt at poll() creates an event, enqueues it, but its Future does not complete within the
+        // timeout, leaving a pending fetch.
+        consumer.poll(Duration.ofMillis(timeoutMs));
+        verify(applicationEventHandler, times(1)).add(any(FetchCommittedOffsetsEvent.class));
+        CompletableApplicationEvent<Map<TopicPartition, OffsetAndMetadata>> event = getLastEnqueuedEvent();
+        assertThrows(TimeoutException.class, () -> ConsumerUtils.getResult(event.future(), time.timer(timeoutMs)));
+        assertTrue(consumer.hasPendingOffsetFetchEvent());
+
+        // For the second attempt, the event is reused, and this time the Future returns successfully, clearing
+        // the pending fetch. Verify that the number of FetchCommittedOffsetsEvent enqueued remains at 1.
+        event.future().complete(Collections.emptyMap());
+        consumer.poll(Duration.ofMillis(timeoutMs));
+        verify(applicationEventHandler, times(1)).add(any(FetchCommittedOffsetsEvent.class));
+        assertDoesNotThrow(() -> ConsumerUtils.getResult(event.future(), time.timer(timeoutMs)));
+        assertFalse(consumer.hasPendingOffsetFetchEvent());
+    }
+
+    @Test
+    public void testOffsetFetchDoesNotReuseMismatchedPendingEvent() {
+        consumer = newConsumer();
+        long timeoutMs = 0;
+        doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+
+        // The first attempt at poll() retrieves data for partition 0 of the topic. poll() creates an event,
+        // enqueues it, but its Future does not complete within the timeout, leaving a pending fetch.
+        consumer.assign(Collections.singleton(new TopicPartition("topic1", 0)));
+        consumer.poll(Duration.ofMillis(timeoutMs));
+        verify(applicationEventHandler, times(1)).add(any(FetchCommittedOffsetsEvent.class));
+        CompletableApplicationEvent<Map<TopicPartition, OffsetAndMetadata>> event1 = getLastEnqueuedEvent();
+        assertThrows(TimeoutException.class, () -> ConsumerUtils.getResult(event1.future(), time.timer(timeoutMs)));
+        assertTrue(consumer.hasPendingOffsetFetchEvent());
+
+        // For the second attempt, the set of partitions is reassigned, causing the pending offset to be replaced.
+        // Verify that the number of FetchCommittedOffsetsEvent enqueued is updated to 2.
+        consumer.assign(Collections.singleton(new TopicPartition("topic1", 1)));
+        consumer.poll(Duration.ofMillis(timeoutMs));
+        verify(applicationEventHandler, times(2)).add(any(FetchCommittedOffsetsEvent.class));

Review Comment:
   I went ahead and used `clearInvocations()` here too, just for consistency. 
Is that OK?






Re: [PR] KAFKA-16637: AsyncKafkaConsumer removes offset fetch responses from cache too aggressively [kafka]

2024-06-13 Thread via GitHub


kirktrue commented on code in PR #16310:
URL: https://github.com/apache/kafka/pull/16310#discussion_r1639004097


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -574,6 +575,93 @@ public void testPollLongThrowsException() {
                 "This method is deprecated and will be removed in the next major release.", e.getMessage());
     }
 
+    @Test
+    public void testOffsetFetchStoresPendingEvent() {
+        consumer = newConsumer();
+        long timeoutMs = 0;
+        doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+        consumer.assign(Collections.singleton(new TopicPartition("topic1", 0)));
+
+        // The first attempt at poll() creates an event, enqueues it, but its Future does not complete within the
+        // timeout, leaving a pending fetch.
+        consumer.poll(Duration.ofMillis(timeoutMs));
+        verify(applicationEventHandler, times(1)).add(any(FetchCommittedOffsetsEvent.class));
+        CompletableApplicationEvent<Map<TopicPartition, OffsetAndMetadata>> event = getLastEnqueuedEvent();
+        assertThrows(TimeoutException.class, () -> ConsumerUtils.getResult(event.future(), time.timer(timeoutMs)));
+        assertTrue(consumer.hasPendingOffsetFetchEvent());
+
+        // For the second attempt, the event is reused, and this time the Future returns successfully, clearing
+        // the pending fetch. Verify that the number of FetchCommittedOffsetsEvent enqueued remains at 1.
+        event.future().complete(Collections.emptyMap());
+        consumer.poll(Duration.ofMillis(timeoutMs));
+        verify(applicationEventHandler, times(1)).add(any(FetchCommittedOffsetsEvent.class));

Review Comment:
   I switched to using `clearInvocations()` and `never()` for clarity.
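
   To spell out the difference for anyone following along: with cumulative counts the second assertion has to restate `times(1)`, whereas clearing the recorded invocations between phases lets it say "never". The sketch below imitates that with a hand-rolled recording handler instead of Mockito, so `RecordingHandler`, `Poller`, `clear()` and `addCount()` are all hypothetical names and only the idea carries over to the test.

```java
import java.util.ArrayList;
import java.util.List;

public class ClearInvocationsSketch {

    // Hypothetical stand-in for a mocked event handler: it only records what was added.
    static class RecordingHandler {
        private final List<String> added = new ArrayList<>();

        void add(String event) {
            added.add(event);
        }

        int addCount() {
            return added.size();
        }

        // Plays the role that clearInvocations(mock) plays in the test.
        void clear() {
            added.clear();
        }
    }

    // Hypothetical consumer that enqueues a fetch event only when none is pending.
    static class Poller {
        private final RecordingHandler handler;
        private boolean pending;

        Poller(RecordingHandler handler) {
            this.handler = handler;
        }

        void poll() {
            if (!pending) {
                handler.add("FetchCommittedOffsetsEvent");
                pending = true;
            }
        }
    }

    // Returns the add() counts observed after each of the two phases.
    static int[] run() {
        RecordingHandler handler = new RecordingHandler();
        Poller poller = new Poller(handler);

        poller.poll();
        int first = handler.addCount();  // phase 1: one event enqueued

        handler.clear();                 // forget phase 1 before checking phase 2

        poller.poll();
        int second = handler.addCount(); // phase 2: nothing new, i.e. "never"

        return new int[] {first, second};
    }
}
```

   After the reset, each phase's assertion describes only what that phase did, which is the clarity gain mentioned above.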






Re: [PR] MINOR: update Kafka Streams docs with 3.3 KIP information [kafka]

2024-06-13 Thread via GitHub


mjsax merged PR #16316:
URL: https://github.com/apache/kafka/pull/16316





Re: [PR] MINOR: update Kafka Streams docs with 3.3 KIP information [kafka]

2024-06-13 Thread via GitHub


JimGalasyn commented on code in PR #16316:
URL: https://github.com/apache/kafka/pull/16316#discussion_r1638978091


##
docs/ops.html:
##
@@ -3100,7 +3100,8 @@ Processor Node 
Metrics
  The following metrics are only available on certain types of nodes, i.e., the 
process-* metrics are only available for
- source processor nodes, the suppression-emit-* metrics are only available for 
suppression operation nodes, and the
+ source processor nodes, the suppression-emit-* metrics are only available for 
suppression operation nodes,
+ emit-final-* metrics are only available for windowed aggregations nodes, and 
the
  record-e2e-latency-* metrics are only available for source processor nodes 
and terminal nodes (nodes without successor
  nodes).
  All of the metrics have a recording level of debug, except for 
the record-e2e-latency-* metrics which have

Review Comment:
   ```suggestion
All of the metrics have a recording level of debug, except for 
the record-e2e-latency-* metrics which have
   ```






Re: [PR] MINOR: update Kafka Streams docs with 3.2 KIP information [kafka]

2024-06-13 Thread via GitHub


mjsax commented on PR #16313:
URL: https://github.com/apache/kafka/pull/16313#issuecomment-2166877068

   Merged to `trunk` and cherry-picked to `3.8` and all the way back to `3.2`.





Re: [PR] KAFKA-16637: AsyncKafkaConsumer removes offset fetch responses from cache too aggressively [kafka]

2024-06-13 Thread via GitHub


kirktrue commented on PR #16310:
URL: https://github.com/apache/kafka/pull/16310#issuecomment-2166875819

   @AndrewJSchofield @cadonna @lianetm @philipnee: this PR is ready to be 
re-reviewed. Thanks all for your input 😄





Re: [PR] KAFKA-16637: AsyncKafkaConsumer removes offset fetch responses from cache too aggressively [kafka]

2024-06-13 Thread via GitHub


kirktrue commented on code in PR #16310:
URL: https://github.com/apache/kafka/pull/16310#discussion_r1638998663


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -574,6 +581,148 @@ public void testPollLongThrowsException() {
                 "This method is deprecated and will be removed in the next major release.", e.getMessage());
     }
 
+    @Test
+    public void testOffsetFetchStoresPendingEvent() {
+        consumer = newConsumer();
+        long timeoutMs = 0;
+        doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+        consumer.assign(Collections.singleton(new TopicPartition("topic1", 0)));
+
+        // The first attempt at poll() creates an event, enqueues it, but its Future does not complete within the
+        // timeout, leaving a pending fetch.
+        consumer.poll(Duration.ofMillis(timeoutMs));
+        verify(applicationEventHandler).add(any(FetchCommittedOffsetsEvent.class));
+        CompletableApplicationEvent<Map<TopicPartition, OffsetAndMetadata>> event = getLastEnqueuedEvent();
+        assertThrows(TimeoutException.class, () -> ConsumerUtils.getResult(event.future(), time.timer(timeoutMs)));
+        assertTrue(consumer.hasPendingOffsetFetchEvent());
+
+        clearInvocations(applicationEventHandler);
+
+        // For the second attempt, the event is reused, so first verify that another FetchCommittedOffsetsEvent
+        // was not enqueued. On this attempt the Future returns successfully, clearing the pending fetch.
+        event.future().complete(Collections.emptyMap());
+        consumer.poll(Duration.ofMillis(timeoutMs));
+        verify(applicationEventHandler, never()).add(any(FetchCommittedOffsetsEvent.class));
+        assertDoesNotThrow(() -> ConsumerUtils.getResult(event.future(), time.timer(timeoutMs)));
+        assertFalse(consumer.hasPendingOffsetFetchEvent());
+    }
+
+    @Test
+    public void testOffsetFetchDoesNotReuseMismatchedPendingEvent() {
+        consumer = newConsumer();
+        long timeoutMs = 0;
+        doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+
+        // The first attempt at poll() retrieves data for partition 0 of the topic. poll() creates an event,
+        // enqueues it, but its Future does not complete within the timeout, leaving a pending fetch.
+        consumer.assign(Collections.singleton(new TopicPartition("topic1", 0)));
+        consumer.poll(Duration.ofMillis(timeoutMs));
+        verify(applicationEventHandler).add(any(FetchCommittedOffsetsEvent.class));
+        CompletableApplicationEvent<Map<TopicPartition, OffsetAndMetadata>> event1 = getLastEnqueuedEvent();
+        assertThrows(TimeoutException.class, () -> ConsumerUtils.getResult(event1.future(), time.timer(timeoutMs)));
+        assertTrue(consumer.hasPendingOffsetFetchEvent());
+
+        clearInvocations(applicationEventHandler);
+
+        // For the second attempt, the set of partitions is reassigned, causing the pending offset to be replaced.
+        // Verify that another FetchCommittedOffsetsEvent is enqueued.
+        consumer.assign(Collections.singleton(new TopicPartition("topic1", 1)));
+        consumer.poll(Duration.ofMillis(timeoutMs));
+        verify(applicationEventHandler).add(any(FetchCommittedOffsetsEvent.class));
+        CompletableApplicationEvent<Map<TopicPartition, OffsetAndMetadata>> event2 = getLastEnqueuedEvent();
+        assertNotEquals(event1, event2);
+        assertThrows(TimeoutException.class, () -> ConsumerUtils.getResult(event2.future(), time.timer(timeoutMs)));
+        assertTrue(consumer.hasPendingOffsetFetchEvent());
+
+        clearInvocations(applicationEventHandler);
+
+        // For the third attempt, the event from attempt 2 is reused, so there should not have been another
+        // FetchCommittedOffsetsEvent enqueued. The Future is completed to make it return successfully in poll().
+        // This will finally clear out the pending fetch.
+        event2.future().complete(Collections.emptyMap());
+        consumer.poll(Duration.ofMillis(timeoutMs));
+        verify(applicationEventHandler, never()).add(any(FetchCommittedOffsetsEvent.class));
+        assertDoesNotThrow(() -> ConsumerUtils.getResult(event2.future(), time.timer(timeoutMs)));
+        assertFalse(consumer.hasPendingOffsetFetchEvent());
+    }
+
+    @Test
+    public void testOffsetFetchDoesNotReuseExpiredPendingEvent() {
+        consumer = newConsumer();
+        long timeoutMs = 0;
+        doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+        consumer.assign(Collections.singleton(new TopicPartition("topic1", 0)));
+
+        // The first attempt at poll() creates an event, enqueues it, but its Future does not complete within
+        // the timeout, leaving a pending fetch.
+        consumer.poll(Duration.ofMillis(timeoutMs));
+        verify(applicationEventHandler).add(any(FetchCommittedOffsetsEvent.class));
+        CompletableApplicationEvent

Re: [PR] KAFKA-16637: AsyncKafkaConsumer removes offset fetch responses from cache too aggressively [kafka]

2024-06-13 Thread via GitHub


kirktrue commented on code in PR #16310:
URL: https://github.com/apache/kafka/pull/16310#discussion_r1638996787


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1666,13 +1668,28 @@ private boolean initWithCommittedOffsetsIfNeeded(Timer timer) {
             return true;
 
         log.debug("Refreshing committed offsets for partitions {}", initializingPartitions);
+
+        // The shorter the timeout provided to poll(), the more likely the offsets fetch will time out. To handle
+        // this case, on the first attempt to fetch the committed offsets, a FetchCommittedOffsetsEvent is created
+        // (with potentially a longer timeout) and stored. The event is used for the first attempt, but in the
+        // case it times out, subsequent attempts will also use the event in order to wait for the results.
+        if (!canReusePendingOffsetFetchEvent(initializingPartitions)) {
+            // Give the event a reasonable amount of time to complete.
+            long timeoutMs = Math.max(defaultApiTimeoutMs, timer.remainingMs());
+            long deadlineMs = time.milliseconds() + timeoutMs;
+            pendingOffsetFetchEvent = new FetchCommittedOffsetsEvent(initializingPartitions, deadlineMs);
+            applicationEventHandler.add(pendingOffsetFetchEvent);
+        }
+
+        final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = pendingOffsetFetchEvent.future();
+
         try {
-            final FetchCommittedOffsetsEvent event =
-                new FetchCommittedOffsetsEvent(
-                    initializingPartitions,
-                    calculateDeadlineMs(timer));
-            wakeupTrigger.setActiveTask(event.future());
-            final Map<TopicPartition, OffsetAndMetadata> offsets = applicationEventHandler.addAndGet(event);
+            wakeupTrigger.setActiveTask(future);
+            final Map<TopicPartition, OffsetAndMetadata> offsets = ConsumerUtils.getResult(future, timer);

Review Comment:
   _HARM_
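
   For readers skimming the hunk, the reuse pattern described in the code comment can be sketched in isolation roughly as below. This is a simplified stand-in, not the consumer code: `PendingFetchSketch`, `PendingFetch`, the string-typed partitions and result, and the explicit `nowMs` argument are all hypothetical, and the expiry check in `canReuse` is an assumption inferred from the test name `testOffsetFetchDoesNotReuseExpiredPendingEvent`.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class PendingFetchSketch {

    // Hypothetical stand-in for FetchCommittedOffsetsEvent: partitions, a deadline, and a future.
    static final class PendingFetch {
        final Set<String> partitions;
        final long deadlineMs;
        final CompletableFuture<String> future = new CompletableFuture<>();

        PendingFetch(Set<String> partitions, long deadlineMs) {
            this.partitions = partitions;
            this.deadlineMs = deadlineMs;
        }
    }

    PendingFetch pending; // the stored event, or null when nothing is in flight
    int enqueued;         // how many events were created, for illustration only

    // Assumption: reuse requires the same partitions and a deadline that has not passed.
    boolean canReuse(Set<String> partitions, long nowMs) {
        return pending != null
            && pending.partitions.equals(partitions)
            && nowMs < pending.deadlineMs;
    }

    // Waits up to waitMs for the result. Returns null on timeout and keeps the
    // pending fetch, so that the next call can wait on the same future again.
    String fetch(Set<String> partitions, long nowMs, long waitMs, long defaultApiTimeoutMs) {
        if (!canReuse(partitions, nowMs)) {
            // Give the fetch at least the default API timeout, even if the caller waits less.
            long timeoutMs = Math.max(defaultApiTimeoutMs, waitMs);
            pending = new PendingFetch(partitions, nowMs + timeoutMs);
            enqueued++;
        }
        try {
            String result = pending.future.get(waitMs, TimeUnit.MILLISECONDS);
            pending = null; // completed, so clear the stored event
            return result;
        } catch (TimeoutException e) {
            return null;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

   The point of decoupling the event's deadline from the caller's wait is that a `poll(Duration.ZERO)` loop can still make progress: each call checks the same in-flight future instead of discarding it and starting over.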






Re: [PR] MINOR: update Kafka Streams docs with 3.4 KIP information [kafka]

2024-06-13 Thread via GitHub


JimGalasyn commented on code in PR #16336:
URL: https://github.com/apache/kafka/pull/16336#discussion_r1638992064


##
docs/streams/upgrade-guide.html:
##
@@ -303,6 +303,35 @@ Streams API
   adds a new config default.client.supplier that allows to 
use a custom KafkaClientSupplier without any code changes.
 
 
+Streams API changes in 3.4.0
+
+  https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186878390";>KIP-770
 deprecates
+  config cache.max.bytes.buffering in favor of the newly 
introduced config statestore.cache.max.bytes.
+  To improve monitoring, two new metrics 
input-buffer-bytes-total and cache-size-bytes-total
+  were added at the DEBUG level. Note, that the KIP is only partially 
implement in 3.4.0 release and config
+  input.buffer.max.bytes is not available yet.
+
+
+
+  https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883356";>KIP-873
 enables you to multicast
+  result records to multiple partition of downstream sink topics and adds 
functionality for choosing to drop result records without sending.
+  The Integer StreamPartitioner.partition() method is 
deprecated and replaced by the newly added
+  Optional<Set<Integer>> StreamPartitioner.partitions() method, that allows to return a set of partitions to send the record to.

Review Comment:
   ```suggestion
 
Optiona≶Set>StreamPartitioner.partitions() 
method, which enables returning a set of partitions to send the record to.
   ```
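
   As an illustration of what the quoted paragraph describes (multicasting to several partitions, or dropping a record without sending), a partitioner with the new return type could look roughly like this. The nested `Partitioner` interface is a hypothetical stand-in that only mirrors the `Optional<Set<Integer>>` shape; it is not the Kafka `StreamPartitioner` interface, and the reading that an empty `Optional` defers to default partitioning while an empty set drops the record is my understanding of the KIP rather than something stated in this diff.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

public class MulticastPartitionerSketch {

    // Hypothetical stand-in mirroring the return type described above.
    interface Partitioner<K, V> {
        Optional<Set<Integer>> partitions(String topic, K key, V value, int numPartitions);
    }

    static final Partitioner<String, String> EXAMPLE = (topic, key, value, numPartitions) -> {
        if (value == null) {
            // Empty set: send the record to no partition at all, i.e. drop it.
            return Optional.of(Collections.<Integer>emptySet());
        }
        if ("broadcast".equals(key)) {
            // Multicast: send the record to every partition of the sink topic.
            Set<Integer> all = new HashSet<>();
            for (int p = 0; p < numPartitions; p++) {
                all.add(p);
            }
            return Optional.of(all);
        }
        // Empty Optional: leave the choice to the default partitioning (assumed semantics).
        return Optional.empty();
    };
}
```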






Re: [PR] MINOR: update Kafka Streams docs with 3.2 KIP information [kafka]

2024-06-13 Thread via GitHub


mjsax merged PR #16313:
URL: https://github.com/apache/kafka/pull/16313





Re: [PR] MINOR: update Kafka Streams docs with 3.4 KIP information [kafka]

2024-06-13 Thread via GitHub


JimGalasyn commented on code in PR #16336:
URL: https://github.com/apache/kafka/pull/16336#discussion_r1638991305


##
docs/streams/upgrade-guide.html:
##
@@ -303,6 +303,35 @@ Streams API
   adds a new config default.client.supplier that allows to 
use a custom KafkaClientSupplier without any code changes.
 
 
+Streams API changes in 3.4.0
+
+  https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186878390";>KIP-770
 deprecates
+  config cache.max.bytes.buffering in favor of the newly 
introduced config statestore.cache.max.bytes.
+  To improve monitoring, two new metrics 
input-buffer-bytes-total and cache-size-bytes-total
+  were added at the DEBUG level. Note, that the KIP is only partially 
implement in 3.4.0 release and config

Review Comment:
   ```suggestion
 were added at the DEBUG level. Note, that the KIP is only partially 
implemented in the 3.4.0 release, and config
   ```
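
   If it helps readers of the upgrade guide, the config rename in this paragraph amounts to something like the following when applied to an application's properties. It is only a sketch using plain `java.util.Properties` and the two config names quoted above; the `migrate` helper is hypothetical and not part of Kafka.

```java
import java.util.Properties;

public class CacheConfigSketch {

    // Returns a copy of the given properties in which the deprecated
    // cache.max.bytes.buffering key is carried over to statestore.cache.max.bytes,
    // unless the new key has already been set explicitly.
    static Properties migrate(Properties old) {
        Properties updated = new Properties();
        updated.putAll(old);
        String legacy = updated.getProperty("cache.max.bytes.buffering");
        if (legacy != null) {
            updated.remove("cache.max.bytes.buffering");
            if (updated.getProperty("statestore.cache.max.bytes") == null) {
                updated.setProperty("statestore.cache.max.bytes", legacy);
            }
        }
        return updated;
    }
}
```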






[PR] MINOR: update Kafka Streams docs with 3.4 KIP information [kafka]

2024-06-13 Thread via GitHub


mjsax opened a new pull request, #16336:
URL: https://github.com/apache/kafka/pull/16336

   (no comment)





Re: [PR] KAFKA-16637: AsyncKafkaConsumer removes offset fetch responses from cache too aggressively [kafka]

2024-06-13 Thread via GitHub


kirktrue commented on code in PR #16310:
URL: https://github.com/apache/kafka/pull/16310#discussion_r1638982381


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -574,6 +581,148 @@ public void testPollLongThrowsException() {
 "This method is deprecated and will be removed in the next major 
release.", e.getMessage());
 }
 
+    @Test
+    public void testOffsetFetchStoresPendingEvent() {
+        consumer = newConsumer();
+        long timeoutMs = 0;
+        doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+        consumer.assign(Collections.singleton(new TopicPartition("topic1", 0)));
+
+        // The first attempt at poll() creates an event, enqueues it, but its Future does not complete within the
+        // timeout, leaving a pending fetch.
+        consumer.poll(Duration.ofMillis(timeoutMs));
+        verify(applicationEventHandler).add(any(FetchCommittedOffsetsEvent.class));
+        CompletableApplicationEvent<Map<TopicPartition, OffsetAndMetadata>> event = getLastEnqueuedEvent();
+        assertThrows(TimeoutException.class, () -> ConsumerUtils.getResult(event.future(), time.timer(timeoutMs)));
+        assertTrue(consumer.hasPendingOffsetFetchEvent());
+
+        clearInvocations(applicationEventHandler);
+
+        // For the second attempt, the event is reused, so first verify that another FetchCommittedOffsetsEvent
+        // was not enqueued. On this attempt the Future returns successfully, clearing the pending fetch.
+        event.future().complete(Collections.emptyMap());
+        consumer.poll(Duration.ofMillis(timeoutMs));
+        verify(applicationEventHandler, never()).add(any(FetchCommittedOffsetsEvent.class));
+        assertDoesNotThrow(() -> ConsumerUtils.getResult(event.future(), time.timer(timeoutMs)));
+        assertFalse(consumer.hasPendingOffsetFetchEvent());
+    }
+
+    @Test
+    public void testOffsetFetchDoesNotReuseMismatchedPendingEvent() {
+        consumer = newConsumer();
+        long timeoutMs = 0;
+        doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+
+        // The first attempt at poll() retrieves data for partition 0 of the topic. poll() creates an event,
+        // enqueues it, but its Future does not complete within the timeout, leaving a pending fetch.
+        consumer.assign(Collections.singleton(new TopicPartition("topic1", 0)));
+        consumer.poll(Duration.ofMillis(timeoutMs));
+        verify(applicationEventHandler).add(any(FetchCommittedOffsetsEvent.class));
+        CompletableApplicationEvent<Map<TopicPartition, OffsetAndMetadata>> event1 = getLastEnqueuedEvent();
+        assertThrows(TimeoutException.class, () -> ConsumerUtils.getResult(event1.future(), time.timer(timeoutMs)));
+        assertTrue(consumer.hasPendingOffsetFetchEvent());
+
+        clearInvocations(applicationEventHandler);
+
+        // For the second attempt, the set of partitions is reassigned, causing the pending offset to be replaced.
+        // Verify that another FetchCommittedOffsetsEvent is enqueued.
+        consumer.assign(Collections.singleton(new TopicPartition("topic1", 1)));
+        consumer.poll(Duration.ofMillis(timeoutMs));
+        verify(applicationEventHandler).add(any(FetchCommittedOffsetsEvent.class));
+        CompletableApplicationEvent<Map<TopicPartition, OffsetAndMetadata>> event2 = getLastEnqueuedEvent();
+        assertNotEquals(event1, event2);
+        assertThrows(TimeoutException.class, () -> ConsumerUtils.getResult(event2.future(), time.timer(timeoutMs)));
+        assertTrue(consumer.hasPendingOffsetFetchEvent());
+
+        clearInvocations(applicationEventHandler);
+
+        // For the third attempt, the event from attempt 2 is reused, so there should not have been another
+        // FetchCommittedOffsetsEvent enqueued. The Future is completed to make it return successfully in poll().
+        // This will finally clear out the pending fetch.
+        event2.future().complete(Collections.emptyMap());
+        consumer.poll(Duration.ofMillis(timeoutMs));
+        verify(applicationEventHandler, never()).add(any(FetchCommittedOffsetsEvent.class));
+        assertDoesNotThrow(() -> ConsumerUtils.getResult(event2.future(), time.timer(timeoutMs)));
+        assertFalse(consumer.hasPendingOffsetFetchEvent());
+    }
+
+    @Test
+    public void testOffsetFetchDoesNotReuseExpiredPendingEvent() {
+        consumer = newConsumer();
+        long timeoutMs = 0;
+        doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+        consumer.assign(Collections.singleton(new TopicPartition("topic1", 0)));
+
+        // The first attempt at poll() creates an event, enqueues it, but its Future does not complete within
+        // the timeout, leaving a pending fetch.
+        consumer.poll(Duration.ofMillis(timeoutMs));
+        verify(applicationEventHandler).add(any(FetchCommittedOffsetsEvent.class));
+CompletableApplicationEvent

Re: [PR] MINOR: update Kafka Streams docs with 3.2 KIP information [kafka]

2024-06-13 Thread via GitHub


JimGalasyn commented on code in PR #16313:
URL: https://github.com/apache/kafka/pull/16313#discussion_r1638981849


##
docs/streams/upgrade-guide.html:
##
@@ -303,6 +303,61 @@ Streams API
   adds a new config default.client.supplier that allows to 
use a custom KafkaClientSupplier without any code changes.
 
 
+Streams API changes in 3.2.0
+
+RocksDB offers many metrics which are critical to monitor and tune its 
performance. Kafka Streams started to make RocksDB metrics accessible
+like any other Kafka metric via https://cwiki.apache.org/confluence/display/KAFKA/KIP-471%3A+Expose+RocksDB+Metrics+in+Kafka+Streams";>KIP-471
 in 2.4.0 release.
+However, the KIP was only partially implemented, and is now completed 
with the 3.2.0 release.
+For a full list of available RocksDB metrics, please consult the monitoring 
documentation.
+
+
+
+Kafka Streams ships with RocksDB and in-memory store implementations 
and user can pick which one to use.
+However, for the DSL, the choice is a per-operator one, making it 
cumbersome to switch from the default RocksDB
+store to in-memory store for all operators, especially for larger 
topologies.
+https://cwiki.apache.org/confluence/display/KAFKA/KIP-591%3A+Add+Kafka+Streams+config+to+set+default+state+store";>KIP-591
+adds a new config default.dsl.store that allows to set 
the default store for all DSL operators globally.
+Note, that it is required to pass TopologyConfig to the 
StreamsBuilder constructor to make use of this new config.
+
+
+
+For multi-AZ deployments, it is desired to assign StandbyTasks to a 
KafkaStreams instance running in a different
+AZ than the corresponding active StreamTask.
+https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+aware+StandbyTask+assignment+for+Kafka+Streams";>KIP-708
+allows to configure Kafka Streams instances with a rack-aware 
StandbyTask assignment strategy, by using the new added configs
+rack.aware.assignment.tags and corresponding 
client.tag..
+
+
+
+https://cwiki.apache.org/confluence/display/KAFKA/KIP-791%3A+Add+Record+Metadata+to+State+Store+Context";>KIP-791
+adds a new method Optional 
StateStoreContext.recordMetadata() to expose
+record metadata. This helps for example to provide read-your-writes 
consistency guarantees in interactive queries.
+
+
+
+Interactive
 Queries allow users to
+tap into the operational state of Kafka Streams processor nodes. The 
existing API is tightly coupled with the
+actual state store interfaces and thus the internal implementation of 
state store. To break up this tight coupling
+and allow for building more advanced IQ features,
+https://cwiki.apache.org/confluence/display/KAFKA/KIP-796%3A+Interactive+Query+v2";>KIP-796
 introduces
+a completely new IQv2 API, via StateQueryRequest and 
StateQueryResult classes,
+as well as Query and QueryResult interfaces 
(plus additional helper classes).
+
+In addition, multiple built-in query types were added: 
KeyQuery for key lookups and
+RangeQuery (via https://cwiki.apache.org/confluence/display/KAFKA/KIP-805%3A+Add+range+and+scan+query+over+kv-store+in+IQv2";>KIP-805)
+for key-range queries on key-value stores, as well as 
WindowKeyQuery and WindowRangeQuery
+(via https://cwiki.apache.org/confluence/display/KAFKA/KIP-806%3A+Add+session+and+window+query+over+kv-store+in+IQv2";>KIP-806)
+for key and range lookup into windowed stores.
+
+
+
+The Kafka Streams DSL may insert so-called repartition topic for 
certain DSL operators to ensure correct partitioning
+of data. These topics are configured with infinite retention time and 
Kafka Streams purges old data explicitly
+via "delete record" requests, when commiting input topic offsets.
+https://cwiki.apache.org/confluence/display/KAFKA/KIP-811%3A+Add+config+repartition.purge.interval.ms+to+Kafka+Streams";>KIP-811
+adds a new config repartition.purge.interval.ms allowing 
to configure the purge interval independently of the commit interval.

Review Comment:
   ```suggestion
   adds a new config repartition.purge.interval.ms 
allowing you to configure the purge interval independently of the commit 
interval.
   ```
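
The purge-interval config discussed in this suggestion could be set roughly as follows. This is a minimal sketch under stated assumptions: it only builds a plain `java.util.Properties` object, the class name `RepartitionPurgeSketch` and the interval values are hypothetical, and the config keys are written as string literals so that no Kafka dependency is needed.

```java
import java.util.Properties;

public class RepartitionPurgeSketch {

    // Builds Streams properties in which the purge interval is set
    // independently of the commit interval (the point of KIP-811).
    // Both values are illustrative, not recommendations.
    static Properties streamsProps(long commitIntervalMs, long purgeIntervalMs) {
        Properties props = new Properties();
        props.put("commit.interval.ms", Long.toString(commitIntervalMs));
        props.put("repartition.purge.interval.ms", Long.toString(purgeIntervalMs));
        return props;
    }

    public static void main(String[] args) {
        // Commit often, but purge repartition topics far less frequently.
        Properties props = streamsProps(100L, 30_000L);
        System.out.println("commit.interval.ms=" + props.getProperty("commit.interval.ms"));
        System.out.println("repartition.purge.interval.ms="
                + props.getProperty("repartition.purge.interval.ms"));
    }
}
```

In a real application these properties would be passed to `KafkaStreams` together with the usual `application.id` and `bootstrap.servers` settings.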






Re: [PR] MINOR: update Kafka Streams docs with 3.2 KIP information [kafka]

2024-06-13 Thread via GitHub


JimGalasyn commented on code in PR #16313:
URL: https://github.com/apache/kafka/pull/16313#discussion_r1638981412


##
docs/streams/upgrade-guide.html:
##
@@ -303,6 +303,61 @@ Streams API
   adds a new config default.client.supplier that allows to 
use a custom KafkaClientSupplier without any code changes.
 
 
+Streams API changes in 3.2.0
+
+RocksDB offers many metrics which are critical to monitor and tune its 
performance. Kafka Streams started to make RocksDB metrics accessible
+like any other Kafka metric via https://cwiki.apache.org/confluence/display/KAFKA/KIP-471%3A+Expose+RocksDB+Metrics+in+Kafka+Streams";>KIP-471
 in 2.4.0 release.
+However, the KIP was only partially implemented, and is now completed 
with the 3.2.0 release.
+For a full list of available RocksDB metrics, please consult the monitoring 
documentation.
+
+
+
+Kafka Streams ships with RocksDB and in-memory store implementations 
and user can pick which one to use.
+However, for the DSL, the choice is a per-operator one, making it 
cumbersome to switch from the default RocksDB
+store to in-memory store for all operators, especially for larger 
topologies.
+https://cwiki.apache.org/confluence/display/KAFKA/KIP-591%3A+Add+Kafka+Streams+config+to+set+default+state+store";>KIP-591
+adds a new config default.dsl.store that allows to set 
the default store for all DSL operators globally.
+Note, that it is required to pass TopologyConfig to the 
StreamsBuilder constructor to make use of this new config.
+
+
+
+For multi-AZ deployments, it is desired to assign StandbyTasks to a 
KafkaStreams instance running in a different
+AZ than the corresponding active StreamTask.
+https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+aware+StandbyTask+assignment+for+Kafka+Streams";>KIP-708
+allows to configure Kafka Streams instances with a rack-aware 
StandbyTask assignment strategy, by using the new added configs
+rack.aware.assignment.tags and corresponding 
client.tag..
+
+
+
+https://cwiki.apache.org/confluence/display/KAFKA/KIP-791%3A+Add+Record+Metadata+to+State+Store+Context";>KIP-791
+adds a new method Optional 
StateStoreContext.recordMetadata() to expose
+record metadata. This helps for example to provide read-your-writes 
consistency guarantees in interactive queries.
+
+
+
+Interactive
 Queries allow users to
+tap into the operational state of Kafka Streams processor nodes. The 
existing API is tightly coupled with the
+actual state store interfaces and thus the internal implementation of 
state store. To break up this tight coupling
+and allow for building more advanced IQ features,
+https://cwiki.apache.org/confluence/display/KAFKA/KIP-796%3A+Interactive+Query+v2";>KIP-796
 introduces
+a completely new IQv2 API, via StateQueryRequest and 
StateQueryResult classes,
+as well as Query and QueryResult interfaces 
(plus additional helper classes).
+
+In addition, multiple built-in query types were added: 
KeyQuery for key lookups and
+RangeQuery (via https://cwiki.apache.org/confluence/display/KAFKA/KIP-805%3A+Add+range+and+scan+query+over+kv-store+in+IQv2";>KIP-805)
+for key-range queries on key-value stores, as well as 
WindowKeyQuery and WindowRangeQuery
+(via https://cwiki.apache.org/confluence/display/KAFKA/KIP-806%3A+Add+session+and+window+query+over+kv-store+in+IQv2";>KIP-806)
+for key and range lookup into windowed stores.
+
+
+
+The Kafka Streams DSL may insert so-called repartition topic for 
certain DSL operators to ensure correct partitioning
+of data. These topics are configured with infinite retention time and 
Kafka Streams purges old data explicitly

Review Comment:
   ```suggestion
   of data. These topics are configured with infinite retention time, 
and Kafka Streams purges old data explicitly
   ```



##
docs/streams/upgrade-guide.html:
##
@@ -303,6 +303,61 @@ Streams API
   adds a new config default.client.supplier that allows to 
use a custom KafkaClientSupplier without any code changes.
 
 
+Streams API changes in 3.2.0
+
+RocksDB offers many metrics which are critical to monitor and tune its 
performance. Kafka Streams started to make RocksDB metrics accessible
+like any other Kafka metric via https://cwiki.apache.org/confluence/display/KAFKA/KIP-471%3A+Expose+RocksDB+Metrics+in+Kafka+Streams";>KIP-471
 in 2.4.0 release.
+However, the KIP was only partially implemented, and is now completed 
with the 3.2.0 release.
+For a full list of available RocksDB metrics, please consult the monitoring 
documentation.
+
+
+
+Kafka Streams ships with RocksDB and in-memory store implement

Re: [PR] MINOR: update Kafka Streams docs with 3.2 KIP information [kafka]

2024-06-13 Thread via GitHub


JimGalasyn commented on code in PR #16313:
URL: https://github.com/apache/kafka/pull/16313#discussion_r1638981274


##
docs/streams/upgrade-guide.html:
##
@@ -303,6 +303,61 @@ Streams API
   adds a new config default.client.supplier that allows to 
use a custom KafkaClientSupplier without any code changes.
 
 
+Streams API changes in 3.2.0
+
+RocksDB offers many metrics which are critical to monitor and tune its 
performance. Kafka Streams started to make RocksDB metrics accessible
+like any other Kafka metric via https://cwiki.apache.org/confluence/display/KAFKA/KIP-471%3A+Expose+RocksDB+Metrics+in+Kafka+Streams";>KIP-471
 in 2.4.0 release.
+However, the KIP was only partially implemented, and is now completed 
with the 3.2.0 release.
+For a full list of available RocksDB metrics, please consult the monitoring 
documentation.
+
+
+
+Kafka Streams ships with RocksDB and in-memory store implementations 
and user can pick which one to use.
+However, for the DSL, the choice is a per-operator one, making it 
cumbersome to switch from the default RocksDB
+store to in-memory store for all operators, especially for larger 
topologies.
+https://cwiki.apache.org/confluence/display/KAFKA/KIP-591%3A+Add+Kafka+Streams+config+to+set+default+state+store";>KIP-591
+adds a new config default.dsl.store that allows to set 
the default store for all DSL operators globally.
+Note, that it is required to pass TopologyConfig to the 
StreamsBuilder constructor to make use of this new config.
+
+
+
+For multi-AZ deployments, it is desired to assign StandbyTasks to a 
KafkaStreams instance running in a different
+AZ than the corresponding active StreamTask.
+https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+aware+StandbyTask+assignment+for+Kafka+Streams";>KIP-708
+allows to configure Kafka Streams instances with a rack-aware 
StandbyTask assignment strategy, by using the new added configs
+rack.aware.assignment.tags and corresponding 
client.tag..
+
+
+
+https://cwiki.apache.org/confluence/display/KAFKA/KIP-791%3A+Add+Record+Metadata+to+State+Store+Context";>KIP-791
+adds a new method Optional 
StateStoreContext.recordMetadata() to expose
+record metadata. This helps for example to provide read-your-writes 
consistency guarantees in interactive queries.
+
+
+
+Interactive
 Queries allow users to
+tap into the operational state of Kafka Streams processor nodes. The 
existing API is tightly coupled with the
+actual state store interfaces and thus the internal implementation of 
state store. To break up this tight coupling
+and allow for building more advanced IQ features,
+https://cwiki.apache.org/confluence/display/KAFKA/KIP-796%3A+Interactive+Query+v2";>KIP-796
 introduces
+a completely new IQv2 API, via StateQueryRequest and 
StateQueryResult classes,
+as well as Query and QueryResult interfaces 
(plus additional helper classes).
+
+In addition, multiple built-in query types were added: 
KeyQuery for key lookups and
+RangeQuery (via https://cwiki.apache.org/confluence/display/KAFKA/KIP-805%3A+Add+range+and+scan+query+over+kv-store+in+IQv2";>KIP-805)
+for key-range queries on key-value stores, as well as 
WindowKeyQuery and WindowRangeQuery
+(via https://cwiki.apache.org/confluence/display/KAFKA/KIP-806%3A+Add+session+and+window+query+over+kv-store+in+IQv2";>KIP-806)
+for key and range lookup into windowed stores.
+
+
+
+The Kafka Streams DSL may insert so-called repartition topic for 
certain DSL operators to ensure correct partitioning

Review Comment:
   ```suggestion
   The Kafka Streams DSL may insert so-called repartition topics for 
certain DSL operators to ensure correct partitioning
   ```






Re: [PR] MINOR: update Kafka Streams docs with 3.2 KIP information [kafka]

2024-06-13 Thread via GitHub


JimGalasyn commented on code in PR #16313:
URL: https://github.com/apache/kafka/pull/16313#discussion_r1638980584


##
docs/streams/upgrade-guide.html:
##
@@ -303,6 +303,61 @@ Streams API
   adds a new config default.client.supplier that allows to 
use a custom KafkaClientSupplier without any code changes.
 
 
+Streams API changes in 3.2.0
+
+RocksDB offers many metrics which are critical to monitor and tune its 
performance. Kafka Streams started to make RocksDB metrics accessible
+like any other Kafka metric via https://cwiki.apache.org/confluence/display/KAFKA/KIP-471%3A+Expose+RocksDB+Metrics+in+Kafka+Streams";>KIP-471
 in 2.4.0 release.
+However, the KIP was only partially implemented, and is now completed 
with the 3.2.0 release.
+For a full list of available RocksDB metrics, please consult the monitoring 
documentation.
+
+
+
+Kafka Streams ships with RocksDB and in-memory store implementations 
and user can pick which one to use.
+However, for the DSL, the choice is a per-operator one, making it 
cumbersome to switch from the default RocksDB
+store to in-memory store for all operators, especially for larger 
topologies.
+https://cwiki.apache.org/confluence/display/KAFKA/KIP-591%3A+Add+Kafka+Streams+config+to+set+default+state+store";>KIP-591
+adds a new config default.dsl.store that allows to set 
the default store for all DSL operators globally.
+Note, that it is required to pass TopologyConfig to the 
StreamsBuilder constructor to make use of this new config.
+
+
+
+For multi-AZ deployments, it is desired to assign StandbyTasks to a 
KafkaStreams instance running in a different
+AZ than the corresponding active StreamTask.
+https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+aware+StandbyTask+assignment+for+Kafka+Streams";>KIP-708
+allows to configure Kafka Streams instances with a rack-aware 
StandbyTask assignment strategy, by using the new added configs

Review Comment:
   ```suggestion
   enables configuring Kafka Streams instances with a rack-aware 
StandbyTask assignment strategy, by using the new added configs
   ```
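
For illustration, the rack-aware standby configuration described in the quoted paragraph might look like the sketch below. It assumes a single tag key named `zone` (a hypothetical choice; any tag key can follow the `client.tag.` prefix) and builds only a plain `java.util.Properties` object with string-literal config keys. The class name and zone values are likewise made up for the example.

```java
import java.util.Properties;

public class RackAwareStandbySketch {

    // Properties for one Kafka Streams instance running in the given zone.
    // "zone" is a hypothetical tag key; the same key is listed in
    // rack.aware.assignment.tags and used as the client.tag.<key> suffix.
    static Properties instanceProps(String zone) {
        Properties props = new Properties();
        props.put("num.standby.replicas", "1");
        props.put("rack.aware.assignment.tags", "zone");
        props.put("client.tag.zone", zone);
        return props;
    }

    public static void main(String[] args) {
        // Two instances in different availability zones (illustrative names).
        Properties a = instanceProps("az-1");
        Properties b = instanceProps("az-2");
        System.out.println("instance A zone tag: " + a.getProperty("client.tag.zone"));
        System.out.println("instance B zone tag: " + b.getProperty("client.tag.zone"));
    }
}
```

Each instance reports its own tag value, and the assignor can then try to place a standby task on an instance whose tag value differs from that of the active task.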






Re: [PR] MINOR: update Kafka Streams docs with 3.2 KIP information [kafka]

2024-06-13 Thread via GitHub


JimGalasyn commented on code in PR #16313:
URL: https://github.com/apache/kafka/pull/16313#discussion_r1638980302


##
docs/streams/upgrade-guide.html:
##
@@ -303,6 +303,61 @@ Streams API
   adds a new config default.client.supplier that allows to 
use a custom KafkaClientSupplier without any code changes.
 
 
+Streams API changes in 3.2.0
+
+RocksDB offers many metrics which are critical to monitor and tune its 
performance. Kafka Streams started to make RocksDB metrics accessible
+like any other Kafka metric via https://cwiki.apache.org/confluence/display/KAFKA/KIP-471%3A+Expose+RocksDB+Metrics+in+Kafka+Streams";>KIP-471
 in 2.4.0 release.
+However, the KIP was only partially implemented, and is now completed 
with the 3.2.0 release.
+For a full list of available RocksDB metrics, please consult the monitoring 
documentation.
+
+
+
+Kafka Streams ships with RocksDB and in-memory store implementations 
and user can pick which one to use.
+However, for the DSL, the choice is a per-operator one, making it 
cumbersome to switch from the default RocksDB
+store to in-memory store for all operators, especially for larger 
topologies.
+https://cwiki.apache.org/confluence/display/KAFKA/KIP-591%3A+Add+Kafka+Streams+config+to+set+default+state+store";>KIP-591
+adds a new config default.dsl.store that allows to set 
the default store for all DSL operators globally.
+Note, that it is required to pass TopologyConfig to the 
StreamsBuilder constructor to make use of this new config.

Review Comment:
   ```suggestion
   Note that it is required to pass TopologyConfig to the 
StreamsBuilder constructor to make use of this new config.
   ```
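
A minimal sketch of the setup this note refers to, assuming the value `in_memory` for `default.dsl.store` and hypothetical `application.id` and `bootstrap.servers` values. The runnable part only builds a `java.util.Properties` object; the wiring through `TopologyConfig` is shown as a comment because it needs the kafka-streams library on the classpath.

```java
import java.util.Properties;

public class DefaultDslStoreSketch {

    // Streams properties that switch the default DSL store globally.
    // The application id and bootstrap address are placeholders.
    static Properties streamsProps() {
        Properties props = new Properties();
        props.put("application.id", "dsl-store-sketch");
        props.put("bootstrap.servers", "localhost:9092");
        props.put("default.dsl.store", "in_memory");
        return props;
    }

    public static void main(String[] args) {
        Properties props = streamsProps();
        System.out.println("default.dsl.store=" + props.getProperty("default.dsl.store"));

        // With kafka-streams available, the config only takes effect when a
        // TopologyConfig is handed to the StreamsBuilder constructor, e.g.:
        //   StreamsBuilder builder =
        //       new StreamsBuilder(new TopologyConfig(new StreamsConfig(props)));
    }
}
```

Constructing `StreamsBuilder` with its no-argument constructor would leave the per-operator default in place, which is why the note about `TopologyConfig` matters.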






Re: [PR] MINOR: update Kafka Streams docs with 3.2 KIP information [kafka]

2024-06-13 Thread via GitHub


JimGalasyn commented on code in PR #16313:
URL: https://github.com/apache/kafka/pull/16313#discussion_r1638980033


##
docs/streams/upgrade-guide.html:
##
@@ -303,6 +303,61 @@ Streams API
   adds a new config default.client.supplier that allows to 
use a custom KafkaClientSupplier without any code changes.
 
 
+Streams API changes in 3.2.0
+
+RocksDB offers many metrics which are critical to monitor and tune its 
performance. Kafka Streams started to make RocksDB metrics accessible
+like any other Kafka metric via https://cwiki.apache.org/confluence/display/KAFKA/KIP-471%3A+Expose+RocksDB+Metrics+in+Kafka+Streams";>KIP-471
 in 2.4.0 release.
+However, the KIP was only partially implemented, and is now completed 
with the 3.2.0 release.
+For a full list of available RocksDB metrics, please consult the monitoring 
documentation.
+
+
+
+Kafka Streams ships with RocksDB and in-memory store implementations 
and user can pick which one to use.
+However, for the DSL, the choice is a per-operator one, making it 
cumbersome to switch from the default RocksDB
+store to in-memory store for all operators, especially for larger 
topologies.
+https://cwiki.apache.org/confluence/display/KAFKA/KIP-591%3A+Add+Kafka+Streams+config+to+set+default+state+store";>KIP-591
+adds a new config default.dsl.store that allows to set 
the default store for all DSL operators globally.

Review Comment:
   ```suggestion
   adds a new config default.dsl.store that enables 
setting the default store for all DSL operators globally.
   ```






Re: [PR] MINOR: update Kafka Streams docs with 3.2 KIP information [kafka]

2024-06-13 Thread via GitHub


JimGalasyn commented on code in PR #16313:
URL: https://github.com/apache/kafka/pull/16313#discussion_r1638979649


##
docs/streams/upgrade-guide.html:
##
@@ -303,6 +303,61 @@ Streams API
   adds a new config default.client.supplier that allows to 
use a custom KafkaClientSupplier without any code changes.
 
 
+Streams API changes in 3.2.0
+
+RocksDB offers many metrics which are critical to monitor and tune its 
performance. Kafka Streams started to make RocksDB metrics accessible
+like any other Kafka metric via https://cwiki.apache.org/confluence/display/KAFKA/KIP-471%3A+Expose+RocksDB+Metrics+in+Kafka+Streams";>KIP-471
 in 2.4.0 release.
+However, the KIP was only partially implemented, and is now completed 
with the 3.2.0 release.
+For a full list of available RocksDB metrics, please consult the monitoring 
documentation.
+
+
+
+Kafka Streams ships with RocksDB and in-memory store implementations 
and user can pick which one to use.

Review Comment:
   ```suggestion
   Kafka Streams ships with RocksDB and in-memory store implementations 
and users can pick which one to use.
   ```






Re: [PR] MINOR: update Kafka Streams docs with 3.3 KIP information [kafka]

2024-06-13 Thread via GitHub


JimGalasyn commented on code in PR #16316:
URL: https://github.com/apache/kafka/pull/16316#discussion_r1638978368


##
docs/streams/upgrade-guide.html:
##
@@ -303,6 +303,62 @@ Streams API
   adds a new config default.client.supplier that allows to 
use a custom KafkaClientSupplier without any code changes.
 
 
+Streams API changes in 3.3.0
+
+  Kafka Streams does not send a "leave group" request when an instance is 
closed. This behavior implies
+  that a rebalance is delayed until max.poll.interval.ms passed.

Review Comment:
   ```suggestion
 that a rebalance is delayed until max.poll.interval.ms 
passed.
   ```






Re: [PR] MINOR: update Kafka Streams docs with 3.3 KIP information [kafka]

2024-06-13 Thread via GitHub


JimGalasyn commented on code in PR #16316:
URL: https://github.com/apache/kafka/pull/16316#discussion_r1638978091


##
docs/ops.html:
##
@@ -3100,7 +3100,8 @@ Processor Node 
Metrics
  The following metrics are only available on certain types of nodes, i.e., the 
process-* metrics are only available for
- source processor nodes, the suppression-emit-* metrics are only available for 
suppression operation nodes, and the
+ source processor nodes, the suppression-emit-* metrics are only available for 
suppression operation nodes,
+ emit-final-* metrics are only available for windowed aggregations nodes, and 
the
  record-e2e-latency-* metrics are only available for source processor nodes 
and terminal nodes (nodes without successor
  nodes).
  All of the metrics have a recording level of debug, except for 
the record-e2e-latency-* metrics which have

Review Comment:
   ```suggestion
All of the metrics have a recording level of debug, except for 
the record-e2e-latency-* metrics which have
   ```






Re: [PR] MINOR: update Kafka Streams docs with 3.3 KIP information [kafka]

2024-06-13 Thread via GitHub


JimGalasyn commented on code in PR #16316:
URL: https://github.com/apache/kafka/pull/16316#discussion_r1638977635


##
docs/ops.html:
##
@@ -3100,7 +3100,8 @@ Processor Node 
Metrics
  The following metrics are only available on certain types of nodes, i.e., the 
process-* metrics are only available for
- source processor nodes, the suppression-emit-* metrics are only available for 
suppression operation nodes, and the
+ source processor nodes, the suppression-emit-* metrics are only available for 
suppression operation nodes,
+ emit-final-* metrics are only available for windowed aggregations nodes, and 
the
  record-e2e-latency-* metrics are only available for source processor nodes 
and terminal nodes (nodes without successor

Review Comment:
   ```suggestion
record-e2e-latency-* metrics are only available for source 
processor nodes and terminal nodes (nodes without successor
   ```






Re: [PR] MINOR: update Kafka Streams docs with 3.3 KIP information [kafka]

2024-06-13 Thread via GitHub


JimGalasyn commented on code in PR #16316:
URL: https://github.com/apache/kafka/pull/16316#discussion_r1638977463


##
docs/ops.html:
##
@@ -3100,7 +3100,8 @@ Processor Node 
Metrics
  The following metrics are only available on certain types of nodes, i.e., the 
process-* metrics are only available for
- source processor nodes, the suppression-emit-* metrics are only available for 
suppression operation nodes, and the
+ source processor nodes, the suppression-emit-* metrics are only available for 
suppression operation nodes,
+ emit-final-* metrics are only available for windowed aggregations nodes, and 
the

Review Comment:
   ```suggestion
emit-final-* metrics are only available for windowed 
aggregations nodes, and the
   ```





